元数据卡
- 前置知识:第11章(数据治理基础)
- 预计时间:40 分钟
- 核心难度:进阶
- 阅读模式:轻松漫游
- 完成标志:能解释数据血缘的作用,能手动或自动追踪数据从源头到目标的路径
你的进度
你开始建立数据治理的框架,但很快遇到一个实际问题:你在报表上看到一个数字——'上季度任务完成率 87.4%'。这个数字怎么来的?
你开始追踪:它来自某张聚合表 → 那表来自 ETL 脚本 → 脚本读了三张源表 → 其中一张表的数据来自另一个团队 → 那个团队的数据管道里有一次手动修改。
一条数据从出生到死亡,走过了一条你完全看不见的路。你需要看见它。
你的任务
"报告里的这个数字不对。"这是一句你每周都能听到的话。你打开报告,看到一个聚合值。这个值来自哪个表?经过了几次转换?最后一次更新是什么时候?如果没有人能回答这些问题,排查数据问题就只能靠直觉和运气。数据血缘就是回答这些问题的系统。
元数据:关于数据的数据
元数据分为三类:
- 技术元数据:表结构、字段类型、分区信息、文件大小
- 业务元数据:字段的业务含义、数据所有者、使用说明
- 操作元数据:ETL 运行时间、影响的行数、错误日志
你在第 11 章建的数据目录主要涵盖技术元数据和部分业务元数据。操作元数据则需要从管道运行日志中提取。
# 收集技术元数据示例
import pandas as pd
def collect_technical_metadata(file_path, table_name):
df = pd.read_parquet(file_path)
metadata = {
"table": table_name,
"rows": len(df),
"columns": len(df.columns),
"size_mb": round(df.memory_usage(deep=True).sum() / 1e6, 2),
"column_names": list(df.columns),
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
"partition_key": None,
"last_updated": None,
}
return metadata2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
数据血缘:数据从哪里来,到哪里去
数据血缘追踪的是数据变换的路径。每一次 SQL 查询、Python 转换、管道调度——都在创造一条血缘关系。
最简单的血缘关系就是上游-下游的链条:
source: mission_raw.csv
↓
transform: add_derived_columns.py
↓
table: missions_clean
↓
aggregate: daily_report.sql
↓
table: daily_mission_summary2
3
4
5
6
7
8
9
当有人问"daily_mission_summary 里的这个值从哪里来",你的答案应该是这条链。
手动追踪血缘
在小规模项目中,你可以在代码中手动记录每条血缘。
# 在管道代码中记录血缘信息
from datetime import datetime
def pipeline_step(input_tables, output_table, transform_description):
"""记录数据管道的一步"""
lineage_record = {
"step_name": transform_description,
"input_tables": input_tables,
"output_table": output_table,
"run_at": datetime.now().isoformat(),
}
return lineage_record
# 使用示例
step1 = pipeline_step(
input_tables=["sensors_raw"],
output_table="sensors_clean",
transform_description="clean sensors data (fill nulls, remove outliers)"
)
step2 = pipeline_step(
input_tables=["sensors_clean"],
output_table="hourly_sensors_agg",
transform_description="aggregate to hourly averages by sensor type"
)
# 收集血缘记录
lineage_log = pd.DataFrame([step1, step2])
lineage_log.to_csv("lineage_log.csv", index=False)2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
当问题出现时,你可以查询这个日志:
# 反向查找:某个输出表的来源
def find_upstream(lineage_df, target_table):
upstreams = lineage_df[lineage_df["output_table"] == target_table]
return upstreams["input_tables"].tolist()
# 正向查找:某个表的变更影响了谁
def find_downstream(lineage_df, source_table):
downstreams = lineage_df[lineage_df["input_tables"].apply(
lambda x: source_table in x
)]
return downstreams["output_table"].tolist()2
3
4
5
6
7
8
9
10
11
自动追踪血缘
手动记录的问题——人会忘记写。你可以从 SQL 查询中自动解析血缘。
import re
def parse_sql_lineage(sql_query):
"""从 SQL 中提取输入表和输出表"""
# 查找 INSERT INTO / CREATE TABLE AS
output_match = re.search(
r"(?:INSERT\s+INTO|CREATE\s+TABLE\s+\w+\s+AS)\s+(\w+)",
sql_query, re.IGNORECASE
)
output_table = output_match.group(1) if output_match else None
# 查找 FROM / JOIN 中的表名
input_tables = re.findall(
r"(?:FROM|JOIN)\s+(\w+)",
sql_query, re.IGNORECASE
)
return {"input": input_tables, "output": output_table}
# 示例
query = """
INSERT INTO daily_mission_summary
SELECT m.mission_id, m.success, l.resources
FROM missions_clean m
JOIN logs l ON m.mission_id = l.mission_id
"""
print(parse_sql_lineage(query))
# 输出: {"input": ["missions_clean", "logs"], "output": "daily_mission_summary"}2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
自动解析有局限性——无法覆盖 Python 代码中的变换。更完善的做法:在数据管道的入口和出口统一规范地记录血缘。
影响分析
血缘关系的核心应用场景是影响分析。当你打算修改一张源表时,你需要知道哪些下游报表会被影响。
def impact_analysis(lineage_df, changed_table):
"""给定一个要变更的表,找出所有受影响的下游"""
affected = set()
queue = [changed_table]
while queue:
current = queue.pop(0)
downstreams = find_downstream(lineage_df, current)
for t in downstreams:
if t not in affected:
affected.add(t)
queue.append(t)
return affected
# 示例
print(impact_analysis(lineage_df, "missions_clean"))
# 输出: {"daily_mission_summary", "weekly_performance", "team_analytics"}2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
你在改 missions_clean 之前就知道——这会影响到三个下游报表。你可以提前通知相关人员,或评估是否需要开一条新的分支。
常见陷阱
- 血缘是"一次性记录"。管道会持续演进,血缘需要和管道一起更新。
- 只记录了显式血缘(SQL 表到表),忽略了隐式血缘(Python 代码里的 DataFrame 变换)。
- 把血缘和元数据存在两套系统中,导致查一个完整路径需要在两个系统间跳转。
通关挑战
- 热身:选择一个你常用的查询,写下它的上游表和下游表。
- 挑战:为你的一个数据管道搭建手动血缘记录系统。运行一个月后,检查它包含了多少变换。
- 排障:一个聚合报告显示数据异常。通过血缘追踪找到问题的原始表。
验收标准
- 能区分技术元数据、业务元数据和操作元数据
- 能手动记录数据变换的血缘关系
- 能从 SQL 查询中提取血缘信息
- 会使用血缘关系做影响分析
旅人笔记
数据血缘是数据世界的导航系统。没有它,你只能在一团迷雾中排查问题。
下一站预告
血缘帮你追踪数据路径。但谁负责数据?下一章,Data Mesh 与数据产品——一种让数据治理与团队规模一起增长的模式。