
senior-data-engineer: Senior data engineering skill for building scalable data pipelines

 

Name: senior-data-engineer
Description: Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Covers Python, SQL, Spark, Airflow, dbt, Kafka, and the modern data stack, spanning data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.


Senior Data Engineer

Production-grade data engineering skill for building scalable, reliable data systems.

Table of Contents

  1. Trigger Phrases
  2. Quick Start
  3. Workflows
  4. Architecture Decision Framework
  5. Tech Stack
  6. Reference Docs
  7. Troubleshooting

Trigger Phrases

Activate this skill when you see requests like:

Pipeline design:
* "Design a data pipeline for..."
* "Build an ETL/ELT process for..."
* "How should I ingest data from..."
* "Set up data extraction from..."

Architecture:
* "Should I use batch or stream processing?"
* "Lambda vs. Kappa architecture"
* "How do I handle late-arriving data?"
* "Design a data lakehouse"

Data modeling:
* "Create a dimensional model for..."
* "Star schema vs. snowflake schema"
* "Implement slowly changing dimensions"
* "Design a data warehouse"

Data quality:
* "Add data validation for..."
* "Set up data quality checks"
* "Monitor data freshness"
* "Implement data contracts"

Performance:
* "Optimize this Spark job"
* "This query is running slowly"
* "Reduce pipeline execution time"
* "Tune an Airflow DAG"


Quick Start

Core tools

# Generate a pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --destination snowflake \
  --schedule "0 5 * * *"

# Validate data quality
python scripts/data_quality_validator.py validate \
  --input data/sales.parquet \
  --schema schemas/sales.json \
  --checks freshness,completeness,uniqueness

# Analyze and optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
  --query queries/daily_aggregation.sql \
  --engine spark \
  --recommend

Workflows

Workflow 1: Build a batch ETL pipeline

Scenario: Extract from PostgreSQL, transform with dbt, load into Snowflake.

Step 1: Define the source schema

-- Document the source table structures
SELECT
    table_name,
    column_name,
    data_type,
    is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
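
To keep this inventory as machine-readable documentation alongside the pipeline, a minimal Python sketch can dump it to JSON (the DSN and output path are placeholders; psycopg2 is an assumed dependency, not one of this skill's scripts):

# scripts/document_schema.py (hypothetical) - dump source table structures to JSON
import json
import psycopg2

# Placeholder DSN; point this at the source database
conn = psycopg2.connect("postgresql://user:pass@localhost:5432/source_db")

with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT table_name, column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = 'source_schema'
        ORDER BY table_name, ordinal_position
    """)
    docs = {}
    for table, column, dtype, nullable in cur.fetchall():
        docs.setdefault(table, []).append(
            {"column": column, "type": dtype, "nullable": nullable == "YES"}
        )

with open("schemas/source_schema.json", "w") as f:
    json.dump(docs, f, indent=2)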

Step 2: Generate the extraction config

python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --tables orders,customers,products \
  --mode incremental \
  --watermark updated_at \
  --output dags/extract_source.py
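
Incremental mode with a watermark column boils down to pulling only rows changed since the last successful run's high-water mark. A minimal sketch of the pattern the generated DAG follows (the helper below is illustrative, not one of this skill's scripts):

# Watermark-based incremental extraction (illustrative sketch)
def extract_incremental(conn, table, last_watermark):
    """Pull only rows changed since the previous successful run."""
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT * FROM {table} WHERE updated_at > %s ORDER BY updated_at",
            (last_watermark,),
        )
        rows = cur.fetchall()
    # The new high-water mark is the max updated_at of the extracted rows;
    # persist it (e.g. as an Airflow Variable) only after the load succeeds.
    return rows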

Step 3: Create dbt models

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),

renamed AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM source
    WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
)

SELECT * FROM renamed

-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        cluster_by=['order_date']
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}

Step 4: Configure data quality tests

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1

Step 5: Create the Airflow DAG

# dags/daily_etl.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from PostgreSQL to Snowflake',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:

    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )

    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
    )

    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )

    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',
    )

    extract >> transform >> test >> notify

Step 6: Validate the pipeline

# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders

# Validate data quality
python scripts/data_quality_validator.py validate \
  --table fct_orders \
  --checks all \
  --output reports/quality_report.json

Workflow 2: Implement real-time stream processing

Scenario: Stream events from Kafka, process with Flink/Spark Streaming, sink to a data lake.

Step 1: Define the event schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "UserEvent",
  "type": "object",
  "required": ["event_id", "user_id", "event_type", "timestamp"],
  "properties": {
    "event_id": {"type": "string", "format": "uuid"},
    "user_id": {"type": "string"},
    "event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
    "timestamp": {"type": "string", "format": "date-time"},
    "properties": {"type": "object"}
  }
}
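
Producers and consumers can validate events against this schema before they reach the topic. A minimal sketch using the jsonschema library (the schema path and sample event are placeholders):

# validate_event.py (illustrative) - check an event against the UserEvent schema
import json
from jsonschema import validate, ValidationError

with open("schemas/user_event.json") as f:  # assumed location of the schema above
    schema = json.load(f)

event = {
    "event_id": "3f9a2c1e-8b4d-4f6a-9c0d-1e2f3a4b5c6d",
    "user_id": "u-123",
    "event_type": "page_view",
    "timestamp": "2026-02-25T07:00:00Z",
}

try:
    validate(instance=event, schema=schema)
except ValidationError as e:
    print(f"Rejected event: {e.message}")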

Step 2: Create the Kafka topic

# Create the topic with an appropriate partition count
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Verify the topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic user-events
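
To smoke-test the topic, here is a minimal producer sketch, assuming kafka-python and a local broker:

# produce_event.py (illustrative) - send a test event to user-events
import json
import uuid
from datetime import datetime, timezone
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "event_id": str(uuid.uuid4()),
    "user_id": "u-123",
    "event_type": "click",
    "timestamp": datetime.now(timezone.utc).isoformat(),
}

# Keying by user_id keeps each user's events in one partition, preserving per-user order
producer.send("user-events", key=event["user_id"], value=event)
producer.flush()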

Step 3: Implement the Spark Streaming job

# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count, avg,
    to_timestamp, current_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType,
    TimestampType, MapType
)

# 初始化 Spark
spark = SparkSession.builder \
    .appName("UserEventsProcessor") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
    .config("spark.sql.shuffle.partitions", "12") \
    .getOrCreate()

# Define the event schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Read from Kafka
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse the JSON payload
parsed_df = events_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))

# Windowed aggregation with a watermark for late data
aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("event_type")
    ) \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Write to Delta Lake
query = aggregated_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events-aggregated") \
    .option("path", "/data/lake/user_events_aggregated") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()

Step 4: Handle late data and errors

# Dead-letter queue for failed records
import logging

from pyspark.sql.functions import col, current_timestamp, lit

logger = logging.getLogger(__name__)

def process_with_error_handling(batch_df, batch_id):
    try:
        # Split valid and invalid records
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())

        # Write valid records
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")

        # Write invalid records to the DLQ
        if invalid_df.count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")

    except Exception as e:
        # Log the error, alert, and re-raise so the failed batch is retried
        logger.error(f"Batch {batch_id} failed: {e}")
        raise

# Use foreachBatch for custom per-batch handling
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()

Step 5: Monitor stream health

# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server

# Define metrics
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    'Total records processed',
    ['stream_name', 'status']
)

PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    'Current processing lag',
    ['stream_name']
)

BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    'Duration of the last processed batch',
    ['stream_name']
)

def emit_metrics(query):
    """Emit Prometheus metrics from a Structured Streaming query."""
    progress = query.lastProgress
    if progress:
        RECORDS_PROCESSED.labels(
            stream_name='user-events',
            status='success'
        ).inc(progress['numInputRows'])

        if progress['sources']:
            # Estimate lag from the latest source offsets
            for source in progress['sources']:
                end_offset = source.get('endOffset', {})
                # Turning Kafka offsets into a lag value is source-specific
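
To wire this up, expose the metrics endpoint and poll the query on an interval. A minimal sketch (port and interval are arbitrary; `query` is the StreamingQuery started in Step 3):

# monitoring/run_metrics.py (illustrative)
import time
from prometheus_client import start_http_server

start_http_server(8000)  # Prometheus scrapes http://localhost:8000/metrics

while query.isActive:
    emit_metrics(query)
    time.sleep(30)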

Workflow 3: Set up a data quality framework

Scenario: Implement comprehensive data quality monitoring with Great Expectations.

Step 1: Initialize Great Expectations

# Install and initialize
pip install great_expectations

great_expectations init

# Connect to a data source
great_expectations datasource new

Step 2: Create an expectation suite

# expectations/orders_suite.py
import great_expectations as gx

context = gx.get_context()

# Create the expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Add expectations via a validator
validator = context.get_validator(
    batch_request={
        "datasource_name": "warehouse",
        "data_asset_name": "orders",
    },
    expectation_suite_name="orders_quality_suite"
)

# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
    column_list=[
        "order_id", "customer_id", "order_date",
        "total_amount", "status", "created_at"
    ]
)

# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")

# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")

# Range expectations
validator.expect_column_values_to_be_between(
    "total_amount",
    min_value=0,
    max_value=1000000
)

# Categorical expectations
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Freshness expectations
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now - timedelta(days=1)"},
    max_value={"$PARAMETER": "now"}
)

# Referential integrity
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"}
)

validator.save_expectation_suite(discard_failed_expectations=False)
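
Once saved, the suite can also gate a pipeline run directly. A minimal sketch of validating inline and failing fast (the exact shape of the result object varies across Great Expectations versions):

# Run the suite against the current batch and fail the pipeline on violations
results = validator.validate()

if not results.success:
    failed = [
        r.expectation_config.expectation_type
        for r in results.results
        if not r.success
    ]
    raise RuntimeError(f"Data quality gate failed: {failed}")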

Step 3: Create data quality checks with dbt

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table with data quality checks"

    tests:
      # Row count check
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')

      # Freshness check
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24

    columns:
      - name: order_id
        description: "唯一订单标识符"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id

      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn

Step 4: Implement data contracts

# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
  owner: data-team@company.com

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: "Unique order identifier"
    customer_id:
      type: string
      not_null: true
    order_date:
      type: date
      not_null: true
    total_amount:
      type: decimal
      precision: 10
      scale: 2
      minimum: 0
    status:
      type: string
      enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

sla:
  freshness:
    max_delay_hours: 1
  completeness:
    min_percentage: 99.9
  accuracy:
    duplicate_tolerance: 0.01

consumers:
  - name: analytics-team
    usage: "Daily reporting dashboards"
  - name: ml
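
A contract is only useful if it is enforced. Below is a minimal, illustrative checker that validates a pandas DataFrame against the contract above; the loader and the subset of rules it covers are simplifying assumptions, not a complete implementation:

# check_contract.py (illustrative) - validate a DataFrame against the contract
import yaml
import pandas as pd

def check_contract(df, contract_path):
    """Return a list of human-readable contract violations (empty list = pass)."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    violations = []
    for column, rules in contract["schema"]["properties"].items():
        if column not in df.columns:
            violations.append(f"missing column: {column}")
            continue
        if rules.get("not_null") and df[column].isnull().any():
            violations.append(f"nulls in non-nullable column: {column}")
        if "enum" in rules and not df[column].dropna().isin(rules["enum"]).all():
            violations.append(f"values outside enum for: {column}")
        if "minimum" in rules and (df[column].dropna() < rules["minimum"]).any():
            violations.append(f"values below minimum for: {column}")
    return violations

violations = check_contract(
    pd.read_parquet("data/orders.parquet"),  # placeholder input
    "contracts/orders_contract.yaml",
)
if violations:
    raise ValueError("Contract violations: " + "; ".join(violations))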
