Understanding Data Provenance and Lineage

Learn what data provenance and data lineage mean for data science. Understand how to track data origins, transformations, column-level lineage, and why it matters for debugging, compliance, and trust.


Data provenance is the documented history of where data originated — its source systems, how it was collected, and who created it. Data lineage is the broader record of how data has moved and transformed from its origin through every pipeline stage to its current form — which tables it was joined to, which transformations were applied, which columns were derived from which source fields. Together, provenance and lineage answer the question “where did this data come from and what happened to it?” — making it possible to debug wrong results, assess the trustworthiness of a dataset, satisfy compliance requirements, and understand the blast radius of upstream data changes.

Introduction

Every number in a dashboard, every feature in a machine learning model, every metric in a business report has a history. It came from somewhere, it was transformed along the way, and the quality and trustworthiness of that number depends entirely on the integrity of that history.

Imagine a senior executive questions why the revenue number in this morning’s report differs from the number they saw yesterday. Without data lineage, answering this question requires hours of code archaeology — tracing backwards through multiple SQL transformations, Python scripts, and pipeline stages, trying to reconstruct what happened. With data lineage, the answer is a few clicks: here are all the upstream tables that feed into this number, here is the transformation that changed yesterday, and here is the specific column mapping that produced the discrepancy.

Now consider a regulatory audit that asks you to demonstrate that the customers whose data was processed for a marketing campaign had consented to that use. Without data provenance, you cannot answer this question. With provenance, you can point to the source system timestamp, the consent flag at the time of collection, and every transformation that data passed through.

Or imagine a data engineer changes a source table schema — renames a column from cust_id to customer_id. Without lineage, they have no idea which of the 200 downstream tables, models, and reports will break. With lineage, they can see every downstream consumer of that column and coordinate changes proactively.

Provenance and lineage are not abstract concepts — they are the practical infrastructure that makes large data systems trustworthy, debuggable, and maintainable. This article explains what they are, why they matter, how they’re implemented in modern data tools, and how data scientists can both benefit from and contribute to good lineage practices in their own work.

Data Provenance: Where Did This Data Come From?

Data provenance (from the French provenir — “to come from”) answers the question: what is the origin and history of a piece of data?

Provenance documentation answers:

  • What source system did this data come from?
  • How was it collected? (manual entry, sensor reading, API call, form submission, click event)
  • When was it collected?
  • Who collected or created it?
  • What were the conditions under which it was collected?
  • Has it been modified since collection, and by whom?

Why Provenance Matters

Trustworthiness assessment: A revenue figure from a validated financial system is more trustworthy than one derived from a manually maintained spreadsheet. Knowing the origin lets you assess the data’s reliability.

Regulatory compliance: GDPR, HIPAA, CCPA, and SOX all have data origin and handling requirements. “Where did you get this customer’s data, and did you have the right to use it for this purpose?” requires provenance to answer.

Reproducibility: Can you reproduce the exact dataset used to train a model from six months ago? Only if you recorded provenance: which data sources, which version, which time window.

Bias detection: A training dataset collected only in summer might produce a seasonal model. A survey conducted only online might miss offline populations. Provenance tells you the collection conditions that might introduce bias.
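For the reproducibility point, a content fingerprint stored next to the provenance record turns "is this the same dataset?" into a checkable question. The sketch below is one way to do it, assuming pandas. `dataset_fingerprint` is a hypothetical helper that hashes the column names, the dtypes, and the per-row hashes from `pd.util.hash_pandas_object`. The digest depends on row order and is not guaranteed to be stable across pandas versions, so it is worth recording the pandas version alongside it.

```python
import hashlib

import pandas as pd


def dataset_fingerprint(df: pd.DataFrame) -> str:
    """Return a SHA-256 hex digest of a DataFrame's schema and contents (hypothetical helper)."""
    digest = hashlib.sha256()
    # Include column names and dtypes so renames or type changes alter the digest
    for name, dtype in df.dtypes.items():
        digest.update(f"{name}:{dtype};".encode("utf-8"))
    # hash_pandas_object returns one uint64 hash per row (index included)
    row_hashes = pd.util.hash_pandas_object(df, index=True)
    digest.update(row_hashes.to_numpy().tobytes())
    return digest.hexdigest()


original = pd.DataFrame({"customer_id": [1, 2, 3], "lifetime_value": [120.0, 45.5, 300.0]})
rebuilt = original.copy()
changed = original.assign(lifetime_value=[120.0, 45.5, 301.0])

print(dataset_fingerprint(original) == dataset_fingerprint(rebuilt))   # True
print(dataset_fingerprint(original) == dataset_fingerprint(changed))   # False
```

Because names and dtypes are part of the digest, renaming a column changes the fingerprint even when every value is identical.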

Python
# Documenting provenance in practice: metadata attached to datasets
import json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class DataProvenance:
    """
    Records the provenance metadata for a dataset or DataFrame.
    Attach to any dataset being created or consumed.
    """
    # Source information
    source_system:    str
    source_table:     Optional[str] = None
    source_query:     Optional[str] = None
    source_file:      Optional[str] = None
    source_api:       Optional[str] = None

    # Collection context
    collection_method: str = "automated_pipeline"  # 'sensor', 'manual_entry', 'api', 'scrape', etc.
    collection_start:  Optional[str] = None   # ISO timestamp of earliest record
    collection_end:    Optional[str] = None   # ISO timestamp of latest record

    # Processing context
    extracted_at:     str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    extracted_by:     str = "data_pipeline"   # Pipeline name or user
    pipeline_version: str = "unknown"
    processing_notes: str = ""

    # Quality and trust
    row_count:        Optional[int] = None
    known_issues:     list = field(default_factory=list)
    data_classification: str = "internal"  # 'public', 'internal', 'confidential', 'restricted'

    def to_dict(self) -> dict:
        return asdict(self)

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), indent=2)

    def save(self, filepath: str):
        with open(filepath, "w") as f:
            f.write(self.to_json())

    @classmethod
    def load(cls, filepath: str) -> "DataProvenance":
        with open(filepath) as f:
            data = json.load(f)
        return cls(**data)


# Attach provenance when creating a dataset
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection string; replace with your warehouse's URL
engine = create_engine("postgresql://user:password@warehouse-host:5432/analytics")

# Keep the query in a variable so the exact text is recorded as provenance
query = """
    SELECT customer_id, email, signup_date, lifetime_value
    FROM dim_customer
    WHERE is_current = TRUE
    AND signup_date >= '2020-01-01'
"""

# Extract data from warehouse
df = pd.read_sql(query, engine)

# Record its provenance
provenance = DataProvenance(
    source_system     = "analytics_warehouse",
    source_table      = "dim_customer",
    source_query      = query.strip(),
    collection_method = "sql_query",
    collection_start  = str(df["signup_date"].min()),
    collection_end    = str(df["signup_date"].max()),
    extracted_by      = "churn_model_pipeline_v2",
    pipeline_version  = "2.1.3",
    row_count         = len(df),
    data_classification = "confidential",
    known_issues      = [
        # A NULL signup_date fails the >= filter, so these rows are silently dropped
        "SSO customers imported before 2021-06 may have NULL signup_date "
        "and are therefore excluded by the signup_date filter"
    ]
)

# Save both data and provenance together
Path("data/processed").mkdir(parents=True, exist_ok=True)
df.to_parquet("data/processed/customers.parquet", index=False)
provenance.save("data/processed/customers.provenance.json")

print(provenance.to_json())

Data Lineage: What Happened to the Data Along the Way?

Data lineage maps the complete journey of data from its sources through every transformation to its current form. While provenance answers “where did this come from?”, lineage answers “what path did it take, and what was done to it?”

The Three Levels of Data Lineage

Table-level lineage: Which tables fed into this table? Which tables does this table feed into?

Plaintext
orders_raw  ──┐
customers_raw─┼──► stg_orders ──► fct_sales ──► monthly_revenue_summary
products_raw ─┘

Column-level lineage: Which source column produced this output column? Through what transformations?

Plaintext
fct_sales.line_revenue
    ← stg_order_items.quantity × stg_order_items.unit_price
        ← raw_orders.item_qty (cast to integer, NULL → 0)
        ← raw_orders.unit_price_usd (cast to DECIMAL(10,2))

Record-level lineage: Which specific source records produced this output record? (Rarely tracked due to volume, but valuable for auditing individual records.)
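Where record-level lineage is worth the cost, one simple approach is to carry the identifiers of the contributing source rows through each aggregation. The following is a minimal standard-library sketch. The order data, the helper name `summarize_with_record_lineage`, and the `source_order_ids` field are all hypothetical.

```python
from collections import defaultdict

# Hypothetical source records: one row per order
orders = [
    {"order_id": 101, "customer_id": "c1", "amount": 40.0},
    {"order_id": 102, "customer_id": "c1", "amount": 60.0},
    {"order_id": 103, "customer_id": "c2", "amount": 25.0},
]


def summarize_with_record_lineage(rows):
    """Aggregate revenue per customer, keeping the IDs of every contributing order."""
    totals = defaultdict(float)
    source_ids = defaultdict(list)
    for row in rows:
        totals[row["customer_id"]] += row["amount"]
        source_ids[row["customer_id"]].append(row["order_id"])
    # Each output record lists exactly which source records produced it
    return [
        {"customer_id": cid, "revenue": totals[cid], "source_order_ids": source_ids[cid]}
        for cid in totals
    ]


for record in summarize_with_record_lineage(orders):
    print(record)
# {'customer_id': 'c1', 'revenue': 100.0, 'source_order_ids': [101, 102]}
# {'customer_id': 'c2', 'revenue': 25.0, 'source_order_ids': [103]}
```

The ID lists grow with the data, which is why this level of detail is usually reserved for audit-critical outputs.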

Building a Simple Lineage Tracker in Python

Python
import json
from dataclasses import dataclass, field, asdict
from typing import Optional
from datetime import datetime, timezone
from pathlib import Path

@dataclass
class ColumnMapping:
    """Records how one output column was derived from source columns."""
    output_column:   str
    source_columns:  list           # List of source columns that contributed
    transformation:  str            # Description of transformation
    is_direct_copy:  bool = False   # True if value is unchanged from source
    formula:         Optional[str] = None  # SQL expression or formula

@dataclass
class DatasetLineage:
    """
    Records the complete lineage for one dataset creation step.
    Captures inputs, outputs, transformations, and execution metadata.
    """
    # Identity
    output_dataset:   str           # Name/path of the output dataset
    step_name:        str           # Human-readable name of this step
    step_type:        str           # 'sql_transform', 'python_transform', 'join', 'filter', etc.

    # Inputs
    input_datasets:   list = field(default_factory=list)  # List of input dataset names

    # Column-level mapping
    column_mappings:  list = field(default_factory=list)  # List of ColumnMapping objects

    # Execution metadata
    executed_at:      str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    executed_by:      str = "pipeline"
    code_reference:   Optional[str] = None  # File path or git commit
    sql_query:        Optional[str] = None  # SQL used (if applicable)

    # Output stats
    rows_in:   Optional[int] = None
    rows_out:  Optional[int] = None
    duration_seconds: Optional[float] = None

    def add_column_mapping(self, output_col: str, source_cols: list,
                            transformation: str, formula: Optional[str] = None,
                            is_direct: bool = False):
        self.column_mappings.append(ColumnMapping(
            output_column=output_col, source_columns=source_cols,
            transformation=transformation, formula=formula,
            is_direct_copy=is_direct
        ))

    def to_dict(self) -> dict:
        # asdict() recursively converts the nested ColumnMapping objects too
        return asdict(self)

    def save(self, lineage_dir: str = "lineage") -> str:
        out_dir = Path(lineage_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        safe_name = self.output_dataset.replace("/", "_").replace(".", "_")
        filepath = out_dir / f"{safe_name}_lineage.json"
        with open(filepath, "w") as f:
            json.dump(self.to_dict(), f, indent=2)
        return str(filepath)


# Document lineage for a feature engineering step
lineage = DatasetLineage(
    output_dataset = "data/processed/customer_churn_features.parquet",
    step_name      = "Customer Churn Feature Engineering",
    step_type      = "python_transform",
    input_datasets = [
        "analytics_warehouse.dim_customer",
        "analytics_warehouse.fact_sales",
        "analytics_warehouse.fact_support_tickets"
    ],
    executed_by    = "churn_pipeline_v2",
    code_reference = "pipelines/churn_features.py:transform_order_features()",
    rows_in        = 12847,
    rows_out       = 9234
)

# Document column-level lineage
lineage.add_column_mapping(
    output_col    = "lifetime_value",
    source_cols   = ["fact_sales.line_revenue"],
    transformation= "SUM aggregation over all orders",
    formula       = "SUM(fact_sales.line_revenue) GROUP BY customer_key",
    is_direct     = False
)
lineage.add_column_mapping(
    output_col    = "recency_days",
    source_cols   = ["fact_sales.order_date"],
    transformation= "Days from most recent order to reference date",
    formula       = "DATEDIFF(reference_date, MAX(order_date))",
    is_direct     = False
)
lineage.add_column_mapping(
    output_col    = "customer_segment",
    source_cols   = ["dim_customer.customer_segment"],
    transformation= "Direct copy from current customer dimension record",
    is_direct     = True
)
lineage.add_column_mapping(
    output_col    = "n_high_severity_tickets",
    source_cols   = ["fact_support_tickets.severity"],
    transformation= "Count of tickets where severity = 'high'",
    formula       = "SUM(CASE WHEN severity = 'high' THEN 1 ELSE 0 END)",
    is_direct     = False
)

filepath = lineage.save("lineage/")
print(f"Lineage saved to: {filepath}")

Lineage in the Modern Data Stack

In practice, lineage is tracked at different levels of sophistication depending on the tools in use.

dbt: Automatic Table-Level Lineage

dbt (data build tool) provides automatic table-level lineage as a core feature. Because every dbt model references its dependencies using {{ ref('model_name') }}, dbt can construct a complete dependency graph of every table in the warehouse.
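To see why ref() calls are enough to build the graph, here is a simplified sketch that scans model SQL for single-argument `ref('...')` calls and inverts the result into a downstream map. It is only an approximation: dbt itself resolves references while parsing the project's Jinja templates, and this regex does not handle two-argument refs such as `ref('package', 'model')`, dynamically built refs, or `source()` calls. The model texts and the `build_dependency_graph` helper are hypothetical.

```python
import re
from collections import defaultdict

# Matches {{ ref('name') }} or {{ ref("name") }} with optional whitespace
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"]([\w.]+)['"]\s*\)\s*\}\}""")

# Hypothetical model files: model name -> SQL text
models = {
    "stg_orders": "SELECT * FROM raw.public.orders",
    "fct_sales": """
        WITH orders AS (SELECT * FROM {{ ref('stg_orders') }}),
             customers AS (SELECT * FROM {{ ref("stg_customers") }})
        SELECT * FROM orders JOIN customers USING (customer_id)
    """,
    "mart_daily_revenue": "SELECT order_date, SUM(line_revenue) FROM {{ ref('fct_sales') }} GROUP BY 1",
}


def build_dependency_graph(model_sql: dict) -> tuple[dict, dict]:
    """Return (parents, children) maps derived from ref() calls in each model."""
    parents = {name: sorted(set(REF_PATTERN.findall(sql))) for name, sql in model_sql.items()}
    children = defaultdict(list)
    for name, deps in parents.items():
        for dep in deps:
            children[dep].append(name)
    return parents, dict(children)


parents, children = build_dependency_graph(models)
print(parents["fct_sales"])    # ['stg_customers', 'stg_orders']
print(children["stg_orders"])  # ['fct_sales']
```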

SQL
-- models/marts/fct_sales.sql
-- dbt automatically knows this model depends on stg_orders, stg_customers, stg_products
-- because of the {{ ref() }} calls

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),
products AS (
    SELECT * FROM {{ ref('stg_products') }}
)

-- Assumes stg_orders is at order-item grain with product_id, quantity, unit_price_usd
SELECT
    o.order_id,
    c.customer_key,
    p.product_key,
    o.order_date,
    o.quantity * o.unit_price_usd AS line_revenue
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products  p ON o.product_id  = p.product_id

Running dbt docs generate followed by dbt docs serve builds the project documentation and serves it locally, including an interactive lineage graph of the complete DAG from raw sources to final marts. For any model, the graph shows its direct dependencies and all of its downstream consumers.
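The same graph is also available programmatically. dbt writes target/manifest.json (for example during dbt docs generate or dbt compile), and that file includes parent_map and child_map dictionaries keyed by node unique IDs such as model.<project>.fct_sales. The sketch below walks child_map breadth-first to list everything downstream of a node. It assumes that manifest layout; the project name `shop` and the inline excerpt are made up for illustration.

```python
import json
from collections import deque
from pathlib import Path


def load_child_map(manifest_path: str = "target/manifest.json") -> dict:
    """Read the child_map section from a dbt manifest file."""
    manifest = json.loads(Path(manifest_path).read_text())
    return manifest["child_map"]


def downstream_nodes(child_map: dict, node_id: str) -> list:
    """Breadth-first walk of child_map, returning every node that depends on node_id."""
    seen, queue, result = {node_id}, deque([node_id]), []
    while queue:
        for child in child_map.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                result.append(child)
                queue.append(child)
    return result


# Hypothetical excerpt of a manifest's child_map (project name "shop" is made up)
child_map = {
    "source.shop.postgres_ecommerce.orders": ["model.shop.stg_orders"],
    "model.shop.stg_orders": ["model.shop.fct_sales"],
    "model.shop.fct_sales": ["model.shop.mart_daily_revenue"],
    "model.shop.mart_daily_revenue": [],
}

print(downstream_nodes(child_map, "source.shop.postgres_ecommerce.orders"))
# ['model.shop.stg_orders', 'model.shop.fct_sales', 'model.shop.mart_daily_revenue']
```

In a real project, child_map = load_child_map() would replace the inline excerpt; the walk itself is unchanged.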

YAML
# sources.yml — documents raw source tables (root of the lineage)
version: 2

sources:
  - name: postgres_ecommerce
    description: "Production e-commerce PostgreSQL database"
    database: raw
    schema: public
    tables:
      - name: orders
        description: "Raw order records from the e-commerce platform"
        columns:
          - name: order_id
            description: "Unique order identifier"
            tests: [unique, not_null]
          - name: customer_id
            description: "FK to customers table"
          - name: created_at
            description: "UTC timestamp when order was placed"

Apache Atlas and OpenLineage

Beyond a single dbt project, dedicated lineage tools track data movement across heterogeneous systems:

Apache Atlas is an open-source metadata and governance platform that captures lineage through hooks for Hive, HBase, Sqoop, Kafka, and other Hadoop ecosystem tools; Spark lineage typically requires a separate connector.

OpenLineage is an open standard for lineage collection that integrates with Spark, Airflow, dbt, Flink, and other tools, sending lineage events to backends like Marquez or Atlan.

Python
# OpenLineage: emitting lineage events from a Python pipeline
# pip install openlineage-python
# Uses the classic openlineage.client.run / openlineage.client.facet modules;
# newer client releases also offer event_v2 / facet_v2 modules, so check
# which API your installed version recommends.

import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import (
    RunEvent, RunState, Run, Job,
    InputDataset, OutputDataset
)
from openlineage.client.facet import (
    SchemaDatasetFacet, SchemaField,
    OutputStatisticsOutputDatasetFacet
)

# Initialize the client
client = OpenLineageClient(url="http://lineage-server:5000")

run_id    = str(uuid.uuid4())
job_name  = "customer_churn_feature_pipeline"
namespace = "data-science-team"
# Every RunEvent needs a producer URI identifying the emitting code (placeholder value)
producer  = "https://example.com/data-science-team/churn-pipeline"

inputs = [
    InputDataset(
        namespace="analytics-warehouse",
        name="dim_customer",
        facets={
            "schema": SchemaDatasetFacet(fields=[
                SchemaField(name="customer_key", type="INTEGER"),
                SchemaField(name="customer_id",  type="VARCHAR"),
                SchemaField(name="customer_segment", type="VARCHAR"),
            ])
        }
    ),
    InputDataset(namespace="analytics-warehouse", name="fact_sales"),
]

# Emit START event
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=Job(namespace=namespace, name=job_name),
    producer=producer,
    inputs=inputs,
    outputs=[
        OutputDataset(
            namespace="data-lake",
            name="customer_churn_features",
            facets={
                "schema": SchemaDatasetFacet(fields=[
                    SchemaField(name="customer_id",      type="VARCHAR"),
                    SchemaField(name="lifetime_value",   type="DOUBLE"),
                    SchemaField(name="recency_days",     type="INTEGER"),
                    SchemaField(name="customer_segment", type="VARCHAR"),
                ])
            }
        )
    ]
))

# ... run the actual pipeline ...
# run_pipeline() is a placeholder for your own feature-building code
features_df = run_pipeline()

# Emit COMPLETE event; output statistics belong in the output-dataset facets
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=Job(namespace=namespace, name=job_name),
    producer=producer,
    inputs=inputs,
    outputs=[OutputDataset(
        namespace="data-lake",
        name="customer_churn_features",
        outputFacets={
            "outputStatistics": OutputStatisticsOutputDatasetFacet(
                rowCount=len(features_df),
                # In-memory size as a rough proxy; cast numpy int to int for JSON
                size=int(features_df.memory_usage(deep=True).sum())
            )
        }
    )]
))
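Under the hood, each event is a JSON document. The standard-library sketch below builds minimal START, COMPLETE, and FAIL events by hand and wraps a pipeline run so that a failure is reported rather than leaving the run open, which the client example above does not handle. The `emit` callable is a placeholder (in practice it might POST each event to a lineage backend), the producer URI is hypothetical, and the schemaURL version is an assumption to check against the spec version your backend expects. The field set is a minimal subset of a RunEvent.

```python
import json
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone

# Hypothetical producer URI and assumed spec version; adjust for your backend
PRODUCER = "https://example.com/data-science-team/churn-pipeline"
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"


def make_event(event_type, run_id, namespace, job_name, inputs=(), outputs=()):
    """Build a minimal OpenLineage-style RunEvent as a plain dict."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": n} for ns, n in inputs],
        "outputs": [{"namespace": ns, "name": n} for ns, n in outputs],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


@contextmanager
def tracked_run(emit, namespace, job_name, inputs=(), outputs=()):
    """Emit START on entry, then COMPLETE on success or FAIL if the body raises."""
    run_id = str(uuid.uuid4())
    emit(make_event("START", run_id, namespace, job_name, inputs, outputs))
    try:
        yield run_id
    except Exception:
        emit(make_event("FAIL", run_id, namespace, job_name, inputs, outputs))
        raise
    emit(make_event("COMPLETE", run_id, namespace, job_name, inputs, outputs))


# Stand-in emitter that just collects serialized events
sent = []


def emit(event):
    sent.append(json.dumps(event))


with tracked_run(emit, "data-science-team", "customer_churn_feature_pipeline",
                 inputs=[("analytics-warehouse", "dim_customer")],
                 outputs=[("data-lake", "customer_churn_features")]):
    pass  # the real pipeline work would go here

print([json.loads(e)["eventType"] for e in sent])  # ['START', 'COMPLETE']
```

All events in one run share a runId, which is what lets a backend stitch START and COMPLETE (or FAIL) together.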

Building a Lightweight Lineage Graph

For teams without dedicated lineage infrastructure, a simple lineage graph can be built and visualized in Python using NetworkX:

Python
import networkx as nx
import json
from pathlib import Path
from typing import Optional

class LineageGraph:
    """
    A lightweight DAG-based lineage tracker.

    Represents datasets as nodes and transformations as edges.
    Supports forward (impact) and backward (root cause) traversal.
    """

    def __init__(self):
        self.graph = nx.DiGraph()

    def add_dataset(self, name: str, dataset_type: str = "table",
                     description: str = "", system: str = "unknown",
                     metadata: dict = None):
        """Register a dataset as a node in the lineage graph."""
        self.graph.add_node(name, **{
            "type":        dataset_type,
            "description": description,
            "system":      system,
            "metadata":    metadata or {}
        })

    def add_transformation(self, inputs: list, output: str,
                            step_name: str, step_type: str = "transform",
                            code_ref: str = None, description: str = ""):
        """
        Record a transformation step: inputs → output.

        Adds the output dataset if not already registered,
        and creates edges from each input to the output.
        """
        # Ensure output node exists
        if output not in self.graph:
            self.graph.add_node(output, type="table", system="derived")

        for input_ds in inputs:
            if input_ds not in self.graph:
                self.graph.add_node(input_ds, type="table", system="source")
            self.graph.add_edge(input_ds, output, **{
                "step_name":   step_name,
                "step_type":   step_type,
                "code_ref":    code_ref,
                "description": description
            })

    def get_upstream(self, dataset: str, depth: int = None) -> set:
        """
        Get all datasets that (directly or indirectly) feed into the given dataset.

        Parameters
        ----------
        dataset : str
            Target dataset name.
        depth : int, optional
            Maximum hops upstream. None = unlimited.

        Returns
        -------
        set
            All ancestor dataset names.
        """
        if depth is None:
            return set(nx.ancestors(self.graph, dataset))
        ancestors = set()
        frontier = {dataset}
        for _ in range(depth):
            new_frontier = set()
            for node in frontier:
                parents = set(self.graph.predecessors(node))
                new_frontier |= parents - ancestors
                ancestors |= parents
            frontier = new_frontier
            if not frontier:
                break
        return ancestors

    def get_downstream(self, dataset: str, depth: int = None) -> set:
        """
        Get all datasets that (directly or indirectly) depend on the given dataset.
        Essential for impact analysis: 'if I change X, what breaks?'
        """
        if depth is None:
            return set(nx.descendants(self.graph, dataset))
        descendants = set()
        frontier = {dataset}
        for _ in range(depth):
            new_frontier = set()
            for node in frontier:
                children = set(self.graph.successors(node))
                new_frontier |= children - descendants
                descendants |= children
            frontier = new_frontier
            if not frontier:
                break
        return descendants

    def get_lineage_path(self, source: str, destination: str) -> list:
        """
        Find the transformation path from source to destination.

        Returns the list of datasets in order from source to destination.
        """
        try:
            path = nx.shortest_path(self.graph, source, destination)
            return path
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return []

    def impact_analysis(self, changed_dataset: str) -> dict:
        """
        Analyze the impact of changing or removing a dataset.
        Returns all downstream consumers grouped by distance.
        """
        # Shortest hop count from the changed dataset to everything reachable
        distances = nx.single_source_shortest_path_length(self.graph, changed_dataset)
        downstream = [ds for ds in distances if ds != changed_dataset]
        impact_by_distance = {}

        for ds in downstream:
            impact_by_distance.setdefault(distances[ds], []).append(ds)

        print(f"\nImpact Analysis: '{changed_dataset}' changed")
        print(f"Total affected downstream datasets: {len(downstream)}")
        for dist, datasets in sorted(impact_by_distance.items()):
            print(f"  Distance {dist}: {datasets}")

        return impact_by_distance

    def visualize(self, highlight_node: str = None, figsize: tuple = (14, 8)):
        """Plot the lineage graph using matplotlib."""
        import matplotlib.pyplot as plt

        pos = nx.spring_layout(self.graph, seed=42, k=2)

        # Color nodes by type
        type_colors = {
            "source":  "#3498DB",
            "table":   "#2ECC71",
            "model":   "#9B59B6",
            "report":  "#E67E22",
            "derived": "#95A5A6"
        }

        node_colors = [
            "#E74C3C" if n == highlight_node
            else type_colors.get(self.graph.nodes[n].get("type", "table"), "#95A5A6")
            for n in self.graph.nodes
        ]

        fig, ax = plt.subplots(figsize=figsize)
        nx.draw(
            self.graph, pos, ax=ax,
            node_color=node_colors,
            node_size=2000,
            with_labels=True,
            font_size=9,
            font_weight="bold",
            edge_color="#BDC3C7",
            arrows=True,
            arrowsize=20
        )

        # Add legend
        from matplotlib.patches import Patch
        legend_elements = [Patch(facecolor=c, label=t)
                           for t, c in type_colors.items()]
        if highlight_node:
            legend_elements.append(Patch(facecolor="#E74C3C",
                                          label=f"Changed: {highlight_node}"))
        ax.legend(handles=legend_elements, loc="upper left")

        ax.set_title("Data Lineage Graph", fontsize=14, fontweight="bold")
        plt.tight_layout()
        return fig

    def to_json(self) -> str:
        """Serialize the lineage graph to JSON."""
        data = {
            "nodes": [
                {"id": n, **self.graph.nodes[n]}
                for n in self.graph.nodes
            ],
            "edges": [
                {"source": u, "target": v, **self.graph.edges[u, v]}
                for u, v in self.graph.edges
            ]
        }
        return json.dumps(data, indent=2)


# Build the lineage graph for a typical analytics stack
lg = LineageGraph()

# Source systems
lg.add_dataset("postgres.orders",    "source", "Raw orders from e-commerce DB",   "PostgreSQL")
lg.add_dataset("postgres.customers", "source", "Raw customer records",             "PostgreSQL")
lg.add_dataset("postgres.products",  "source", "Raw product catalog",              "PostgreSQL")
lg.add_dataset("salesforce.accounts","source", "B2B account data from Salesforce", "Salesforce")
lg.add_dataset("mixpanel.events",    "source", "Product analytics events",         "Mixpanel")

# Staging layer
lg.add_transformation(
    inputs=["postgres.orders"], output="warehouse.stg_orders",
    step_name="Stage Orders", step_type="dbt_model",
    code_ref="models/staging/stg_orders.sql",
    description="Clean, type-cast, and filter raw orders"
)
lg.add_transformation(
    inputs=["postgres.customers", "salesforce.accounts"],
    output="warehouse.stg_customers",
    step_name="Stage Customers", step_type="dbt_model",
    code_ref="models/staging/stg_customers.sql",
    description="Merge e-commerce and CRM customer records"
)
lg.add_transformation(
    inputs=["postgres.products"], output="warehouse.stg_products",
    step_name="Stage Products", step_type="dbt_model",
    code_ref="models/staging/stg_products.sql"
)

# Core fact table
lg.add_transformation(
    inputs=["warehouse.stg_orders", "warehouse.stg_customers",
            "warehouse.stg_products"],
    output="warehouse.fct_sales",
    step_name="Build Sales Fact", step_type="dbt_model",
    code_ref="models/marts/fct_sales.sql",
    description="Star schema fact table at order-item grain"
)

# Mart/aggregation layer
lg.add_transformation(
    inputs=["warehouse.fct_sales", "warehouse.stg_customers"],
    output="warehouse.mart_customer_summary",
    step_name="Customer Summary Mart", step_type="dbt_model",
    code_ref="models/marts/mart_customer_summary.sql"
)
lg.add_transformation(
    inputs=["warehouse.fct_sales"],
    output="warehouse.mart_daily_revenue",
    step_name="Daily Revenue Mart", step_type="dbt_model"
)

# ML features
lg.add_transformation(
    inputs=["warehouse.mart_customer_summary", "mixpanel.events"],
    output="feature_store.churn_features",
    step_name="Churn Feature Engineering", step_type="python_pipeline",
    code_ref="pipelines/churn_features.py"
)

# Reports / consumers
lg.add_transformation(
    inputs=["warehouse.mart_daily_revenue"],
    output="dashboard.revenue_tracker",
    step_name="Revenue Dashboard", step_type="bi_report",
    code_ref="looker/dashboards/revenue_tracker.lookml"
)
lg.add_transformation(
    inputs=["feature_store.churn_features"],
    output="model.churn_prediction_v3",
    step_name="Churn Model Training", step_type="ml_training"
)

# ── Use the lineage graph ─────────────────────────────────────────
print("Upstream of churn model:")
print(lg.get_upstream("model.churn_prediction_v3"))
# {'postgres.orders', 'postgres.customers', 'salesforce.accounts',
#  'postgres.products', 'mixpanel.events', 'warehouse.stg_orders', ...}

print("\nDownstream of postgres.customers:")
print(lg.get_downstream("postgres.customers"))
# {'warehouse.stg_customers', 'warehouse.fct_sales',
#  'warehouse.mart_customer_summary', 'warehouse.mart_daily_revenue',
#  'dashboard.revenue_tracker', 'feature_store.churn_features',
#  'model.churn_prediction_v3'}   (set order may vary)

print("\nLineage path from postgres.orders to dashboard.revenue_tracker:")
print(lg.get_lineage_path("postgres.orders", "dashboard.revenue_tracker"))
# ['postgres.orders', 'warehouse.stg_orders', 'warehouse.fct_sales',
#  'warehouse.mart_daily_revenue', 'dashboard.revenue_tracker']

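# Impact analysis: what happens if postgres.customers changes?
impact = lg.impact_analysis("postgres.customers")
# Reports 7 affected datasets (order within each distance may vary):
#   distance 1: warehouse.stg_customers
#   distance 2: warehouse.fct_sales, warehouse.mart_customer_summary
#   distance 3: warehouse.mart_daily_revenue, feature_store.churn_features
#   distance 4: dashboard.revenue_tracker, model.churn_prediction_v3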
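Grouping by distance is useful for gauging the blast radius, but it is not a safe rebuild order: warehouse.fct_sales and warehouse.mart_customer_summary are both two hops from postgres.customers, yet the mart reads from the fact table. A topological sort of the affected subgraph yields an order in which every dataset is rebuilt after its inputs. This sketch swaps NetworkX for the standard-library `graphlib` module (Python 3.9+) and hard-codes a subset of the edges from the example above (the products branch is omitted because it is not downstream of postgres.customers). `rebuild_order` is a hypothetical helper.

```python
from collections import deque
from graphlib import TopologicalSorter

# Subset of the example's edges, as (input, output) pairs
EDGES = [
    ("postgres.customers", "warehouse.stg_customers"),
    ("salesforce.accounts", "warehouse.stg_customers"),
    ("postgres.orders", "warehouse.stg_orders"),
    ("warehouse.stg_orders", "warehouse.fct_sales"),
    ("warehouse.stg_customers", "warehouse.fct_sales"),
    ("warehouse.fct_sales", "warehouse.mart_customer_summary"),
    ("warehouse.stg_customers", "warehouse.mart_customer_summary"),
    ("warehouse.fct_sales", "warehouse.mart_daily_revenue"),
    ("warehouse.mart_customer_summary", "feature_store.churn_features"),
    ("mixpanel.events", "feature_store.churn_features"),
    ("warehouse.mart_daily_revenue", "dashboard.revenue_tracker"),
    ("feature_store.churn_features", "model.churn_prediction_v3"),
]


def rebuild_order(edges, changed):
    """Return the datasets downstream of `changed` in a dependency-respecting order."""
    children, parents = {}, {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)
        parents.setdefault(dst, []).append(src)

    # Collect everything reachable from the changed dataset
    affected, queue = set(), deque([changed])
    while queue:
        for child in children.get(queue.popleft(), []):
            if child not in affected:
                affected.add(child)
                queue.append(child)

    # Only dependencies inside the affected set constrain the rebuild order
    graph = {node: [p for p in parents.get(node, []) if p in affected] for node in affected}
    return list(TopologicalSorter(graph).static_order())


print(rebuild_order(EDGES, "postgres.customers"))
```

The printed list contains the seven affected datasets, and warehouse.fct_sales always appears before warehouse.mart_customer_summary, even though both sit at distance 2.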

Column-Level Lineage: The Deepest Level

Column-level lineage tracks not just which tables feed into which tables, but which specific columns in source tables produced specific columns in output tables.

Python
from dataclasses import dataclass, field
from typing import Optional

@dataclass(frozen=True)
class ColumnNode:
    """
    Represents one column in the column-level lineage graph.

    frozen=True makes instances immutable and generates __eq__ and
    __hash__ from (table, column), so nodes can be used in sets and dicts.
    """
    table:  str
    column: str

    @property
    def full_name(self) -> str:
        return f"{self.table}.{self.column}"


@dataclass
class ColumnTransformation:
    """Records how output columns are derived from input columns."""
    output: ColumnNode
    inputs: list          # List of ColumnNode
    formula: str          # SQL expression or description
    transform_type: str   # 'direct', 'cast', 'arithmetic', 'aggregate', 'concat', 'condition'


class ColumnLineageTracker:
    """
    Tracks column-level lineage for SQL transformations.
    Answers: 'Which source columns produced this output column?'
    """

    def __init__(self):
        self.mappings: list = []

    def add_mapping(self, output_table: str, output_column: str,
                     input_columns: list,   # list of (table, column) tuples
                     formula: str,
                     transform_type: str = "direct") -> None:
        """Register a column-level transformation."""
        output_node = ColumnNode(table=output_table, column=output_column)
        input_nodes = [ColumnNode(table=t, column=c) for t, c in input_columns]
        self.mappings.append(ColumnTransformation(
            output=output_node,
            inputs=input_nodes,
            formula=formula,
            transform_type=transform_type
        ))

    def get_column_ancestry(self, table: str, column: str) -> list:
        """
        Get all source columns that contributed to a given output column.
        Recursively traces back through all transformation steps.
        """
        target = ColumnNode(table=table, column=column)
        visited = set()
        ancestors = []

        def trace_back(node: ColumnNode):
            if node.full_name in visited:
                return
            visited.add(node.full_name)

            for mapping in self.mappings:
                if mapping.output == node:
                    for input_node in mapping.inputs:
                        ancestors.append({
                            "source":     input_node.full_name,
                            "output":     node.full_name,
                            "formula":    mapping.formula,
                            "transform":  mapping.transform_type
                        })
                        trace_back(input_node)

        trace_back(target)
        return ancestors

    def get_column_impact(self, table: str, column: str) -> list:
        """
        Get all output columns that depend on a given source column.
        Answers: 'If I change this column, what else breaks?'
        """
        source = ColumnNode(table=table, column=column)
        impacts = []
        visited = set()

        def trace_forward(node: ColumnNode):
            # Expand each column once, so diamond-shaped or cyclic lineage terminates
            if node.full_name in visited:
                return
            visited.add(node.full_name)

            for mapping in self.mappings:
                if node in mapping.inputs:
                    impacts.append({
                        "source":   node.full_name,
                        "output":   mapping.output.full_name,
                        "formula":  mapping.formula
                    })
                    trace_forward(mapping.output)

        trace_forward(source)
        return impacts


# Build column-level lineage for the revenue calculation
clt = ColumnLineageTracker()

# raw_orders → stg_orders
clt.add_mapping("stg_orders", "order_id",
    [("raw_orders", "id")],
    "CAST(id AS TEXT)", transform_type="cast")

clt.add_mapping("stg_orders", "quantity",
    [("raw_orders", "item_quantity")],
    "COALESCE(item_quantity, 0)", transform_type="direct")

clt.add_mapping("stg_orders", "unit_price_usd",
    [("raw_orders", "unit_price"), ("raw_orders", "currency")],
    "CASE WHEN currency='EUR' THEN unit_price * 1.08 ELSE unit_price END",
    transform_type="condition")

# stg_orders → fct_sales
clt.add_mapping("fct_sales", "line_revenue",
    [("stg_orders", "quantity"), ("stg_orders", "unit_price_usd")],
    "quantity * unit_price_usd", transform_type="arithmetic")

# fct_sales → mart_customer_summary
clt.add_mapping("mart_customer_summary", "lifetime_value",
    [("fct_sales", "line_revenue")],
    "SUM(line_revenue) GROUP BY customer_id", transform_type="aggregate")

# Trace: where does lifetime_value come from, all the way back?
print("Column ancestry for mart_customer_summary.lifetime_value:")
for step in clt.get_column_ancestry("mart_customer_summary", "lifetime_value"):
    print(f"  {step['source']} ──[{step['transform']}]──► {step['output']}")
    print(f"    Formula: {step['formula']}")

# Impact: if we change raw_orders.unit_price, what breaks?
print("\nImpact of changing raw_orders.unit_price:")
for impact in clt.get_column_impact("raw_orders", "unit_price"):
    print(f"  → {impact['output']}")
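The ancestry list records every hop. Sometimes you only need the raw inputs that a column ultimately depends on, which are the leaves of its ancestry tree. A standalone sketch over a plain dict is enough for that. The `LINEAGE` dict format (output column mapped to its input columns) is a hypothetical simplification of the tracker's mappings and is not part of any library.

```python
# Hypothetical simplified mapping format: output column -> list of input columns
LINEAGE = {
    "stg_orders.quantity":       ["raw_orders.item_quantity"],
    "stg_orders.unit_price_usd": ["raw_orders.unit_price", "raw_orders.currency"],
    "fct_sales.line_revenue":    ["stg_orders.quantity", "stg_orders.unit_price_usd"],
    "mart_customer_summary.lifetime_value": ["fct_sales.line_revenue"],
}

def root_sources(column: str, lineage: dict) -> set:
    """Return the columns with no recorded inputs that `column` ultimately depends on."""
    roots, stack, seen = set(), [column], set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        inputs = lineage.get(current, [])
        if not inputs and current != column:
            roots.add(current)  # no upstream mapping: treat as a raw source column
        stack.extend(inputs)
    return roots

print(sorted(root_sources("mart_customer_summary.lifetime_value", LINEAGE)))
# ['raw_orders.currency', 'raw_orders.item_quantity', 'raw_orders.unit_price']
```

An iterative stack is used here instead of recursion, so very deep lineage chains cannot hit Python's recursion limit.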

Why Lineage Matters for Data Scientists: Practical Scenarios

Scenario 1: Debugging a Wrong Metric

Python
# You notice the revenue dashboard shows $1.2M but last week it was $1.1M
# The business says nothing changed — is this a data bug or a real change?

def debug_metric_with_lineage(lg: LineageGraph, metric_dataset: str):
    """
    Use lineage to systematically narrow down the source of a metric change.
    """
    print(f"Debugging unexpected change in: {metric_dataset}")
    print("\nStep 1: Trace all upstream data sources")

    upstream = lg.get_upstream(metric_dataset)
    print(f"  {len(upstream)} upstream datasets:")
    for ds in sorted(upstream):
        print(f"  → {ds}")

    print("\nStep 2: Check each upstream for recent changes")
    # In a real system, you'd query pipeline run logs or audit tables
    # to find which upstream datasets changed recently

    print("\nStep 3: Find the lineage path for the suspicious column")
    # Use column-level lineage to trace back to the raw source

    return upstream
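Step 2 is only a comment in the function above. You can fill it in if you can get each dataset's most recent change time (a deploy, schema change, or backfill) from run logs or audit tables. With that, you keep only the upstream datasets that changed inside the window you are investigating. In the sketch below, `last_changed` is a hypothetical stand-in for that log data.

```python
from datetime import datetime, timedelta, timezone

def recently_changed(upstream: set, last_changed: dict, since: datetime) -> list:
    """Return upstream datasets whose last recorded change is at or after `since`.

    `last_changed` maps dataset name -> timezone-aware datetime of its latest
    change (hypothetical; in practice read from run logs or audit tables).
    Datasets with no record are kept, because they cannot be ruled out.
    """
    suspects = []
    for ds in sorted(upstream):
        changed_at = last_changed.get(ds)
        if changed_at is None or changed_at >= since:
            suspects.append(ds)
    return suspects

now = datetime(2024, 9, 15, 9, 0, tzinfo=timezone.utc)
last_changed = {
    "raw_orders":   now - timedelta(hours=3),
    "stg_orders":   now - timedelta(hours=2),
    "dim_customer": now - timedelta(days=9),
}
upstream = {"raw_orders", "stg_orders", "dim_customer", "fx_rates"}
print(recently_changed(upstream, last_changed, since=now - timedelta(days=7)))
# ['fx_rates', 'raw_orders', 'stg_orders']
```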

Scenario 2: Assessing the Risk of an Upstream Schema Change

Python
def schema_change_impact_report(lg: LineageGraph,
                                  changed_dataset: str) -> dict:
    """
    Generate an impact report before making a schema change.
    Shows every downstream asset that will be affected.
    """
    downstream = lg.get_downstream(changed_dataset)

    # Categorize by type
    report = {
        "changed_dataset":   changed_dataset,
        "total_affected":    len(downstream),
        "affected_by_type":  {},
        "affected_list":     []
    }

    for ds in downstream:
        ds_type = lg.graph.nodes[ds].get("type", "unknown")
        report["affected_by_type"].setdefault(ds_type, []).append(ds)
        report["affected_list"].append(ds)

    print("\nSchema Change Impact Report")
    print(f"Changed dataset: {changed_dataset}")
    print(f"Total downstream affected: {len(downstream)}")
    print("\nBy type:")
    for dtype, dsets in report["affected_by_type"].items():
        print(f"  {dtype} ({len(dsets)}): {dsets}")

    return report
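`schema_change_impact_report` relies on `lg.get_downstream`, which belongs to the NetworkX-backed `LineageGraph` defined earlier. To see the traversal itself, here is a dependency-free sketch: a breadth-first search over a plain adjacency dict, with node types kept in a separate dict. Both dicts are hypothetical examples.

```python
from collections import deque

def downstream_by_type(edges: dict, node_types: dict, changed: str) -> dict:
    """Breadth-first search from `changed`, grouping every reachable node by type.

    edges:      dataset -> list of datasets that read from it
    node_types: dataset -> type label (e.g. 'table', 'dashboard', 'model')
    """
    seen = {changed}
    queue = deque([changed])
    grouped = {}
    while queue:
        current = queue.popleft()
        for child in edges.get(current, []):
            if child not in seen:  # each affected asset is reported once
                seen.add(child)
                queue.append(child)
                grouped.setdefault(node_types.get(child, "unknown"), []).append(child)
    return grouped

edges = {
    "raw_orders": ["stg_orders"],
    "stg_orders": ["fct_sales"],
    "fct_sales":  ["mart_customer_summary", "revenue_dashboard"],
    "mart_customer_summary": ["churn_model"],
}
node_types = {"stg_orders": "table", "fct_sales": "table",
              "mart_customer_summary": "table",
              "revenue_dashboard": "dashboard", "churn_model": "model"}
print(downstream_by_type(edges, node_types, "raw_orders"))
# {'table': ['stg_orders', 'fct_sales', 'mart_customer_summary'],
#  'dashboard': ['revenue_dashboard'], 'model': ['churn_model']}
```

Breadth-first order lists the closest dependents first. That is usually the order in which you would notify owners.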

Scenario 3: Model Reproducibility

Python
import json

# To reproduce a model from 6 months ago, you need to know:
# 1. Exactly which source tables were used (provenance)
# 2. Which transformations were applied (lineage)
# 3. What the data looked like at training time (version)

def create_model_training_manifest(
    model_name:       str,
    model_version:    str,
    input_datasets:   list,  # List of (dataset_name, version_or_snapshot_date)
    feature_list:     list,
    target_column:    str,
    training_date:    str,
    pipeline_version: str,   # Version of the feature pipeline that built the inputs
    pipeline_code:    str,   # Path to the feature pipeline source
    git_commit:       str    # Commit of the code used for this training run
) -> dict:
    """
    Create a complete reproducibility manifest for a trained model.
    Save this alongside the model artifact for future reference.
    """
    manifest = {
        "model_name":      model_name,
        "model_version":   model_version,
        "trained_at":      training_date,
        "target_column":   target_column,
        "feature_count":   len(feature_list),
        "features":        feature_list,
        "input_datasets":  [
            {"name": name, "snapshot": snapshot}
            for name, snapshot in input_datasets
        ],
        "lineage": {
            "provenance_files": [
                f"lineage/{name.replace('.', '_')}_lineage.json"
                for name, _ in input_datasets
            ],
            # Passed in, not hard-coded, so each model records its own code version
            "feature_pipeline_version": pipeline_version,
            "feature_pipeline_code":    pipeline_code,
            "git_commit":               git_commit
        }
    }

    return manifest

manifest = create_model_training_manifest(
    model_name       = "customer_churn_classifier",
    model_version    = "v3.2",
    input_datasets   = [
        ("warehouse.mart_customer_summary", "2024-09-14T06:00:00Z"),
        ("feature_store.churn_features",    "2024-09-14T07:30:00Z")
    ],
    feature_list     = ["lifetime_value", "recency_days", "n_orders",
                        "avg_order_value", "n_high_severity_tickets"],
    target_column    = "churned_30d",
    training_date    = "2024-09-15T10:00:00Z",
    pipeline_version = "churn_features_v2.1.3",
    pipeline_code    = "pipelines/churn_features.py",
    git_commit       = "a3f8d2e"
)
print(json.dumps(manifest, indent=2))
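The manifest stores snapshot labels and a commit hash as plain strings. A label alone does not prove that the data behind it is unchanged, but a content hash does. The two helpers below are hypothetical names, sketched under the assumption that datasets live in local files. One computes a SHA-256 fingerprint of a file with `hashlib`. The other reads the current commit with `git rev-parse HEAD` and returns `None` when git or a repository is unavailable.

```python
import hashlib
import subprocess
from typing import Optional

def file_fingerprint(path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 of a file's bytes, read in chunks so large files need little memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def current_git_commit() -> Optional[str]:
    """Return the HEAD commit hash, or None outside a git repo or without git."""
    try:
        out = subprocess.run(["git", "rev-parse", "HEAD"],
                             capture_output=True, text=True, check=True)
    except (OSError, subprocess.CalledProcessError):
        return None
    return out.stdout.strip()
```

If you store `file_fingerprint(...)` next to each snapshot in `input_datasets`, a later rerun can confirm that it is reading byte-identical data before attempting to reproduce the model.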

Compliance and Regulatory Lineage

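In regulated industries (healthcare, finance, insurance), and for any organization that processes personal data of people in the EU, documenting how data is processed is a compliance requirement rather than an optional extra. Lineage is the natural way to produce that documentation.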

Python
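from dataclasses import dataclass, field
from datetime import datetime, timezone

# GDPR Article 30 requires records of processing activities: purposes, categories
# of data subjects and personal data, recipients, third-country transfers,
# retention periods, and security measures (plus controller contact details).
# Data sources and legal basis are not on the Article 30 list, but recording them
# here links the compliance record to the data's provenance.

@dataclass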
class ComplianceLineageRecord:
    """
    Data processing record for GDPR Article 30 compliance.
    Documents every data processing activity involving personal data.
    """
    processing_activity:  str
    personal_data_types:  list   # e.g., ['name', 'email', 'location']
    data_subjects:        str    # e.g., 'EU customers'
    legal_basis:          str    # Art. 6(1): 'consent', 'contract', 'legal_obligation', 'vital_interests', 'public_task', 'legitimate_interest'
    purpose:              str
    data_sources:         list
    data_recipients:      list
    retention_period:     str
    third_country_transfer: bool = False
    security_measures:    list = field(default_factory=list)
    created_at:           str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

# Example: documenting ML model training involving personal data
training_record = ComplianceLineageRecord(
    processing_activity   = "Churn prediction model training",
    personal_data_types   = ["email", "purchase_history", "account_status"],
    data_subjects         = "Registered customers in EU",
    legal_basis           = "legitimate_interest",
    purpose               = "Predicting customer churn to improve retention offers",
    data_sources          = ["analytics_warehouse.dim_customer",
                              "analytics_warehouse.fact_sales"],
    data_recipients       = ["data-science-team", "marketing-analytics"],
    retention_period      = "Model artifact: 2 years. Training data: not stored separately.",
    third_country_transfer= False,
    security_measures     = [
        "Data access restricted to authorized pipeline service accounts",
        "Customer IDs pseudonymized in model outputs",
        "Data not used for individual profiling in model outputs"
    ]
)
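Two small additions make records like this harder to get wrong. The first rejects a `legal_basis` that is not one of the six lawful bases in GDPR Article 6(1). The second exports the record as JSON so it can be stored next to the lineage files. To stay short, the sketch uses a trimmed, hypothetical `ProcessingRecord` with only a few fields. The same `__post_init__` check would apply to the full class. The snake_case labels are this document's convention, not an official vocabulary.

```python
import json
from dataclasses import asdict, dataclass, field

# The six lawful bases in GDPR Article 6(1)(a)-(f), as snake_case labels
LEGAL_BASES = {"consent", "contract", "legal_obligation",
               "vital_interests", "public_task", "legitimate_interest"}

@dataclass
class ProcessingRecord:
    processing_activity: str
    legal_basis: str
    data_sources: list = field(default_factory=list)

    def __post_init__(self):
        # Fail when the record is created, not during an audit
        if self.legal_basis not in LEGAL_BASES:
            raise ValueError(f"Unknown legal basis: {self.legal_basis!r}")

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2)

record = ProcessingRecord("Churn prediction model training", "legitimate_interest",
                          ["analytics_warehouse.dim_customer"])
print(record.to_json())
```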

Best Practices for Provenance and Lineage

1. Write Provenance as You Go

The hardest part of provenance is capturing it — not analyzing it afterward. Build provenance recording into every pipeline step:

Python
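import functools
import time
from datetime import datetime, timezone

# Collected provenance entries; persist these (JSON, metadata table) in a real pipeline
PROVENANCE_LOG: list = []

def with_provenance(func):
    """Decorator that automatically records provenance for pipeline functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started_at = datetime.now(timezone.utc).isoformat()
        start = time.perf_counter()  # monotonic clock, suited to measuring durations
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start

        # Record which step ran, when, how long it took, and how many rows it returned
        rows = len(result) if hasattr(result, "__len__") else None
        PROVENANCE_LOG.append({
            "step":       func.__name__,
            "started_at": started_at,
            "duration_s": round(duration, 3),
            "rows":       rows,
        })
        if rows is not None:
            print(f"[Provenance] {func.__name__}: {rows:,} rows in {duration:.1f}s")

        return result
    return wrapper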
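Printing to the console or keeping entries in memory works in a notebook. A scheduled pipeline usually needs the records persisted. Here is a minimal sketch, assuming a local JSON Lines file (the path and helper names are hypothetical). It appends one JSON object per step, so the log survives restarts and can be loaded for analysis later.

```python
import json
from datetime import datetime, timezone

def append_provenance(log_path: str, step: str, rows: int, **extra) -> dict:
    """Append one provenance record as a JSON line and return it."""
    record = {
        "step": step,
        "rows": rows,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
        **extra,  # e.g. input paths, parameters, git commit (must be JSON-serializable)
    }
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return record

def read_provenance(log_path: str) -> list:
    """Load all records from a JSON Lines provenance log."""
    with open(log_path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Append-only JSON Lines is a deliberate choice here. Each write is independent, so a crash mid-pipeline leaves earlier records intact.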

2. Use dbt for SQL Transformation Lineage

If your team uses SQL for transformations, dbt gives you table-level lineage with no extra work. Each {{ ref() }} call is both a dependency declaration and a lineage record, and dbt assembles these references into the project's dependency graph (DAG).
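dbt writes that dependency graph to its `target/manifest.json` artifact, so you can also query it from Python. The sketch below assumes the manifest's `parent_map` key, which maps each node's unique_id to the unique_ids it depends on. The artifact schema is versioned, so check it against your dbt version. The node ids used here are made up.

```python
import json

def dbt_upstream(manifest_path: str, node_id: str) -> set:
    """All transitive parents of `node_id`, read from dbt's manifest `parent_map`."""
    with open(manifest_path, encoding="utf-8") as f:
        parent_map = json.load(f).get("parent_map", {})
    found, stack = set(), list(parent_map.get(node_id, []))
    while stack:
        parent = stack.pop()
        if parent not in found:
            found.add(parent)
            stack.extend(parent_map.get(parent, []))
    return found

# Hypothetical usage after `dbt compile` or `dbt run`:
# dbt_upstream("target/manifest.json", "model.my_project.fct_sales")
```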

3. Version Your Datasets

When creating datasets that will be used for model training, save a version identifier (snapshot date, Git commit, pipeline version):

Python
import os
import pandas as pd
from datetime import datetime, timezone

def save_versioned_dataset(df: pd.DataFrame,
                           base_path: str,
                           dataset_name: str) -> str:
    """
    Save a dataset with a UTC version timestamp in the filename.
    Returns the full path for provenance recording.
    Requires a Parquet engine (pyarrow or fastparquet).
    """
    os.makedirs(base_path, exist_ok=True)
    version = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filepath = os.path.join(base_path, f"{dataset_name}_{version}.parquet")
    df.to_parquet(filepath, index=False)

    # Also write a '_latest' copy for convenience (a full copy, not a symlink)
    latest_path = os.path.join(base_path, f"{dataset_name}_latest.parquet")
    df.to_parquet(latest_path, index=False)

    print(f"Saved: {filepath} ({len(df):,} rows)")
    return filepath
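The `%Y%m%d_%H%M%S` timestamps are zero-padded, so they sort lexicographically in time order. That means finding the newest version does not require a separate "latest" file. Here is a minimal sketch using `pathlib`, assuming the `<dataset_name>_<timestamp>.parquet` naming scheme above (the helper name is hypothetical):

```python
import re
from pathlib import Path
from typing import Optional

def latest_version(base_path: str, dataset_name: str) -> Optional[Path]:
    """Return the newest `<dataset_name>_YYYYmmdd_HHMMSS.parquet` file, or None."""
    pattern = re.compile(rf"{re.escape(dataset_name)}_(\d{{8}}_\d{{6}})\.parquet")
    candidates = [p for p in Path(base_path).glob(f"{dataset_name}_*.parquet")
                  if pattern.fullmatch(p.name)]  # skips '_latest' and other variants
    # Same prefix + zero-padded timestamp, so the largest name is the newest file
    return max(candidates, key=lambda p: p.name, default=None)
```

Resolving the version explicitly like this gives you a concrete filename to record in provenance, whereas a `_latest` path changes meaning every time the pipeline runs.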

4. Document Column Meanings

Lineage tells you where a column came from. Documentation tells you what it means. Both are necessary for trustworthy data:

Python
# Column documentation template — maintain alongside the dataset
FEATURE_DOCS = {
    "lifetime_value": {
        "description": "Sum of all completed order amounts in USD for this customer, all time",
        "source":       "SUM(fct_sales.line_revenue) for completed orders",
        "unit":         "USD",
        "nullable":     False,
        "business_rule": "Excludes cancelled and refunded orders. Includes tax if charged."
    },
    "recency_days": {
        "description": "Calendar days between the customer's most recent order and the reference date",
        "source":       "reference_date - MAX(fct_sales.order_date)",
        "unit":         "days",
        "nullable":     False,
        "business_rule": "999 for customers who have never ordered. Never negative."
    },
    "n_high_severity_tickets": {
        "description": "Count of support tickets with severity='high' in the lookback period",
        "source":       "SUM(CASE WHEN fact_support_tickets.severity='high' THEN 1 ELSE 0 END)",
        "unit":         "count",
        "nullable":     False,
        "business_rule": "Lookback period defined in CONFIG['lookback_days'] (default: 365 days)"
    }
}
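Documentation only helps if it stays in sync with the features a model actually uses. A small check can run in CI or before training to flag features without docs and doc entries that are missing keys. The required-key set below mirrors the template above. It is an assumption of this sketch, not a standard.

```python
REQUIRED_KEYS = {"description", "source", "unit", "nullable", "business_rule"}

def check_feature_docs(features: list, docs: dict) -> dict:
    """Report undocumented features and doc entries with missing required keys."""
    undocumented = [f for f in features if f not in docs]
    incomplete = {
        name: sorted(REQUIRED_KEYS - set(entry))
        for name, entry in docs.items()
        if not REQUIRED_KEYS <= set(entry)
    }
    return {"undocumented": undocumented, "incomplete": incomplete}

docs = {
    "lifetime_value": {"description": "...", "source": "...", "unit": "USD",
                       "nullable": False, "business_rule": "..."},
    "recency_days":   {"description": "...", "unit": "days"},
}
print(check_feature_docs(["lifetime_value", "recency_days", "n_orders"], docs))
# {'undocumented': ['n_orders'], 'incomplete': {'recency_days': ['business_rule', 'nullable', 'source']}}
```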

Summary

Data provenance and lineage are the infrastructure of trust in data-driven organizations. Provenance answers where data came from and how it was collected — essential for assessing trustworthiness, satisfying compliance requirements, and enabling reproducibility. Lineage answers what path the data took and what transformations were applied — essential for debugging wrong numbers, assessing the impact of upstream changes, and understanding what a model was actually trained on.

Modern data tools make lineage increasingly automatic: dbt generates table-level lineage from {{ ref() }} dependencies, OpenLineage provides a standard for cross-tool lineage tracking, and data catalogs like Atlan, Alation, and DataHub aggregate lineage from multiple sources. But even without dedicated tooling, data scientists can implement lightweight provenance and lineage practices — documenting data origins in metadata files, tracking column transformations in code, building lineage graphs with NetworkX, and saving versioned datasets — that dramatically improve the debuggability and reproducibility of their work.

The investment in provenance and lineage pays off in the situations that matter most: when a metric is wrong and you need to find out why, when a schema change is planned and you need to know what breaks, when a model must be reproduced from six months ago, and when a regulator asks how you processed individuals' personal data.

Key Takeaways

  • Data provenance answers “where did this data originate?” — source system, collection method, collection time, known issues; data lineage answers “what happened to this data along the way?” — which tables fed into which, which transformations were applied, which source columns produced which output columns
  • Table-level lineage (which tables depend on which tables) is the most common and is provided automatically by dbt through {{ ref() }} dependencies; column-level lineage (which source columns produced which output columns) is more granular and requires either explicit tracking or tooling that parses the SQL
  • The two most valuable lineage use cases for data scientists are root cause analysis (tracing a wrong metric back to its source) and impact analysis (understanding which downstream assets break when an upstream dataset changes)
  • Model reproducibility requires recording provenance at training time: exactly which dataset versions, which features, which pipeline code version, and which date snapshot produced a specific model artifact
  • OpenLineage is the open standard for emitting lineage events across tools (Spark, Airflow, dbt, Flink) to a central lineage store; tools like Marquez, Atlan, DataHub, and Apache Atlas provide the storage and visualization layer
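  • For regulated industries, documenting data processing is a compliance requirement: GDPR Article 30, for example, requires records of processing activities covering the purposes of processing, the categories of personal data and data subjects, recipients, third-country transfers, retention periods, and security measures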
  • Even without dedicated lineage tooling, lightweight practices provide most of the value: DataProvenance metadata files alongside each dataset, DatasetLineage JSON records for each pipeline step, versioned dataset filenames, and column-level documentation
  • The LineageGraph pattern — tracking datasets as nodes and transformations as edges in a directed acyclic graph — enables programmatic impact analysis, root cause tracing, and lineage path discovery without requiring commercial tooling