名称: senior-data-engineer
描述: 用于构建可扩展数据管道、ETL/ELT系统和数据基础设施的数据工程技能。精通 Python、SQL、Spark、Airflow、dbt、Kafka 和现代数据栈。涵盖数据建模、管道编排、数据质量和 DataOps。适用于设计数据架构、构建数据管道、优化数据工作流、实施数据治理或排查数据问题。
用于构建可扩展、可靠数据系统的生产级数据工程技能。
当您看到以下情况时,激活此技能:
管道设计:
* "为...设计一个数据管道"
* "构建一个 ETL/ELT 流程..."
* "我应该如何从...摄取数据"
* "设置从...的数据提取"
架构:
* "我应该使用批处理还是流处理?"
* "Lambda 与 Kappa 架构"
* "如何处理迟到数据"
* "设计一个数据湖仓"
数据建模:
* "创建一个维度模型..."
* "星型模式 vs 雪花模式"
* "实现缓慢变化维度"
* "设计数据仓库"
数据质量:
* "为...添加数据验证"
* "设置数据质量检查"
* "监控数据新鲜度"
* "实施数据契约"
性能:
* "优化这个 Spark 作业"
* "查询运行缓慢"
* "减少管道执行时间"
* "调优 Airflow DAG"
# 生成管道编排配置
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--destination snowflake \
--schedule "0 5 * * *"
# 验证数据质量
python scripts/data_quality_validator.py validate \
--input data/sales.parquet \
--schema schemas/sales.json \
--checks freshness,completeness,uniqueness
# 优化 ETL 性能
python scripts/etl_performance_optimizer.py analyze \
--query queries/daily_aggregation.sql \
--engine spark \
--recommend
场景: 从 PostgreSQL 提取数据,使用 dbt 转换,加载到 Snowflake。
-- Document the source table structures: one row per column with its
-- declared type and nullability, ordered by physical column position.
SELECT
    cols.table_name,
    cols.column_name,
    cols.data_type,
    cols.is_nullable
FROM information_schema.columns AS cols
WHERE cols.table_schema = 'source_schema'
ORDER BY cols.table_name, cols.ordinal_position;
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--tables orders,customers,products \
--mode incremental \
--watermark updated_at \
--output dags/extract_source.py
-- models/staging/stg_orders.sql
-- Staging layer: a plain column selection over the raw Postgres extract,
-- restricted to a rolling 3-day window (presumably to re-pick late-arriving
-- order updates — confirm against the extraction schedule).
WITH raw_orders AS (

    SELECT * FROM {{ source('postgres', 'orders') }}

),

windowed AS (

    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM raw_orders
    WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)

)

SELECT * FROM windowed
-- models/marts/fct_orders.sql
-- Incremental order fact: staged orders enriched with the customer segment.
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        cluster_by=['order_date']
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status,
    -- Fix: carry the extraction watermark into the target table. The
    -- incremental predicate below selects MAX(_extracted_at) from the
    -- already-materialized relation, so the column must exist there —
    -- without it every incremental run errors on a missing column.
    o._extracted_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id

{% if is_incremental() %}
-- Only rows extracted after the last successful load.
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
# models/marts/schema.yml
# Fix: the version header was garbled markdown ("**版本:** 2"); dbt requires
# a literal `version: 2` key at the top of a schema file.
version: 2

models:
  - name: fct_orders
    description: "订单事实表"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          # Guard against negative amounts and implausibly large orders.
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        tests:
          - not_null
          # Freshness: fail if no order newer than 1 day exists.
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
# dags/daily_etl.py
"""Daily ETL DAG: extract from PostgreSQL, transform with dbt, test, notify."""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='从 PostgreSQL 到 Snowflake 的每日 ETL',
    schedule_interval='0 5 * * *',
    # Fix: use a static start_date. `airflow.utils.dates.days_ago` is
    # deprecated, and a dynamic start date is an Airflow anti-pattern
    # (it is re-evaluated on every scheduler parse).
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )
    # Fix: '+marts.*' also builds upstream dependencies (the staging
    # models). Plain 'marts.*' would run fct_orders against missing or
    # stale staging relations, since stg_orders is never selected.
    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select +marts.*',
    )
    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )
    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',  # default, kept explicit for readability
    )

    extract >> transform >> test >> notify
