Data Quality: What Makes Data Good or Bad

Learn the six dimensions of data quality: completeness, accuracy, consistency, timeliness, validity, and uniqueness. Master detection, measurement, and remediation of data quality issues in Python.

Data Quality: What Makes Data Good or Bad

Data quality is the degree to which data is fit for its intended use. Good data is complete (all required values present), accurate (values reflect reality), consistent (same facts represented the same way across systems), timely (current enough for the analysis at hand), valid (values conform to defined rules and formats), and unique (no duplicate records). Poor data quality is the leading cause of wrong analytical conclusions, failed machine learning models, and eroded stakeholder trust in data-driven decisions. Measuring, monitoring, and systematically improving data quality is one of the most impactful skills a data scientist can develop.

Introduction

The most sophisticated machine learning model, the most elegant analysis, the most compelling visualization — all of it is worthless if the underlying data is wrong. Garbage in, garbage out is not just a catchy phrase; it describes the single most common root cause of failed data science projects.

Poor data quality takes many forms. A customer table where 30% of email addresses are invalid. A transaction log where the same purchase appears three times due to a processing error. A sensor dataset where readings of 9999.0 represent “device offline” but get treated as real measurements. A sales table where revenue is recorded in different currencies across different regions with no conversion applied. A patient records system where the same person has five different spellings of their name across five encounters. Each of these is a data quality problem, and each will silently corrupt any analysis or model built on top of it.

What makes data quality especially dangerous is that most problems are silent. Your code runs, the model trains, the dashboard loads — but the answers are wrong. The average order value is inflated because duplicate transactions were counted. The churn model is biased because historical cancellations are systematically underreported in one region. The revenue forecast is off by 15% because one year of data was recorded in the wrong currency.

This article is a comprehensive guide to data quality: the six core dimensions, how to detect and measure quality problems in Python, how to prioritize and fix them, and how to build systematic quality monitoring that catches problems before they corrupt your work.

The Six Dimensions of Data Quality

Data quality researchers and practitioners have converged on six core dimensions that capture the different ways data can fail to be fit for purpose. Understanding these dimensions gives you a vocabulary for diagnosing quality problems and a framework for designing quality checks.

1. Completeness

Completeness measures whether all required data is present. Missing values, empty fields, and absent records all represent incompleteness.

Python
import pandas as pd
import numpy as np

def measure_completeness(df: pd.DataFrame) -> pd.DataFrame:
    """
    Measure completeness for each column in a DataFrame.

    Returns a summary showing null counts, null rates, and
    a completeness score (1.0 = fully complete, 0.0 = all nulls).
    """
    n = len(df)
    summary = pd.DataFrame({
        "column":       df.columns,
        "total_rows":   n,
        "null_count":   df.isnull().sum().values,
        "non_null":     df.notnull().sum().values,
    })
    summary["null_rate"]     = summary["null_count"] / n
    summary["completeness"]  = 1 - summary["null_rate"]

    # Classify completeness
    summary["status"] = pd.cut(
        summary["completeness"],
        bins=[-0.001, 0.90, 0.99, 1.001],
        labels=["Poor (<90%)", "Warning (90-99%)", "Good (≥99%)"]
    )
    return summary.sort_values("completeness")


# Example: customer dataset with various missing patterns
customers = pd.DataFrame({
    "customer_id":    ["C001", "C002", "C003", "C004", "C005"],
    "name":           ["Jane Smith", None, "Alice Williams", "Carlos Garcia", "Sophie Brown"],
    "email":          ["jane@ex.com", "bob@ex.com", None, "carlos@ex.com", None],
    "phone":          ["512-555-0101", None, None, None, "44-20-7123"],
    "city":           ["Austin", "Seattle", None, "Miami", "London"],
    "lifetime_value": [384.96, 139.98, 269.97, None, 184.98]
})

completeness_report = measure_completeness(customers)
print(completeness_report.to_string(index=False))

Output:

Plaintext
column  total_rows  null_count  non_null  null_rate  completeness         status
phone           5           3         2       0.60         0.40         Poor (<90%)
email           5           2         3       0.40         0.60         Poor (<90%)
city            5           1         4       0.20         0.80         Poor (<90%)
name            5           1         4       0.20         0.80         Poor (<90%)
lifetime_value  5           1         4       0.20         0.80         Poor (<90%)
customer_id     5           0         5       0.00         1.00         Good (≥99%)

Completeness problems have different causes and solutions:

  • Structurally missing: Field not applicable (phone number for a B2B company) — expected and acceptable
  • Missing at random: Genuinely unknown at data entry — imputation may be appropriate
  • Systematically missing: Missing for a specific segment (e.g., international customers have no US phone) — must understand the pattern before imputing

2. Accuracy

Accuracy measures whether data values correctly reflect the real-world entity or event being described. It’s the hardest dimension to measure because it requires an external ground truth.

Python
import pandas as pd
import re

def check_email_accuracy(series: pd.Series) -> pd.DataFrame:
    """Check email addresses for format validity as a proxy for accuracy."""
    # Regex for basic email format validation
    email_pattern = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'

    results = pd.DataFrame({
        "email":        series,
        "is_valid_format": series.apply(
            lambda x: bool(re.match(email_pattern, str(x))) if pd.notna(x) else False
        )
    })

    n_valid   = results["is_valid_format"].sum()
    n_total   = len(series.dropna())
    n_missing = series.isna().sum()

    print(f"Email accuracy check:")
    print(f"  Valid format:   {n_valid} / {n_total} ({n_valid/n_total*100:.1f}%)")
    print(f"  Missing:        {n_missing}")
    print(f"\nInvalid emails:")
    invalid = results[~results["is_valid_format"] & results["email"].notna()]
    print(invalid["email"].tolist())

    return results


