Metadata Card
- Prerequisites: Chapter 11 (Data Governance Fundamentals)
- Estimated time: 40 minutes
- Core difficulty: Advanced
- Reading mode: Casual stroll
- Completion: Able to explain the role of data lineage, and manually or automatically trace data's path from source to destination
Your Progress
You started building a data governance framework in the Data Prophecy Hall, but quickly hit a practical problem: you see a number on a report—'87.4% mission completion rate for last quarter.' How did this number come about?
You start tracing: it comes from an aggregate table → that table came from an ETL script → the script read three source tables → one of those tables' data came from another team → that team's data pipeline had a manual modification.
A piece of data, from birth to death, traveled a path you couldn't see. You need to see it.
Your Task
"The number in this report is wrong." It's a phrase you hear every week. You open the report and see an aggregate value. Which table does this value come from? How many transformations did it go through? When was the last update? If no one can answer these questions, debugging data issues relies solely on intuition and luck. Data lineage is the system that answers these questions.
Metadata: Data About Data
Metadata falls into three categories:
- Technical metadata: Table structure, field types, partition info, file size
- Business metadata: Business meaning of fields, data owner, usage instructions
- Operational metadata: ETL run time, rows affected, error logs
The data catalog you built in Chapter 11 primarily covers technical metadata and some business metadata. Operational metadata needs to be extracted from pipeline run logs.
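As a rough sketch of that extraction, assume a plain-text run log with one line per pipeline step. The log layout below (timestamp, `step=`, `rows=`, `status=`) is invented for illustration and does not come from any particular scheduler; a real log would need its own pattern.

```python
import re

# Hypothetical log format, assumed for illustration only.
SAMPLE_LOG = """\
2024-01-05T02:00:00 step=clean_missions rows=1200 status=ok
2024-01-05T02:05:00 step=daily_report rows=31 status=ok
2024-01-06T02:00:00 step=clean_missions rows=0 status=error
"""

LOG_PATTERN = re.compile(
    r"^(?P<run_at>\S+)\s+step=(?P<step>\w+)\s+rows=(?P<rows>\d+)\s+status=(?P<status>\w+)$"
)

def parse_run_log(log_text):
    """Turn matching log lines into operational metadata records."""
    records = []
    for line in log_text.splitlines():
        match = LOG_PATTERN.match(line.strip())
        if match is None:
            continue  # skip lines that do not follow the assumed format
        record = match.groupdict()
        record["rows"] = int(record["rows"])
        records.append(record)
    return records

operational_metadata = parse_run_log(SAMPLE_LOG)
# Runs that reported an error are the first place to look when a number is off.
failed_runs = [r for r in operational_metadata if r["status"] == "error"]
```

Each record carries the run time, the step, and the rows affected, which are the operational fields listed above.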
Technical metadata can be extracted automatically from Parquet files: row count, column count, column names, types, and size. But the real value of metadata lies in connecting technical and business information.
# Example: collecting technical metadata
import pandas as pd

def collect_technical_metadata(file_path, table_name):
    """Read a Parquet file and summarize its technical metadata."""
    df = pd.read_parquet(file_path)
    metadata = {
        "table": table_name,
        "rows": len(df),
        "columns": len(df.columns),
        # In-memory size of the DataFrame, not the size of the file on disk
        "size_mb": round(df.memory_usage(deep=True).sum() / 1e6, 2),
        "column_names": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "partition_key": None,  # placeholder, not populated by this function
        "last_updated": None,   # placeholder, not populated by this function
    }
    return metadata
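Connecting technical and business information could look like the sketch below. The table contents, the owner name, and the `build_catalog_entry` helper are all hypothetical. The point is only that each column ends up with its type and its business meaning side by side, and that undocumented columns become visible.

```python
# Technical metadata as a collector might produce it (values are made up).
technical = {
    "table": "missions_clean",
    "rows": 1200,
    "dtypes": {"mission_id": "int64", "success": "bool", "resources": "float64"},
}

# Business metadata is maintained by people; this dictionary is hypothetical.
business = {
    "owner": "mission-analytics team",
    "columns": {
        "mission_id": "Unique identifier of a mission",
        "success": "True if the mission met its completion criteria",
    },
}

def build_catalog_entry(technical, business):
    """Merge technical and business metadata into one catalog entry."""
    columns = []
    for name, dtype in technical["dtypes"].items():
        columns.append({
            "name": name,
            "dtype": dtype,
            # Flag undocumented columns instead of silently leaving them blank.
            "description": business["columns"].get(name, "UNDOCUMENTED"),
        })
    return {
        "table": technical["table"],
        "rows": technical["rows"],
        "owner": business.get("owner", "UNASSIGNED"),
        "columns": columns,
    }

entry = build_catalog_entry(technical, business)
undocumented = [c["name"] for c in entry["columns"] if c["description"] == "UNDOCUMENTED"]
```

Here `undocumented` lists the columns that exist technically but have no business description yet.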
Data Lineage: Where Data Comes From, Where It Goes
Data lineage traces the path of data transformations. Every SQL query, Python transformation, and scheduled pipeline run creates a lineage relationship.
The simplest lineage is the upstream-downstream chain:
source: mission_raw.csv
↓
transform: add_derived_columns.py
↓
table: missions_clean
↓
aggregate: daily_report.sql
↓
table: daily_mission_summary
When someone asks "where does this value in daily_mission_summary come from," your answer should be this chain.
Manual Lineage Tracking
In small-scale projects, you can manually record each lineage in code.
# Record lineage information in pipeline code
from datetime import datetime

import pandas as pd

def pipeline_step(input_tables, output_table, transform_description):
    """Record one step of a data pipeline."""
    lineage_record = {
        "step_name": transform_description,
        "input_tables": input_tables,
        "output_table": output_table,
        "run_at": datetime.now().isoformat(),
    }
    return lineage_record

# Usage example
step1 = pipeline_step(
    input_tables=["sensors_raw"],
    output_table="sensors_clean",
    transform_description="clean sensors data (fill nulls, remove outliers)"
)
step2 = pipeline_step(
    input_tables=["sensors_clean"],
    output_table="hourly_sensors_agg",
    transform_description="aggregate to hourly averages by sensor type"
)

# Collect lineage records and persist them
lineage_log = pd.DataFrame([step1, step2])
lineage_log.to_csv("lineage_log.csv", index=False)
Lineage queries run in two directions: backward tracing (which source tables does this output come from) and forward tracing (which downstream tables are affected by a change to this source table). When someone says "the number in the report is wrong," a backward trace is where you start investigating.
When a problem arises, you can query the log like this:
# Reverse lookup: which source tables feed into a target table
def find_upstream(lineage_df, target_table):
    upstreams = lineage_df[lineage_df["output_table"] == target_table]
    # Each row holds a list of inputs, so flatten them into one list of table names
    return [table for inputs in upstreams["input_tables"] for table in inputs]

# Forward lookup: which downstream tables are affected by a source table change
def find_downstream(lineage_df, source_table):
    downstreams = lineage_df[lineage_df["input_tables"].apply(
        lambda inputs: source_table in inputs
    )]
    return downstreams["output_table"].tolist()
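One thing to watch when querying a log that was saved to CSV: a list-valued column such as `input_tables` comes back as a string, so a membership test quietly becomes a substring match. The sketch below reproduces this with the standard library `csv` module on an in-memory file (pandas `to_csv` and `read_csv` behave the same way for list values). The `find_downstream_rows` helper is a hypothetical list-of-dicts counterpart to the forward lookup, written only for this illustration.

```python
import ast
import csv
import io

records = [
    {"input_tables": ["sensors_raw"], "output_table": "sensors_clean"},
    {"input_tables": ["sensors_clean"], "output_table": "hourly_sensors_agg"},
]

# Write to an in-memory CSV; each list is stored as its string form.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["input_tables", "output_table"])
writer.writeheader()
writer.writerows(records)

buffer.seek(0)
loaded = list(csv.DictReader(buffer))

# loaded[0]["input_tables"] is now the string "['sensors_raw']", not a list,
# so this test matches on a substring even though no table is named "sensors".
false_positive = "sensors" in loaded[0]["input_tables"]

# Parse the string form back into a real list before querying.
for row in loaded:
    row["input_tables"] = ast.literal_eval(row["input_tables"])

def find_downstream_rows(rows, source_table):
    """Forward lookup over a list of lineage records (hypothetical helper)."""
    return [r["output_table"] for r in rows if source_table in r["input_tables"]]

affected = find_downstream_rows(loaded, "sensors_clean")
```

Storing the inputs as JSON text, or one row per input table, are other ways to avoid the problem; which one fits depends on how the log is consumed.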
Automatic Lineage Tracking
The problem with manual recording is that people forget to do it. For pipelines written in SQL, you can parse lineage automatically from the queries themselves.
import re

def parse_sql_lineage(sql_query):
    """Extract input and output tables from a simple SQL statement."""
    # Find the target of INSERT INTO <table> or CREATE TABLE <table> AS ...
    output_match = re.search(
        r"(?:INSERT\s+INTO|CREATE\s+TABLE)\s+(\w+)",
        sql_query, re.IGNORECASE
    )
    output_table = output_match.group(1) if output_match else None
    # Find table names after FROM / JOIN. This simple pattern does not
    # handle subqueries, CTEs, or schema-qualified names.
    input_tables = re.findall(
        r"(?:FROM|JOIN)\s+(\w+)",
        sql_query, re.IGNORECASE
    )
    return {"input": input_tables, "output": output_table}

# Example
query = """
INSERT INTO daily_mission_summary
SELECT m.mission_id, m.success, l.resources
FROM missions_clean m
JOIN logs l ON m.mission_id = l.mission_id
"""
print(parse_sql_lineage(query))
# Output: {'input': ['missions_clean', 'logs'], 'output': 'daily_mission_summary'}
Automatic parsing has limits: it cannot see transformations done in Python code. A more complete approach is to record lineage in one consistent format at the entry and exit points of every pipeline step.
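One way to record lineage at the entry and exit of a Python step is a decorator, sketched below. The `track_lineage` decorator, the in-memory `LINEAGE_LOG`, and the `clean_sensors` step are all hypothetical names for illustration. The table names are still declared by hand, so this only guarantees that a record is written whenever the step actually runs.

```python
import functools
from datetime import datetime, timezone

LINEAGE_LOG = []  # in-memory log; a real pipeline would persist this somewhere

def track_lineage(input_tables, output_table):
    """Decorator that appends a lineage record each time the step runs."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # Record only after the step succeeds, so failed runs leave no entry.
            LINEAGE_LOG.append({
                "step_name": func.__name__,
                "input_tables": list(input_tables),
                "output_table": output_table,
                "run_at": datetime.now(timezone.utc).isoformat(),
            })
            return result
        return wrapper
    return decorator

@track_lineage(input_tables=["sensors_raw"], output_table="sensors_clean")
def clean_sensors(rows):
    """Hypothetical Python transformation: drop rows with missing readings."""
    return [r for r in rows if r.get("reading") is not None]

cleaned = clean_sensors([{"reading": 1.5}, {"reading": None}])
```

Because the record uses the same fields as the manual log, SQL-derived and Python-derived lineage can be stored and queried together.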
Impact Analysis
The core application of lineage relationships is impact analysis. When you plan to modify a source table, you need to know which downstream reports will be affected.
def impact_analysis(lineage_df, changed_table):
    """Given a table to be changed, find all affected downstream tables."""
    affected = set()
    queue = [changed_table]
    # Breadth-first walk over the lineage graph
    while queue:
        current = queue.pop(0)
        downstreams = find_downstream(lineage_df, current)
        for t in downstreams:
            if t not in affected:
                affected.add(t)
                queue.append(t)
    return affected

# Example (lineage_df is a lineage log DataFrame that covers the mission pipeline)
print(impact_analysis(lineage_df, "missions_clean"))
# Example output (set order may vary): {'daily_mission_summary', 'weekly_performance', 'team_analytics'}
Before modifying missions_clean, you know it will affect three downstream reports. You can notify the relevant people in advance, or assess whether you need to create a new branch.
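The same breadth-first walk works in the other direction when a report looks wrong and you need the raw tables behind it. The sketch below uses plain dictionaries instead of a DataFrame. The lineage records are hypothetical and mirror the mission pipeline from this chapter, and the helper assumes each table is produced by a single recorded step.

```python
from collections import deque

# Hypothetical lineage records mirroring the mission pipeline in this chapter.
lineage = [
    {"input_tables": ["mission_raw.csv"], "output_table": "missions_clean"},
    {"input_tables": ["missions_clean", "logs"], "output_table": "daily_mission_summary"},
]

def trace_to_sources(records, target_table):
    """Walk upstream breadth-first and return tables with no recorded inputs."""
    # Assumes one producing step per table; a later record would overwrite an earlier one.
    produced_by = {r["output_table"]: r["input_tables"] for r in records}
    seen, sources = set(), set()
    queue = deque([target_table])
    while queue:
        current = queue.popleft()
        if current in seen:
            continue  # already visited; also protects against cycles
        seen.add(current)
        parents = produced_by.get(current)
        if not parents:
            sources.add(current)  # nothing upstream is recorded: treat as a raw source
        else:
            queue.extend(parents)
    return sources

root_tables = trace_to_sources(lineage, "daily_mission_summary")
```

With these records, `root_tables` contains `mission_raw.csv` and `logs`, the two places to check first when the summary looks off.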
Common Pitfalls
- Lineage as a "one-time record." Pipelines evolve continuously; lineage needs to be updated along with the pipelines.
- Only recording explicit lineage (SQL table-to-table), ignoring implicit lineage (DataFrame transformations in Python code).
- Storing lineage and metadata in two separate systems, so tracing a complete path means switching back and forth between them.
Pass Challenges
- Warm-up: Pick a commonly used query and write down its upstream and downstream tables.
- Challenge: Build a manual lineage recording system for one of your data pipelines. After it has run for a month, check how many transformations it has recorded.
- Troubleshooting: An aggregate report shows data anomalies. Use lineage tracking to find the raw table where the problem originated.
Acceptance Criteria
- Can distinguish between technical metadata, business metadata, and operational metadata
- Can manually record lineage relationships of data transformations
- Can extract lineage information from SQL queries
- Can use lineage relationships for impact analysis
Traveler's Notes
Data lineage is the navigation system of the data world. Without it, you can only troubleshoot in a fog.
Next Chapter Preview
Lineage helps you trace data paths. But who owns the data? Next chapter, Data Mesh & Data Products—a model that lets data governance scale with team size.