# 本地测试
dbt run --select stg_orders fct_orders
dbt test --select fct_orders
# 验证数据质量
python scripts/data_quality_validator.py validate \
--table fct_orders \
--checks all \
--output reports/quality_report.json
场景: 从 Kafka 流式传输事件,使用 Flink/Spark Streaming 处理,下沉到数据湖。
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "UserEvent",
"type": "object",
"required": ["event_id", "user_id", "event_type", "timestamp"],
"properties": {
"event_id": {"type": "string", "format": "uuid"},
"user_id": {"type": "string"},
"event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
"timestamp": {"type": "string", "format": "date-time"},
"properties": {"type": "object"}
}
}
# 使用适当的分区数创建主题
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete
# 验证主题
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic user-events
# streaming/user_events_processor.py
"""Structured Streaming job: Kafka user events -> 5-minute aggregates in Delta Lake."""
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count, avg,
    approx_count_distinct,  # fix: used in the aggregation below but was never imported
    to_timestamp, current_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType,
    TimestampType, MapType
)

# Initialize Spark. Shuffle partitions are pinned to 12 to match the
# Kafka topic's partition count.
spark = SparkSession.builder \
    .appName("UserEventsProcessor") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
    .config("spark.sql.shuffle.partitions", "12") \
    .getOrCreate()

# Event payload schema. `timestamp` arrives as a string (ISO-8601 per the
# JSON schema contract) and is parsed into a real timestamp below.
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Read from Kafka. startingOffsets=latest: only new events are consumed;
# failOnDataLoss=false tolerates expired offsets after retention cleanup.
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse the JSON value column into typed fields.
parsed_df = events_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))

# Windowed aggregation: 5-minute tumbling windows, with a 10-minute
# watermark to bound state for late-arriving events.
aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("event_type")
    ) \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Sink the aggregates to Delta Lake once per minute.
query = aggregated_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events-aggregated") \
    .option("path", "/data/lake/user_events_aggregated") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()
# Dead letter queue for failed records
import logging

from pyspark.sql.functions import current_timestamp, lit

# Fix: `logger` was referenced in the except-handler but never defined.
logger = logging.getLogger(__name__)


def process_with_error_handling(batch_df, batch_id):
    """Route a micro-batch: valid rows to the lake, invalid rows to the DLQ.

    Args:
        batch_df: micro-batch DataFrame handed in by foreachBatch.
        batch_id: monotonically increasing batch id supplied by Spark.

    Raises:
        Re-raises any write failure so the streaming query surfaces it.
    """
    # The batch is scanned more than once (two filters plus an existence
    # check); persist so each scan does not re-read the source.
    batch_df.persist()
    try:
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())

        # Write valid records.
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")

        # Write invalid records to the DLQ with an error annotation.
        # limit(1).count() is an existence probe — cheaper than counting
        # every row just to compare against zero.
        if invalid_df.limit(1).count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")
    except Exception as e:
        # Log, alert, and re-raise so the failure is not silently swallowed.
        logger.error(f"Batch {batch_id} failed: {e}")
        raise
    finally:
        batch_df.unpersist()


# Custom per-batch processing via foreachBatch.
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()
# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server

# Metric definitions. Labels identify the logical stream; `status`
# distinguishes success/failure counts on the records counter.
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    '已处理记录总数',
    ['stream_name', 'status']
)
PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    '当前处理延迟',
    ['stream_name']
)
BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    '上一批次处理时长',
    ['stream_name']
)