# Accuracy issues in practice
emails = pd.Series([
    "jane.smith@email.com",
    "bob.johnson@",            # Missing domain
    "alice@email.com",
    "not-an-email",            # No @ sign
    "carlos@email.c",          # TLD too short
    "sophie@email.com",
    "wei zhang@email.com",     # Space in local part
    None
])

check_email_accuracy(emails)

# Geographic accuracy check — lat/lon should be within valid ranges
def check_coordinate_accuracy(lat_series: pd.Series,
                                lon_series: pd.Series) -> pd.Series:
    """Flag coordinates outside valid lat/lon ranges."""
    invalid_lat = (lat_series < -90)  | (lat_series > 90)
    invalid_lon = (lon_series < -180) | (lon_series > 180)
    return invalid_lat | invalid_lon

# Cross-field accuracy: delivery_date should not precede order_date
def check_date_sequence_accuracy(df: pd.DataFrame,
                                  earlier_col: str,
                                  later_col: str) -> pd.Series:
    """Flag records where the later_col date precedes the earlier_col date."""
    invalid = df[later_col] < df[earlier_col]
    n_invalid = invalid.sum()
    if n_invalid > 0:
        print(f"⚠️  {n_invalid} records where {later_col} < {earlier_col}")
    return invalid

3. Consistency

Consistency measures whether the same fact is represented the same way across different tables, systems, or time periods.

Python
import pandas as pd

def check_cross_table_consistency(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    key_col: str,
    value_col: str,
    table_a_name: str = "Table A",
    table_b_name: str = "Table B"
) -> pd.DataFrame:
    """
    Check that a value is consistent for the same key across two tables.
    Reports keys where the value differs.
    """
    merged = pd.merge(
        df_a[[key_col, value_col]].rename(columns={value_col: f"{value_col}_a"}),
        df_b[[key_col, value_col]].rename(columns={value_col: f"{value_col}_b"}),
        on=key_col,
        how="inner"
    )

    inconsistent = merged[
        merged[f"{value_col}_a"] != merged[f"{value_col}_b"]
    ]

    n_checked      = len(merged)
    n_inconsistent = len(inconsistent)
    consistency_rate = 1 - (n_inconsistent / n_checked) if n_checked > 0 else 1.0

    print(f"Consistency check: {value_col} across {table_a_name} vs {table_b_name}")
    print(f"  Records checked:     {n_checked:,}")
    print(f"  Inconsistent:        {n_inconsistent:,} ({(1-consistency_rate)*100:.1f}%)")
    print(f"  Consistency rate:    {consistency_rate:.3f}")

    if n_inconsistent > 0:
        print(f"\nSample inconsistencies:")
        print(inconsistent.head(10).to_string(index=False))

    return inconsistent


# Example: customer revenue recorded differently in CRM vs Data Warehouse
crm_data = pd.DataFrame({
    "customer_id":   ["C001", "C002", "C003"],
    "annual_revenue": [85000.0, 62000.0, 110000.0]
})

warehouse_data = pd.DataFrame({
    "customer_id":   ["C001", "C002", "C003"],
    "annual_revenue": [85000.0, 65000.0, 110000.0]  # C002 differs!
})

inconsistencies = check_cross_table_consistency(
    crm_data, warehouse_data,
    key_col="customer_id",
    value_col="annual_revenue",
    table_a_name="CRM",
    table_b_name="Data Warehouse"
)

# Within-column consistency: same concept represented differently
def check_value_standardization(series: pd.Series,
                                  canonical_values: list = None) -> pd.DataFrame:
    """
    Identify non-standard representations of categorical values.
    Common data quality issue: 'USA', 'US', 'United States', 'U.S.A.' for the same country.
    """
    value_counts = series.value_counts(dropna=False).reset_index()
    value_counts.columns = ["value", "count"]
    value_counts["pct"] = value_counts["count"] / len(series) * 100

    if canonical_values:
        value_counts["is_canonical"] = value_counts["value"].isin(canonical_values)
        non_canonical = value_counts[~value_counts["is_canonical"] &
                                      value_counts["value"].notna()]
        if len(non_canonical) > 0:
            print(f"Non-canonical values found:")
            print(non_canonical.to_string(index=False))

    return value_counts


# Check country code consistency
country_col = pd.Series([
    "USA", "US", "United States", "USA", "U.S.A.",
    "UK", "United Kingdom", "GB", "USA", "DE"
])
check_value_standardization(country_col, canonical_values=["USA", "UK", "DE", "FR"])

4. Timeliness

Timeliness measures whether data is current enough for its intended use. Data can be perfectly accurate as of when it was captured but stale relative to when it’s being used.

Python
import pandas as pd
from datetime import datetime, timezone, timedelta

