名称: "data-lineage-tracker"
描述: "追踪数据在建筑系统中的来源、转换与流向。对审计追踪、合规性和数据问题调试至关重要。"
主页: "https://datadrivenconstruction.io"
元数据: {"openclaw": {"emoji": "✔️", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
追踪建筑数据在系统中的来源、转换与流向。提供合规性审计追踪,帮助调试数据问题,并确保数据治理。
建筑项目需要数据可追溯性:
- 审计合规:了解每条数据的来源
- 问题定位:追踪数据问题至其源头
- 变更影响:理解下游系统所受影响
- 法规要求:为法律/保险目的维护数据溯源
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import json
import hashlib
import uuid
class TransformationType(Enum):
    """Kinds of operation that a recorded transformation step can represent.

    Every member's value is the lowercase form of its name; that value is
    what the tracker writes into traces, histories, exports and reports.
    """

    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"
    AGGREGATE = "aggregate"
    JOIN = "join"
    FILTER = "filter"
    CALCULATE = "calculate"
    MANUAL_EDIT = "manual_edit"
    IMPORT = "import"
    EXPORT = "export"
@dataclass
class DataSource:
    """A registered system that data entities are attributed to."""

    id: str  # unique key of the source inside a tracker
    name: str  # human-readable name, shown in reports
    system: str  # kind of system hosting the data (free text)
    location: str  # where the system lives (free text)
    owner: str  # team or person responsible for the source
    created_at: datetime  # when the source was registered
@dataclass
class TransformationStep:
    """One recorded operation that turns input entities into output entities."""

    id: str  # unique key of the step inside a tracker
    # Forward-reference string: the enum only has to be resolvable by type
    # checkers, not at class-creation time.
    transformation_type: "TransformationType"
    description: str  # human-readable summary of the step
    input_entities: List[str]  # ids of the entities that were read
    output_entities: List[str]  # ids of the entities that were produced
    logic: str  # SQL, Python, or a free-text description of the operation
    performed_by: str  # user or system that ran the step
    performed_at: datetime  # when the step was recorded
    parameters: Dict[str, Any] = field(default_factory=dict)  # optional extras
@dataclass
class DataEntity:
    """A tracked piece of data whose lineage is recorded."""

    id: str  # unique key of the entity inside a tracker
    name: str  # human-readable name, shown in traces and graphs
    source_id: str  # id of the data source the entity is attributed to
    entity_type: str  # table, file, field, or record
    created_at: datetime  # when the entity was registered
    version: int = 1
    checksum: Optional[str] = None  # content fingerprint, if one was computed
    parent_entities: List[str] = field(default_factory=list)  # ids of parents
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extras
@dataclass
class LineageRecord:
    """Link between one output entity and the transformation that produced it."""

    id: str  # unique key of the record
    entity_id: str  # id of the entity that was produced
    transformation_id: str  # id of the transformation step that produced it
    upstream_entities: List[str]  # ids of the entities the step read
    downstream_entities: List[str]  # ids of entities later derived from this one
    recorded_at: datetime  # when the record was created
class ConstructionDataLineageTracker:
    """Track the lineage of construction data for a single project.

    The tracker keeps four in-memory registries: data sources, data
    entities, transformation steps, and one lineage record per
    (transformation, output entity) pair.  Nothing is persisted; call
    ``export_lineage`` for a plain-dict snapshot.

    Model classes are referenced through string annotations, so they only
    have to be resolvable when the methods that build them are called.
    """

    def __init__(self, project_id: str):
        """Create an empty tracker for *project_id*."""
        self.project_id = project_id
        self.sources: Dict[str, "DataSource"] = {}
        self.entities: Dict[str, "DataEntity"] = {}
        self.transformations: Dict[str, "TransformationStep"] = {}
        self.lineage_records: List["LineageRecord"] = []

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _new_id(prefix: str, taken=()) -> str:
        """Return ``<prefix>-<8 hex chars>`` that is not already in *taken*.

        Only 32 random bits are used, so a collision is possible in a large
        registry; retrying avoids silently overwriting an existing entry.
        """
        while True:
            candidate = f"{prefix}-{uuid.uuid4().hex[:8]}"
            if candidate not in taken:
                return candidate

    def _records_by_output(self) -> Dict[str, List["LineageRecord"]]:
        """Index lineage records by the id of the entity they produced."""
        index: Dict[str, List["LineageRecord"]] = {}
        for record in self.lineage_records:
            index.setdefault(record.entity_id, []).append(record)
        return index

    def _records_by_input(self) -> Dict[str, List["LineageRecord"]]:
        """Index lineage records by each entity id they consumed."""
        index: Dict[str, List["LineageRecord"]] = {}
        for record in self.lineage_records:
            # A record listing the same input twice is still one consumer.
            for upstream_id in set(record.upstream_entities):
                index.setdefault(upstream_id, []).append(record)
        return index

    @staticmethod
    def _reaches_itself(start: str, producers: Dict[str, list]) -> bool:
        """Return True if *start* is reachable from its own upstream chain."""
        seen: Set[str] = set()
        stack = [
            upstream_id
            for record in producers.get(start, ())
            for upstream_id in record.upstream_entities
        ]
        while stack:
            current = stack.pop()
            if current == start:
                return True
            if current in seen:
                # Reaching a node twice is normal in a diamond-shaped graph.
                continue
            seen.add(current)
            for record in producers.get(current, ()):
                stack.extend(record.upstream_entities)
        return False

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register_source(self, name: str, system: str, location: str, owner: str) -> "DataSource":
        """Register a new data source and return it.

        The source receives a generated ``SRC-`` id and the current time as
        its creation timestamp.
        """
        source = DataSource(
            id=self._new_id("SRC", self.sources),
            name=name,
            system=system,
            location=location,
            owner=owner,
            created_at=datetime.now(),
        )
        self.sources[source.id] = source
        return source

    def register_entity(self, name: str, source_id: str, entity_type: str,
                        parent_entities: Optional[List[str]] = None,
                        metadata: Optional[Dict] = None) -> "DataEntity":
        """Register a data entity (table, file, field, ...) and return it.

        *parent_entities* and *metadata* are copied, so later changes to the
        caller's objects do not leak into the registry.  *source_id* is
        stored as given; it is not checked against the registered sources.
        """
        entity = DataEntity(
            id=self._new_id("ENT", self.entities),
            name=name,
            source_id=source_id,
            entity_type=entity_type,
            created_at=datetime.now(),
            parent_entities=list(parent_entities) if parent_entities else [],
            metadata=dict(metadata) if metadata else {},
        )
        self.entities[entity.id] = entity
        return entity

    def calculate_checksum(self, data: Any) -> str:
        """Return the first 16 hex digits of the SHA-256 of *data*.

        Bytes are hashed as they are and strings as UTF-8.  Anything else is
        serialised to JSON with sorted keys, falling back to ``str()`` for
        values JSON cannot encode, so equal dicts hash equally regardless of
        key order.
        """
        if isinstance(data, (bytes, bytearray)):
            raw = bytes(data)
        elif isinstance(data, str):
            raw = data.encode()
        else:
            raw = json.dumps(data, sort_keys=True, default=str).encode()
        return hashlib.sha256(raw).hexdigest()[:16]

    def record_transformation(self,
                              transformation_type: "TransformationType",
                              description: str,
                              input_entities: List[str],
                              output_entities: List[str],
                              logic: str,
                              performed_by: str,
                              parameters: Optional[Dict] = None) -> "TransformationStep":
        """Record a transformation and one lineage record per output entity.

        Existing records of the input entities get the new outputs appended
        to their ``downstream_entities``.  Entity ids are stored as given
        and are not validated here; ``validate_lineage`` reports unknown ids.

        Returns the stored transformation step.
        """
        # One timestamp and private copies of the id lists: the step and its
        # records must not change if the caller mutates the arguments later.
        now = datetime.now()
        inputs = list(input_entities)
        outputs = list(output_entities)

        transformation = TransformationStep(
            id=self._new_id("TRF", self.transformations),
            transformation_type=transformation_type,
            description=description,
            input_entities=inputs,
            output_entities=outputs,
            logic=logic,
            performed_by=performed_by,
            performed_at=now,
            parameters=dict(parameters) if parameters else {},
        )
        self.transformations[transformation.id] = transformation

        taken_ids = {record.id for record in self.lineage_records}
        for output_id in outputs:
            record_id = self._new_id("LIN", taken_ids)
            taken_ids.add(record_id)
            self.lineage_records.append(LineageRecord(
                id=record_id,
                entity_id=output_id,
                transformation_id=transformation.id,
                upstream_entities=list(inputs),
                downstream_entities=[],
                recorded_at=now,
            ))

        # Back-fill downstream references on the records of every input.
        consumed = set(inputs)
        for existing_record in self.lineage_records:
            if existing_record.entity_id not in consumed:
                continue
            for output_id in outputs:
                if output_id not in existing_record.downstream_entities:
                    existing_record.downstream_entities.append(output_id)

        return transformation

    # ------------------------------------------------------------------
    # Tracing
    # ------------------------------------------------------------------

    def trace_upstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
        """List the transformations that produced *entity_id* and its ancestors.

        The walk is breadth-first, so every entity is reported at its
        smallest distance from *entity_id* (which itself is depth 0) and the
        result is ordered by depth.  *depth* limits how many levels are
        expanded; ``None`` means no limit.  Entities that are unknown to the
        tracker, or that have no lineage record, produce no entry.

        Each entry has the keys ``entity``, ``entity_id``, ``depth``,
        ``transformation``, ``transformation_type``, ``performed_at``,
        ``performed_by`` and ``upstream``.
        """
        producers = self._records_by_output()
        lineage: List[Dict] = []
        visited = {entity_id}
        frontier = [entity_id]
        level = 0

        while frontier and (depth is None or level <= depth):
            next_frontier = []
            for eid in frontier:
                entity = self.entities.get(eid)
                if entity is None:
                    continue
                for record in producers.get(eid, ()):
                    transformation = self.transformations.get(record.transformation_id)
                    if transformation is not None:
                        lineage.append({
                            'entity': entity.name,
                            'entity_id': eid,
                            'depth': level,
                            'transformation': transformation.description,
                            'transformation_type': transformation.transformation_type.value,
                            'performed_at': transformation.performed_at.isoformat(),
                            'performed_by': transformation.performed_by,
                            'upstream': list(record.upstream_entities),
                        })
                    for upstream_id in record.upstream_entities:
                        if upstream_id not in visited:
                            visited.add(upstream_id)
                            next_frontier.append(upstream_id)
            frontier = next_frontier
            level += 1

        return lineage

    def trace_downstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
        """List the entities derived, directly or indirectly, from *entity_id*.

        The walk is breadth-first and the result is ordered by depth.  Direct
        dependents are reported with depth 0, their dependents with depth 1,
        and so on.  *depth* limits how many levels are expanded; ``None``
        means no limit.  One entry is produced per consuming lineage record,
        so an entity fed through several paths appears once per path.

        Each entry has the keys ``entity``, ``entity_id``, ``depth``,
        ``transformation``, ``transformation_type`` and ``via_entity_id``
        (the id of the entity it was derived from on this path).
        """
        consumers = self._records_by_input()
        dependencies: List[Dict] = []
        visited = {entity_id}
        frontier = [entity_id]
        level = 0

        while frontier and (depth is None or level <= depth):
            next_frontier = []
            for eid in frontier:
                if eid not in self.entities:
                    continue
                for record in consumers.get(eid, ()):
                    transformation = self.transformations.get(record.transformation_id)
                    if transformation is not None:
                        dependent = self.entities.get(record.entity_id)
                        dependencies.append({
                            'entity': dependent.name if dependent else record.entity_id,
                            'entity_id': record.entity_id,
                            'depth': level,
                            'transformation': transformation.description,
                            'transformation_type': transformation.transformation_type.value,
                            'via_entity_id': eid,
                        })
                    if record.entity_id not in visited:
                        visited.add(record.entity_id)
                        next_frontier.append(record.entity_id)
            frontier = next_frontier
            level += 1

        return dependencies

    def get_entity_history(self, entity_id: str) -> List[Dict]:
        """Return every transformation that produced *entity_id*, oldest first.

        Each entry has the keys ``timestamp`` (ISO 8601), ``action``,
        ``description``, ``performed_by`` and ``inputs`` (entity names, or
        raw ids for entities the tracker does not know).
        """
        events = []
        for record in self.lineage_records:
            if record.entity_id != entity_id:
                continue
            transformation = self.transformations.get(record.transformation_id)
            if transformation is None:
                continue
            events.append((transformation.performed_at, {
                'timestamp': transformation.performed_at.isoformat(),
                'action': transformation.transformation_type.value,
                'description': transformation.description,
                'performed_by': transformation.performed_by,
                'inputs': [
                    self.entities[eid].name if eid in self.entities else eid
                    for eid in record.upstream_entities
                ],
            }))
        # Order by the datetime itself rather than by its string rendering.
        events.sort(key=lambda pair: pair[0])
        return [entry for _, entry in events]

    def impact_analysis(self, entity_id: str) -> Dict:
        """Summarise which entities are affected by a change to *entity_id*.

        ``total_affected`` and ``affected_by_depth`` count each affected
        entity once, at the smallest depth where it appears, and leave out
        *entity_id* itself.  ``affected_entities`` is the unfiltered output
        of ``trace_downstream``.
        """
        downstream = self.trace_downstream(entity_id)

        # The trace is ordered by depth, so the first sighting is the nearest.
        nearest_depth: Dict[str, int] = {}
        for dep in downstream:
            dep_id = dep['entity_id']
            if dep_id != entity_id and dep_id not in nearest_depth:
                nearest_depth[dep_id] = dep['depth']

        affected_by_depth: Dict[int, int] = {}
        for level in nearest_depth.values():
            affected_by_depth[level] = affected_by_depth.get(level, 0) + 1

        return {
            'entity': self.entities[entity_id].name if entity_id in self.entities else entity_id,
            'total_affected': len(nearest_depth),
            'affected_by_depth': affected_by_depth,
            'affected_entities': downstream,
        }

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def validate_lineage(self) -> List[str]:
        """Check the lineage for completeness and consistency.

        Returns a list of human-readable issues covering, in this order:
        entities that no transformation produced (unless their type is
        ``'source'``), lineage records that reference unknown entity ids
        (each id reported once), and entities that lie on a dependency cycle.
        """
        issues = []
        producers = self._records_by_output()

        # Entities nothing produced.
        for eid, entity in self.entities.items():
            if eid not in producers and entity.entity_type != 'source':
                issues.append(f"实体 '{entity.name}' 无血缘记录")

        # Dangling references, on either side of a record.
        reported: Set[str] = set()
        for record in self.lineage_records:
            for ref in (record.entity_id, *record.upstream_entities):
                if ref not in self.entities and ref not in reported:
                    reported.add(ref)
                    issues.append(f"血缘引用了未知实体: {ref}")

        # Circular dependencies: an entity that can reach itself upstream.
        for eid, entity in self.entities.items():
            if self._reaches_itself(eid, producers):
                issues.append(f"检测到循环依赖,涉及实体: {entity.name}")

        return issues

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def generate_lineage_graph(self, entity_id: str, depth: Optional[int] = 5) -> str:
        """Render the lineage around *entity_id* as a fenced Mermaid graph.

        The graph covers up to *depth* levels in each direction (``None``
        for no limit).  The target node carries the ``target`` class.
        Labels are quoted, with embedded double quotes escaped, so entity
        names containing brackets or parentheses still render.  Every edge
        is drawn once, from the entity it was actually derived from.
        """
        upstream = self.trace_upstream(entity_id, depth=depth)
        downstream = self.trace_downstream(entity_id, depth=depth)

        node_lines: List[str] = []
        edge_lines: List[str] = []
        declared: Set[str] = set()
        drawn: Set[tuple] = set()

        def node_of(eid: str) -> str:
            # Hyphens are replaced to keep the node id a plain identifier.
            return eid.replace('-', '_')

        def declare(eid: str, suffix: str = "") -> None:
            node_id = node_of(eid)
            if node_id in declared:
                return
            declared.add(node_id)
            entity = self.entities.get(eid)
            text = str(entity.name if entity else eid).replace('"', '#quot;')
            node_lines.append(f' {node_id}["{text}"]{suffix}')

        def connect(src: str, dst: str) -> None:
            declare(src)
            declare(dst)
            edge = (node_of(src), node_of(dst))
            if edge not in drawn:
                drawn.add(edge)
                edge_lines.append(f" {edge[0]} --> {edge[1]}")

        # Declare the target first so its styling cannot be lost to a plain
        # declaration of the same node later on.
        declare(entity_id, ":::target")

        for item in upstream:
            for upstream_id in item.get('upstream', []):
                connect(upstream_id, item['entity_id'])

        for item in downstream:
            connect(item.get('via_entity_id', entity_id), item['entity_id'])

        return "\n".join([
            "```mermaid",
            "graph LR",
            *node_lines,
            *edge_lines,
            " classDef target fill:#f96",
            "```",
        ])

    def export_lineage(self) -> Dict:
        """Export all sources, entities, transformations and lineage records.

        The result is a plain dict with timestamps rendered as ISO 8601
        strings.  Lists and dicts are copied, so the snapshot does not change
        when the tracker does.  Values inside ``metadata`` and ``parameters``
        are passed through unchanged.
        """
        return {
            'project_id': self.project_id,
            'exported_at': datetime.now().isoformat(),
            'sources': {k: {
                'id': v.id,
                'name': v.name,
                'system': v.system,
                'location': v.location,
                'owner': v.owner,
                'created_at': v.created_at.isoformat(),
            } for k, v in self.sources.items()},
            'entities': {k: {
                'id': v.id,
                'name': v.name,
                'source_id': v.source_id,
                'entity_type': v.entity_type,
                'parent_entities': list(v.parent_entities),
                'created_at': v.created_at.isoformat(),
                'version': v.version,
                'checksum': v.checksum,
                'metadata': dict(v.metadata),
            } for k, v in self.entities.items()},
            'transformations': {k: {
                'id': v.id,
                'type': v.transformation_type.value,
                'description': v.description,
                'input_entities': list(v.input_entities),
                'output_entities': list(v.output_entities),
                'logic': v.logic,
                'parameters': dict(v.parameters),
                'performed_by': v.performed_by,
                'performed_at': v.performed_at.isoformat(),
            } for k, v in self.transformations.items()},
            'lineage_records': [{
                'id': r.id,
                'entity_id': r.entity_id,
                'transformation_id': r.transformation_id,
                'upstream_entities': list(r.upstream_entities),
                'downstream_entities': list(r.downstream_entities),
                'recorded_at': r.recorded_at.isoformat(),
            } for r in self.lineage_records],
        }

    def generate_report(self) -> str:
        """Build a Markdown report: totals, sources, issues, step counts."""
        lines = [f"# 数据血缘报告: {self.project_id}", ""]
        lines.append(f"**生成时间:** {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        lines.append(f"**数据源:** {len(self.sources)}")
        lines.append(f"**实体:** {len(self.entities)}")
        lines.append(f"**转换:** {len(self.transformations)}")
        lines.append("")

        # Data sources.
        lines.append("## 数据源")
        for source in self.sources.values():
            lines.append(f"- **{source.name}** ({source.system})")
            # Two-space indent: a one-space indent does not nest under the
            # parent bullet in Markdown.
            lines.append(f"  - 位置: {source.location}")
            lines.append(f"  - 所有者: {source.owner}")
        lines.append("")

        # Validation issues, only when there are any.
        issues = self.validate_lineage()
        if issues:
            lines.append("## 血缘问题")
            for issue in issues:
                lines.append(f"- ⚠️ {issue}")
            lines.append("")

        # Number of transformations per type, sorted by type name.
        lines.append("## 转换摘要")
        type_counts: Dict[str, int] = {}
        for step in self.transformations.values():
            key = step.transformation_type.value
            type_counts[key] = type_counts.get(key, 0) + 1
        for t_type, count in sorted(type_counts.items()):
            lines.append(f"- {t_type}: {count}")

        return "\n".join(lines)
# Create a tracker scoped to one project.
tracker = ConstructionDataLineageTracker(project_id="PROJECT-001")

# Register the systems the data comes from.
procore = tracker.register_source(
    name="Procore", system="SaaS", location="cloud", owner="PM Team"
)
sage = tracker.register_source(
    name="Sage 300", system="Database", location="on-prem", owner="Finance"
)

# Register the entities whose lineage is tracked.
budget = tracker.register_entity(
    name="Project Budget", source_id=procore.id, entity_type="table"
)
costs = tracker.register_entity(
    name="Job Costs", source_id=sage.id, entity_type="table"
)
report = tracker.register_entity(
    name="Cost Variance Report", source_id=procore.id, entity_type="file"
)

# Record the join that builds the variance report from budget and costs.
tracker.record_transformation(
    TransformationType.JOIN,
    "Join budget and actual costs for variance calculation",
    [budget.id, costs.id],
    [report.id],
    logic=(
        "SELECT b.*, c.actual, (b.budget - c.actual) as variance "
        "FROM budget b JOIN costs c ON b.cost_code = c.cost_code"
    ),
    performed_by="ETL Pipeline",
)

# Walk the lineage back from the report.
upstream = tracker.trace_upstream(report.id)
print("上游血缘:", upstream)

# Render the lineage around the report as a Mermaid graph.
print(tracker.generate_lineage_graph(report.id))

# Take a snapshot for auditing.
lineage_data = tracker.export_lineage()