def emit_metrics(query):
    """Emit Prometheus metrics from a streaming query.

    Reads ``query.lastProgress`` — presumably a Structured Streaming
    progress dict from a pyspark StreamingQuery (TODO confirm caller) —
    and increments the processed-records counter by ``numInputRows``.
    """
    progress = query.lastProgress
    if progress:
        RECORDS_PROCESSED.labels(
            stream_name='user-events',
            status='success'
        ).inc(progress['numInputRows'])
        if progress['sources']:
            # Compute lag from the latest offsets per source.
            for source in progress['sources']:
                end_offset = source.get('endOffset', {})
                # NOTE(review): incomplete — `end_offset` is read but never
                # used. Parsing the Kafka offsets and setting PROCESSING_LAG
                # (and BATCH_DURATION) is still TODO; neither gauge is ever
                # updated in this file.
场景: 使用 Great Expectations 实施全面的数据质量监控。
# 安装并初始化
pip install great_expectations
great_expectations init
# 连接到数据源
great_expectations datasource new
# expectations/orders_suite.py
"""Great Expectations suite for the warehouse `orders` asset."""
import great_expectations as gx

context = gx.get_context()

# Create the expectation suite.
suite = context.add_expectation_suite("orders_quality_suite")

# Bind a validator to the warehouse `orders` asset so expectations can be
# added interactively and saved into the suite created above.
validator = context.get_validator(
    batch_request={
        "datasource_name": "warehouse",
        "data_asset_name": "orders",
    },
    expectation_suite_name="orders_quality_suite"
)

# Schema expectation: exact column list and order.
validator.expect_table_columns_to_match_ordered_list(
    column_list=[
        "order_id", "customer_id", "order_date",
        "total_amount", "status", "created_at"
    ]
)

# Completeness expectations.
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")

# Uniqueness expectation.
validator.expect_column_values_to_be_unique("order_id")

# Range expectation: non-negative amounts with an upper sanity bound.
validator.expect_column_values_to_be_between(
    "total_amount",
    min_value=0,
    max_value=1000000
)

# Categorical expectation: `status` limited to the known lifecycle states.
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Freshness expectation: newest order_date must fall within the last day.
# Uses evaluation parameters resolved at validation time.
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now - timedelta(days=1)"},
    max_value={"$PARAMETER": "now"}
)

# Referential integrity: customer_id must exist in the externally supplied
# `valid_customer_ids` evaluation parameter (provided by the caller).
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"}
)

# Persist everything, including expectations that failed on this sample.
validator.save_expectation_suite(discard_failed_expectations=False)
# models/marts/schema.yml
# Fix: the version header was garbled markdown ("**版本:** 2"); dbt requires
# a literal `version: 2` key at the top of a schema file.
version: 2

models:
  - name: fct_orders
    description: "包含数据质量检查的订单事实表"
    tests:
      # Row-count parity with the staging model it is built from.
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      # Freshness: fail if nothing was created in the last 24 hours.
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
    columns:
      - name: order_id
        description: "唯一订单标识符"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          # Non-cancelled orders must have a non-negative amount.
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"
      - name: customer_id
        tests:
          - not_null
          # Warn (not error) on orphaned customers so late-arriving
          # dimension rows do not block the pipeline.
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn
```yaml
contract:
name: orders_data_contract
version: "1.0.0"
owner: data-team@company.com
schema:
type: object
properties:
order_id:
type: string
format: uuid
description: "唯一订单标识符"
customer_id:
type: string
not_null: true
order_date:
type: date
not_null: true
total_amount:
type: decimal
precision: 10
scale: 2
minimum: 0
status:
type: string
enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
sla:
freshness:
max_delay_hours: 1
completeness:
min_percentage: 99.9
accuracy:
duplicate_tolerance: 0.01
consumers:
- name: analytics-team
usage: "每日报告仪表板"
- name: ml