def check_timeliness(
    df: pd.DataFrame,
    timestamp_col: str,
    max_age_hours: float = 24.0,
    reference_time: datetime = None
) -> dict:
    """
    Check whether data records are fresh enough for use.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing the timestamp column.
    timestamp_col : str
        Column containing the data timestamp.
    max_age_hours : float
        Maximum acceptable age in hours.
    reference_time : datetime, optional
        Reference point for age calculation. Defaults to now (UTC).

    Returns
    -------
    dict
        Timeliness statistics and fresh/stale breakdown.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    ref_ts = pd.Timestamp(reference_time)
    ts_col = pd.to_datetime(df[timestamp_col])

    # Handle timezone-naive timestamps by assuming UTC
    if ts_col.dt.tz is None:
        ts_col = ts_col.dt.tz_localize("UTC")

    age_hours = (ref_ts - ts_col).dt.total_seconds() / 3600

    is_stale = age_hours > max_age_hours
    n_stale  = is_stale.sum()
    n_fresh  = (~is_stale).sum()
    pct_stale = n_stale / len(df) * 100

    oldest_record = age_hours.max()
    newest_record = age_hours.min()
    median_age    = age_hours.median()

    print(f"Timeliness check: {timestamp_col} (max age: {max_age_hours}h)")
    print(f"  Fresh records:  {n_fresh:,} ({100-pct_stale:.1f}%)")
    print(f"  Stale records:  {n_stale:,} ({pct_stale:.1f}%)")
    print(f"  Oldest record:  {oldest_record:.1f} hours ago")
    print(f"  Newest record:  {newest_record:.2f} hours ago")
    print(f"  Median age:     {median_age:.1f} hours")

    return {
        "n_fresh": n_fresh, "n_stale": n_stale,
        "pct_stale": pct_stale, "oldest_hours": oldest_record,
        "max_allowed_hours": max_age_hours
    }

5. Validity

Validity measures whether data values conform to defined business rules, formats, and constraints — even if they’re plausible at first glance.

Python
import pandas as pd
import numpy as np
from typing import Callable

def validate_column(
    series: pd.Series,
    col_name: str,
    rules: list
) -> pd.DataFrame:
    """
    Apply a list of validation rules to a column.

    Each rule is a tuple: (rule_name, condition_function).
    The condition function takes a Series and returns a boolean mask
    where True = VALID.

    Parameters
    ----------
    series : pd.Series
        Column to validate.
    col_name : str
        Column name for reporting.
    rules : list of (str, callable)
        List of (rule_name, is_valid_fn) tuples.

    Returns
    -------
    pd.DataFrame
        Rule-by-rule validation results.
    """
    results = []
    for rule_name, is_valid_fn in rules:
        try:
            is_valid   = is_valid_fn(series)
            n_valid    = is_valid.sum()
            n_invalid  = (~is_valid).sum()
            pass_rate  = n_valid / len(series) if len(series) > 0 else 1.0
            status     = "✓ PASS" if pass_rate >= 0.99 else \
                         "⚠ WARN" if pass_rate >= 0.95 else "✗ FAIL"
            results.append({
                "column":    col_name,
                "rule":      rule_name,
                "n_valid":   n_valid,
                "n_invalid": n_invalid,
                "pass_rate": round(pass_rate, 4),
                "status":    status
            })
        except Exception as e:
            results.append({
                "column": col_name, "rule": rule_name,
                "n_valid": None, "n_invalid": None,
                "pass_rate": None, "status": f"ERROR: {e}"
            })

    return pd.DataFrame(results)


# Example validation rules for an orders table
orders = pd.DataFrame({
    "order_id":     ["ORD_001", "ORD_002", None, "ORD_004", "ORD_004"],  # None + duplicate
    "amount":       [149.99, -25.00, 89.99, 0.00, 149.99],               # Negative + zero
    "customer_id":  ["CUST_001", "CUST_002", "CUST_003", "CUST_004", "CUST_004"],
    "status":       ["completed", "cancelled", "PENDING", "invalid_status", "completed"],
    "order_date":   pd.to_datetime(["2024-09-01", "2024-09-02", "2024-09-03",
                                     "2099-01-01", "2024-09-05"]),  # Future date
    "email":        ["a@b.com", "not-email", "c@d.com", "e@f.com", "g@h.com"]
})

VALID_STATUSES = {"completed", "cancelled", "pending", "shipped", "processing"}
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'
TODAY = pd.Timestamp.now()

rules_order_id = [
    ("not_null",   lambda s: s.notna()),
    ("unique",     lambda s: ~s.duplicated(keep=False)),
    ("format_ORD", lambda s: s.str.startswith("ORD_").fillna(False))
]

rules_amount = [
    ("not_null",     lambda s: s.notna()),
    ("positive",     lambda s: s > 0),
    ("under_100k",   lambda s: s < 100_000),
]

rules_status = [
    ("not_null",         lambda s: s.notna()),
    ("canonical_value",  lambda s: s.str.lower().isin(VALID_STATUSES).fillna(False)),
]

rules_order_date = [
    ("not_null",     lambda s: s.notna()),
    ("not_future",   lambda s: s <= TODAY),
    ("not_before_2020", lambda s: s >= pd.Timestamp("2020-01-01")),
]

# Run all validations and combine into a single report
all_validations = pd.concat([
    validate_column(orders["order_id"],   "order_id",   rules_order_id),
    validate_column(orders["amount"],     "amount",     rules_amount),
    validate_column(orders["status"],     "status",     rules_status),
    validate_column(orders["order_date"], "order_date", rules_order_date),
])

print(all_validations.to_string(index=False))

6. Uniqueness

Uniqueness measures whether records appear exactly once when they should. Duplicate records are one of the most common and damaging data quality problems.

Python
import pandas as pd

def detect_duplicates(
    df: pd.DataFrame,
    subset: list = None,
    keep: str = "first"
) -> dict:
    """
    Detect and characterize duplicate records in a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to check.
    subset : list, optional
        Columns to consider for duplicate detection.
        If None, all columns are used.
    keep : str
        Which duplicate to mark as the "original":
        'first', 'last', or False (mark all as duplicates).

    Returns
    -------
    dict
        Duplicate statistics and a DataFrame of duplicate records.
    """
    if subset is None:
        subset = list(df.columns)

    n_total    = len(df)
    is_dup     = df.duplicated(subset=subset, keep=keep)
    n_dupes    = is_dup.sum()
    n_unique   = df[subset].drop_duplicates().shape[0]
    dupe_rate  = n_dupes / n_total if n_total > 0 else 0.0

    print(f"Uniqueness check on: {subset}")
    print(f"  Total rows:      {n_total:,}")
    print(f"  Unique key sets: {n_unique:,}")
    print(f"  Duplicate rows:  {n_dupes:,} ({dupe_rate*100:.2f}%)")

    if n_dupes > 0:
        # Show the most-duplicated keys
        dup_counts = (
            df[subset]
            .value_counts()
            .reset_index()
            .query("count > 1")
            .sort_values("count", ascending=False)
            .head(10)
        )
        print(f"\nMost duplicated keys (top 10):")
        print(dup_counts.to_string(index=False))

    return {
        "n_total": n_total, "n_unique": n_unique,
        "n_duplicates": n_dupes, "duplicate_rate": dupe_rate,
        "duplicate_rows": df[is_dup].copy()
    }


# Detect duplicates in an orders table
orders_with_dupes = pd.DataFrame({
    "order_id":    ["ORD_001", "ORD_001", "ORD_002", "ORD_003", "ORD_003", "ORD_003"],
    "customer_id": ["C001",    "C001",    "C002",    "C003",    "C003",    "C003"],
    "amount":      [149.99,    149.99,    89.99,     29.99,     29.99,     29.99],
    "status":      ["completed","completed","shipped","pending","pending","completed"]
})

# Exact duplicates (all columns match)
exact_result = detect_duplicates(orders_with_dupes)

# Key duplicates (order_id should be unique)
key_result = detect_duplicates(orders_with_dupes, subset=["order_id"])
# ORD_001 appears twice (exact dupe), ORD_003 appears 3 times (status changed)

Building a Comprehensive Data Quality Report

Now let’s build a unified quality report that measures all six dimensions:

Python
import pandas as pd
import numpy as np
from datetime import datetime, timezone

def data_quality_report(
    df: pd.DataFrame,
    dataset_name: str,
    primary_key: list = None,
    timestamp_col: str = None,
    validation_rules: dict = None,
    categorical_cols: dict = None
) -> dict:
    """
    Generate a comprehensive data quality report across all six dimensions.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset to assess.
    dataset_name : str
        Human-readable name for reporting.
    primary_key : list, optional
        Columns that should be unique (for uniqueness check).
    timestamp_col : str, optional
        Timestamp column for timeliness check.
    validation_rules : dict, optional
        {column_name: [(rule_name, fn), ...]} for validity checks.
    categorical_cols : dict, optional
        {column_name: [list_of_valid_values]} for consistency checks.

    Returns
    -------
    dict
        Quality scores across all dimensions.
    """
    n = len(df)
    report = {
        "dataset":      dataset_name,
        "rows":         n,
        "columns":      len(df.columns),
        "assessed_at":  datetime.now(timezone.utc).isoformat(),
        "dimensions":   {}
    }

    print(f"\n{'='*60}")
    print(f"DATA QUALITY REPORT: {dataset_name}")
    print(f"Rows: {n:,} | Columns: {len(df.columns)}")
    print(f"{'='*60}")

    # ── 1. COMPLETENESS ────────────────────────────────────────────
    null_rates = df.isnull().mean()
    completeness_score = 1 - null_rates.mean()
    critical_nulls = null_rates[null_rates > 0.20].to_dict()

    print(f"\n[1] COMPLETENESS: {completeness_score:.3f}")
    for col, rate in null_rates[null_rates > 0].items():
        icon = "" if rate > 0.20 else "" if rate > 0.05 else ""
        print(f"    {icon} {col}: {rate*100:.1f}% null")

    report["dimensions"]["completeness"] = {
        "score": round(completeness_score, 4),
        "critical_cols": critical_nulls
    }

    # ── 2. UNIQUENESS ──────────────────────────────────────────────
    if primary_key:
        n_unique = df[primary_key].drop_duplicates().shape[0]
        uniqueness_score = n_unique / n if n > 0 else 1.0
        n_dupes = n - n_unique

        print(f"\n[2] UNIQUENESS ({primary_key}): {uniqueness_score:.3f}")
        if n_dupes > 0:
            print(f"    ✗ {n_dupes:,} duplicate key values found")
        else:
            print(f"    ✓ All keys unique")
    else:
        exact_dupes = df.duplicated().sum()
        uniqueness_score = 1 - (exact_dupes / n) if n > 0 else 1.0
        print(f"\n[2] UNIQUENESS (exact rows): {uniqueness_score:.3f}")
        if exact_dupes > 0:
            print(f"    ✗ {exact_dupes:,} exact duplicate rows")

    report["dimensions"]["uniqueness"] = {"score": round(uniqueness_score, 4)}

    # ── 3. VALIDITY ────────────────────────────────────────────────
    if validation_rules:
        validity_scores = []
        print(f"\n[3] VALIDITY:")
        for col, rules in validation_rules.items():
            if col not in df.columns:
                continue
            for rule_name, rule_fn in rules:
                try:
                    is_valid = rule_fn(df[col])
                    pass_rate = is_valid.sum() / n
                    validity_scores.append(pass_rate)
                    icon = "" if pass_rate >= 0.99 else "" if pass_rate >= 0.95 else ""
                    print(f"    {icon} {col}.{rule_name}: {pass_rate*100:.1f}% valid")
                except Exception as e:
                    print(f"    ✗ {col}.{rule_name}: ERROR ({e})")

        validity_score = np.mean(validity_scores) if validity_scores else 1.0
    else:
        validity_score = None
        print(f"\n[3] VALIDITY: No rules defined")

    report["dimensions"]["validity"] = {
        "score": round(validity_score, 4) if validity_score else None
    }

    # ── 4. CONSISTENCY ─────────────────────────────────────────────
    if categorical_cols:
        consistency_scores = []
        print(f"\n[4] CONSISTENCY:")
        for col, valid_values in categorical_cols.items():
            if col not in df.columns:
                continue
            non_null = df[col].dropna()
            is_valid = non_null.isin(valid_values)
            rate = is_valid.mean()
            consistency_scores.append(rate)
            icon = "" if rate >= 0.99 else "" if rate >= 0.95 else ""
            invalid_vals = non_null[~is_valid].unique()[:5]
            print(f"    {icon} {col}: {rate*100:.1f}% canonical "
                  f"({len(invalid_vals)} non-standard values: {list(invalid_vals)})")

        consistency_score = np.mean(consistency_scores) if consistency_scores else 1.0
    else:
        consistency_score = None
        print(f"\n[4] CONSISTENCY: No canonical value sets defined")

    report["dimensions"]["consistency"] = {
        "score": round(consistency_score, 4) if consistency_score else None
    }

    # ── 5. TIMELINESS ──────────────────────────────────────────────
    if timestamp_col and timestamp_col in df.columns:
        ts_col = pd.to_datetime(df[timestamp_col], errors="coerce")
        now = pd.Timestamp.now(tz="UTC")
        if ts_col.dt.tz is None:
            ts_col = ts_col.dt.tz_localize("UTC")
        max_age_hours = (now - ts_col.max()).total_seconds() / 3600
        median_age_hours = (now - ts_col.median()).total_seconds() / 3600
        timeliness_score = max(0, 1 - (max_age_hours / (24 * 30)))  # Normalized to 30 days

        print(f"\n[5] TIMELINESS ({timestamp_col}):")
        print(f"    Newest record: {max_age_hours:.1f}h ago")
        print(f"    Median record age: {median_age_hours:.1f}h")
    else:
        timeliness_score = None
        print(f"\n[5] TIMELINESS: No timestamp column specified")

    report["dimensions"]["timeliness"] = {
        "score": round(timeliness_score, 4) if timeliness_score else None
    }

    # ── 6. OVERALL SCORE ───────────────────────────────────────────
    scores = [v["score"] for v in report["dimensions"].values()
              if v.get("score") is not None]
    overall = np.mean(scores) if scores else None

    print(f"\n{''*40}")
    print(f"OVERALL QUALITY SCORE: {overall:.3f}" if overall else "OVERALL: Partial assessment")
    interpretation = (
        "Excellent" if overall and overall >= 0.95 else
        "Good"      if overall and overall >= 0.90 else
        "Fair"      if overall and overall >= 0.80 else
        "Poor"
    )
    print(f"Assessment: {interpretation}")
    print(f"{'='*60}")

    report["overall_score"] = round(overall, 4) if overall else None
    report["interpretation"] = interpretation
    return report


# Apply to our sample orders table
report = data_quality_report(
    df=orders,
    dataset_name="E-Commerce Orders",
    primary_key=["order_id"],
    timestamp_col="order_date",
    validation_rules={
        "amount":  [("positive", lambda s: s > 0),
                     ("under_100k", lambda s: s < 100_000)],
        "status":  [("canonical", lambda s: s.str.lower().isin(
                        {"completed","cancelled","pending","shipped","processing"}
                    ).fillna(False))],
    },
    categorical_cols={
        "status": ["completed", "cancelled", "pending", "shipped", "processing"]
    }
)

Common Data Quality Problems and How to Fix Them

Problem 1: Inconsistent Categorical Values

Python
import pandas as pd

def standardize_categorical(series: pd.Series,
                              mapping: dict = None,
                              to_lower: bool = True,
                              strip: bool = True) -> pd.Series:
    """
    Standardize categorical values to canonical forms.

    Parameters
    ----------
    series : pd.Series
        Categorical column to standardize.
    mapping : dict, optional
        {raw_value: canonical_value} explicit mapping.
    to_lower : bool
        Lowercase before mapping lookup.
    strip : bool
        Strip whitespace before mapping lookup.

    Returns
    -------
    pd.Series
        Standardized series.
    """
    result = series.copy()
    if strip:
        result = result.str.strip()
    if to_lower:
        result = result.str.lower()
    if mapping:
        result = result.map(mapping).fillna(result)
    return result


# Standardize country codes
country_col = pd.Series([
    "USA", "US", "United States", "us", " USA ",
    "UK", "United Kingdom", "GB", "gb",
    "Germany", "DE", "de", "DEU"
])

country_mapping = {
    "us": "USA", "united states": "USA", "u.s.a.": "USA",
    "uk": "GBR", "united kingdom": "GBR", "gb": "GBR",
    "germany": "DEU", "de": "DEU", "deutschland": "DEU"
}

standardized = standardize_categorical(
    country_col,
    mapping=country_mapping,
    to_lower=True,
    strip=True
)
print(pd.DataFrame({"original": country_col, "standardized": standardized}))

Problem 2: Duplicate Records

Python
import pandas as pd

def deduplicate(
    df: pd.DataFrame,
    key_cols: list,
    tie_breaker_col: str = None,
    tie_breaker_ascending: bool = False,
    log: bool = True
) -> pd.DataFrame:
    """
    Remove duplicate records, keeping the most relevant version.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with potential duplicates.
    key_cols : list
        Columns that define uniqueness.
    tie_breaker_col : str, optional
        Column to sort by when choosing which duplicate to keep
        (e.g., 'updated_at' to keep the most recent).
    tie_breaker_ascending : bool
        Sort ascending (False = keep highest/latest value).
    log : bool
        Whether to print deduplication statistics.

    Returns
    -------
    pd.DataFrame
        Deduplicated DataFrame.
    """
    n_before = len(df)

    if tie_breaker_col:
        df = df.sort_values(tie_breaker_col, ascending=tie_breaker_ascending)

    df_deduped = df.drop_duplicates(subset=key_cols, keep="first")
    n_after  = len(df_deduped)
    n_removed = n_before - n_after

    if log:
        print(f"Deduplication on {key_cols}:")
        print(f"  Before: {n_before:,} rows")
        print(f"  After:  {n_after:,} rows")
        print(f"  Removed:{n_removed:,} duplicates ({n_removed/n_before*100:.2f}%)")

    return df_deduped.reset_index(drop=True)


# Keep most recent record per customer
customer_dupes = pd.DataFrame({
    "customer_id": ["C001", "C001", "C002", "C003", "C003"],
    "name":        ["Jane Smith", "Jane Smith-Torres", "Bob", "Alice", "Alice W."],
    "updated_at":  pd.to_datetime(["2022-01-01", "2024-06-01",
                                    "2023-03-15", "2022-07-01", "2023-11-20"])
})

clean_customers = deduplicate(
    customer_dupes,
    key_cols=["customer_id"],
    tie_breaker_col="updated_at",
    tie_breaker_ascending=False  # Keep most recent
)

Problem 3: Outliers and Implausible Values

Python
import pandas as pd
import numpy as np

def detect_numeric_outliers(
    series: pd.Series,
    method: str = "iqr",
    threshold: float = 3.0
) -> pd.Series:
    """
    Detect outliers in a numeric column.

    Parameters
    ----------
    series : pd.Series
        Numeric column to check.
    method : str
        'iqr' (interquartile range) or 'zscore' (standard deviation).
    threshold : float
        For IQR: multiplier (standard is 1.5 or 3.0).
        For z-score: number of standard deviations.

    Returns
    -------
    pd.Series
        Boolean mask where True = outlier.
    """
    series = series.dropna()

    if method == "iqr":
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - threshold * IQR
        upper = Q3 + threshold * IQR
        is_outlier = (series < lower) | (series > upper)

    elif method == "zscore":
        z_scores = (series - series.mean()) / series.std()
        is_outlier = z_scores.abs() > threshold

    n_outliers = is_outlier.sum()
    print(f"Outlier detection ({method}, threshold={threshold}):")
    print(f"  Outliers: {n_outliers:,} / {len(series):,} ({n_outliers/len(series)*100:.2f}%)")
    if n_outliers > 0:
        print(f"  Range of outliers: [{series[is_outlier].min():.2f}, "
              f"{series[is_outlier].max():.2f}]")
        print(f"  Normal range: [{series[~is_outlier].min():.2f}, "
              f"{series[~is_outlier].max():.2f}]")

    return is_outlier.reindex(series.index, fill_value=False)


# Detect outliers in order amounts
amounts = pd.Series([
    150.0, 89.99, 200.0, 45.0, 9999.0, 175.0, 120.0,
    -50.0, 300.0, 85.0, 210.0, 0.001, 195.0
])

outliers = detect_numeric_outliers(amounts, method="iqr", threshold=3.0)
print("\nOutlier values:")
print(amounts[outliers].tolist())  # [9999.0, -50.0, 0.001]

Problem 4: Encoding Sentinel Values

A particularly insidious problem: values that look like data but actually encode “missing” or “unknown”:

Python
import pandas as pd
import numpy as np

def replace_sentinel_values(
    df: pd.DataFrame,
    sentinels: dict = None
) -> pd.DataFrame:
    """
    Replace known sentinel values with proper NaN.

    Sentinel values are values that encode 'unknown' or 'missing'
    but look like real data: 9999, -1, 0, 'N/A', 'NULL', '0000-01-01'.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing sentinel values.
    sentinels : dict, optional
        {column_or_dtype: [list_of_sentinel_values]}.
        If None, uses common defaults.

    Returns
    -------
    pd.DataFrame
        DataFrame with sentinels replaced by NaN/NaT.
    """
    df = df.copy()

    default_sentinels = {
        "numeric":  [9999, 9999.0, -1, -999, -9999, 999999],
        "string":   ["N/A", "n/a", "NA", "NULL", "null", "None", "none",
                     "UNKNOWN", "unknown", "?", "", "0"],
        "datetime": ["1900-01-01", "1970-01-01", "0001-01-01", "9999-12-31"]
    }

    if sentinels:
        default_sentinels.update(sentinels)

    # Replace numeric sentinels
    for col in df.select_dtypes(include=[np.number]).columns:
        for s in default_sentinels["numeric"]:
            df[col] = df[col].replace(s, np.nan)

    # Replace string sentinels
    for col in df.select_dtypes(include=["object"]).columns:
        df[col] = df[col].replace(default_sentinels["string"], np.nan)
        df[col] = df[col].replace("", np.nan)

    # Replace datetime sentinels
    for col in df.select_dtypes(include=["datetime64"]).columns:
        for s in default_sentinels["datetime"]:
            df[col] = df[col].replace(pd.Timestamp(s), pd.NaT)

    return df


# Before sentinel replacement
raw_data = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "age":         [34, 9999, -1],          # 9999 and -1 are sentinels
    "income":      [85000, -999, 62000],    # -999 is sentinel
    "city":        ["Austin", "NULL", "Chicago"],  # "NULL" is sentinel
    "score":       [0.87, 0.0, 0.65]        # 0.0 might be a real value or sentinel
})

print("Before:", raw_data.isnull().sum().to_dict())
clean_data = replace_sentinel_values(raw_data)
print("After: ", clean_data.isnull().sum().to_dict())

Building a Data Quality Monitoring System

For production pipelines, quality checks must be automated and run continuously:

Python
import pandas as pd
import json
from datetime import datetime, timezone
from pathlib import Path

QUALITY_LOG_PATH = "logs/data_quality_history.jsonl"

class DataQualityMonitor:
    """
    Automated data quality monitoring with alerting and history tracking.

    Runs a configurable set of checks on each pipeline run and
    logs results for trend analysis. Raises exceptions or sends
    alerts when quality drops below thresholds.
    """

    def __init__(self, dataset_name: str, thresholds: dict = None):
        self.dataset_name = dataset_name
        self.thresholds = thresholds or {
            "completeness":  0.95,
            "uniqueness":    0.999,
            "validity":      0.98,
            "max_null_rate": 0.10
        }
        self.run_results = []

    def check(self, df: pd.DataFrame,
               primary_key: list = None) -> dict:
        """Run all quality checks and return results."""
        results = {
            "dataset":    self.dataset_name,
            "run_at":     datetime.now(timezone.utc).isoformat(),
            "n_rows":     len(df),
            "checks":     {},
            "alerts":     []
        }

        # Completeness check
        null_rates    = df.isnull().mean()
        completeness  = 1 - null_rates.mean()
        high_null_cols = null_rates[null_rates > self.thresholds["max_null_rate"]].to_dict()

        results["checks"]["completeness"] = {
            "score":   round(completeness, 4),
            "passed":  completeness >= self.thresholds["completeness"],
            "details": {k: round(v, 4) for k, v in high_null_cols.items()}
        }
        if not results["checks"]["completeness"]["passed"]:
            results["alerts"].append(
                f"Completeness {completeness:.3f} < threshold "
                f"{self.thresholds['completeness']}"
            )

        # Uniqueness check
        if primary_key:
            n_unique      = df[primary_key].drop_duplicates().shape[0]
            uniqueness    = n_unique / len(df) if len(df) > 0 else 1.0
            n_dupes       = len(df) - n_unique

            results["checks"]["uniqueness"] = {
                "score":       round(uniqueness, 4),
                "n_duplicates": n_dupes,
                "passed":      uniqueness >= self.thresholds["uniqueness"]
            }
            if not results["checks"]["uniqueness"]["passed"]:
                results["alerts"].append(
                    f"Uniqueness violation: {n_dupes:,} duplicate "
                    f"keys on {primary_key}"
                )

        # Row count check (detect unexpected drops)
        results["checks"]["row_count"] = {"n_rows": len(df)}

        # Zero-row check
        if len(df) == 0:
            results["alerts"].append("CRITICAL: Dataset has zero rows!")

        # Log results
        self._log_results(results)

        # Print summary
        self._print_summary(results)

        return results

    def _log_results(self, results: dict):
        """Append results to the quality history log."""
        Path(QUALITY_LOG_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(QUALITY_LOG_PATH, "a") as f:
            f.write(json.dumps(results) + "\n")

    def _print_summary(self, results: dict):
        """Print a concise quality summary."""
        status = "✓ ALL CHECKS PASSED" if not results["alerts"] else \
                 f"⚠ {len(results['alerts'])} ALERTS"
        print(f"\nQuality Check [{self.dataset_name}]: {status}")
        print(f"  Rows: {results['n_rows']:,} | Run: {results['run_at'][:19]}")
        for check, data in results["checks"].items():
            if "score" in data:
                icon = "" if data.get("passed", True) else ""
                print(f"  {icon} {check}: {data['score']:.4f}")
        for alert in results["alerts"]:
            print(f"  ⚠ ALERT: {alert}")

    def get_history(self) -> pd.DataFrame:
        """Load quality history as a DataFrame for trend analysis."""
        if not Path(QUALITY_LOG_PATH).exists():
            return pd.DataFrame()
        records = []
        with open(QUALITY_LOG_PATH) as f:
            for line in f:
                records.append(json.loads(line.strip()))
        return pd.DataFrame(records)

    def raise_on_alerts(self, results: dict):
        """Raise an exception if any quality alerts were triggered."""
        if results["alerts"]:
            raise ValueError(
                f"Data quality checks failed for {self.dataset_name}:\n" +
                "\n".join(f"  • {a}" for a in results["alerts"])
            )


# Usage in a pipeline
monitor = DataQualityMonitor(
    dataset_name="customer_features",
    thresholds={"completeness": 0.95, "uniqueness": 0.999, "max_null_rate": 0.10}
)

results = monitor.check(customers, primary_key=["customer_id"])
# Optionally: fail the pipeline on quality issues
# monitor.raise_on_alerts(results)

The Cost of Poor Data Quality

Data quality problems have measurable business costs that help justify investment in fixing them:

Problem TypeBusiness ImpactExample
Duplicate transactionsRevenue double-counting, wrong KPIs5% duplicate orders inflates revenue reports by 5%
Stale customer segmentsTargeting wrong customersSending premium offers to churned customers
Invalid email addressesFailed campaigns, low deliverability30% invalid emails = 30% wasted marketing spend
Inconsistent currencyWrong financial reportsEUR/USD confusion distorts revenue by up to 10%
Missing key featuresBiased or broken ML modelsModels trained without 20% of customers perform poorly in production
Incorrect geographyWrong market analysisAttributing online sales to wrong region corrupts regional P&L
Sentinel values treated as realExtreme outliers corrupt aggregatesAge=9999 inflates average customer age

The 1-10-100 rule: it costs $1 to verify data quality at entry, $10 to clean it after the fact, and $100 to deal with the consequences of not catching it at all (wrong decisions, rework, reputation damage).

Summary

Data quality is not a one-time cleanup task — it is an ongoing discipline. Good data is complete, accurate, consistent, timely, valid, and unique. Poor data in any of these dimensions silently corrupts analyses, degrades model performance, and erodes trust in data-driven decisions.

The practical workflow is: measure first (quantify quality across all six dimensions before deciding what to fix), prioritize by impact (fix completeness and uniqueness issues first, as they are most likely to produce wrong answers), implement systematic checks (automate quality monitoring so problems surface immediately rather than days later), and make quality visible (report quality scores in dashboards so stakeholders know what confidence to place in the data).

The DataQualityMonitor pattern — running checks at pipeline boundaries, logging results, alerting on violations, and raising exceptions when quality drops below thresholds — is the production-ready approach to ensuring your data is fit for purpose before it reaches analysis or models.

Key Takeaways

  • The six dimensions of data quality are completeness (values present), accuracy (values reflect reality), consistency (same facts represented the same way), timeliness (data is current enough), validity (values conform to business rules), and uniqueness (no unintended duplicates) — poor quality on any dimension can silently corrupt analyses
  • Measure before fixing: always quantify quality across all dimensions first; the most common problems to prioritize are high null rates (completeness), duplicate primary keys (uniqueness), and sentinel values masquerading as real data (accuracy/validity)
  • Validate data at pipeline boundaries — as it enters your system, as it flows between stages, and before it reaches downstream consumers; silent quality degradation discovered in production is far more costly than a failed pipeline check
  • Sentinel values (9999 for unknown age, -1 for missing score, “NULL” strings, “1970-01-01” dates) are among the most insidious quality problems because they pass null checks but corrupt aggregations — always explicitly replace them with proper NaN/NaT
  • Idempotent quality fixes — standardize categoricals, deduplicate with a tie-breaker, replace sentinels — should produce the same result when run multiple times and be documented alongside the data they clean
  • The 1-10-100 rule: it costs 1× to prevent a data quality issue at the source, 10× to fix it in the pipeline, and 100× to deal with its consequences (wrong business decisions, model failures, reputation damage)
  • Build a DataQualityMonitor class that runs checks automatically with each pipeline run, logs results to a history file for trend analysis, and raises exceptions when quality drops below configured thresholds — quality monitoring should be a first-class part of every data pipeline
  • Document quality findings alongside the data: maintain a data quality log that records when issues were discovered, what their root cause was, and how they were resolved — this institutional memory prevents the same problems from recurring
Share:
Subscribe
Notify of
0 Comments

Discover More

NPN versus PNP Transistors: How They Differ and When to Use Each

NPN versus PNP Transistors: How They Differ and When to Use Each

Master the difference between NPN and PNP transistors—polarity, current flow, biasing, circuit configurations—and know exactly…

ETL vs ELT: Understanding Data Pipelines

ETL vs ELT: Understanding Data Pipelines

Learn the difference between ETL and ELT data pipelines. Understand extract, transform, load, when to…

Java Control Flow: if, else, and switch Statements

Learn the fundamentals of Java control flow, including if-else statements, switch cases and loops. Optimize…

Color Theory for Data Visualization: Using Color Effectively in Charts

Learn how to use color effectively in data visualization. Explore color theory, best practices, and…

Understanding AC versus DC: Why Your Wall Outlet and Battery Work Differently

Discover the crucial differences between AC and DC electricity. Learn why batteries provide DC, wall…

Introduction to Data Warehousing Concepts

Introduction to Data Warehousing Concepts

Learn data warehousing fundamentals: OLTP vs OLAP, star schema, dimension and fact tables, slowly changing…

Click For More
0
Would love your thoughts, please comment.x
()
x