Data quality is the degree to which data is fit for its intended use. Good data is complete (all required values present), accurate (values reflect reality), consistent (same facts represented the same way across systems), timely (current enough for the analysis at hand), valid (values conform to defined rules and formats), and unique (no duplicate records). Poor data quality is a leading cause of wrong analytical conclusions, failed machine learning models, and eroded stakeholder trust in data-driven decisions. Measuring, monitoring, and systematically improving data quality is one of the most impactful skills a data scientist can develop.
Introduction
The most sophisticated machine learning model, the most elegant analysis, the most compelling visualization: all of it is worthless if the underlying data is wrong. Garbage in, garbage out is not just a catchy phrase; it describes one of the most common root causes of failed data science projects.
Poor data quality takes many forms. A customer table where 30% of email addresses are invalid. A transaction log where the same purchase appears three times due to a processing error. A sensor dataset where readings of 9999.0 represent “device offline” but get treated as real measurements. A sales table where revenue is recorded in different currencies across different regions with no conversion applied. A patient records system where the same person has five different spellings of their name across five encounters. Each of these is a data quality problem, and each will silently corrupt any analysis or model built on top of it.
What makes data quality especially dangerous is that most problems are silent. Your code runs, the model trains, the dashboard loads — but the answers are wrong. The average order value is inflated because duplicate transactions were counted. The churn model is biased because historical cancellations are systematically underreported in one region. The revenue forecast is off by 15% because one year of data was recorded in the wrong currency.
This article is a comprehensive guide to data quality: the six core dimensions, how to detect and measure quality problems in Python, how to prioritize and fix them, and how to build systematic quality monitoring that catches problems before they corrupt your work.
The Six Dimensions of Data Quality
Data quality researchers and practitioners have converged on six core dimensions that capture the different ways data can fail to be fit for purpose. Understanding these dimensions gives you a vocabulary for diagnosing quality problems and a framework for designing quality checks.
1. Completeness
Completeness measures whether all required data is present. Missing values, empty fields, and absent records all represent incompleteness.
import pandas as pd

def measure_completeness(df: pd.DataFrame) -> pd.DataFrame:
    """
    Measure completeness for each column in a DataFrame.

    Returns a summary showing null counts, null rates, and
    a completeness score (1.0 = fully complete, 0.0 = all nulls).
    """
    n = len(df)
    summary = pd.DataFrame({
        "column": df.columns,
        "total_rows": n,
        "null_count": df.isnull().sum().values,
        "non_null": df.notnull().sum().values,
    })
    summary["null_rate"] = summary["null_count"] / n
    summary["completeness"] = 1 - summary["null_rate"]
    # Classify completeness. right=False makes each bin include its lower
    # edge, so exactly 90% is "Warning" and exactly 99% is "Good",
    # matching the labels.
    summary["status"] = pd.cut(
        summary["completeness"],
        bins=[0.0, 0.90, 0.99, 1.001],
        right=False,
        labels=["Poor (<90%)", "Warning (90-99%)", "Good (≥99%)"]
    )
    return summary.sort_values("completeness")

# Example: customer dataset with various missing patterns
customers = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003", "C004", "C005"],
    "name": ["Jane Smith", None, "Alice Williams", "Carlos Garcia", "Sophie Brown"],
    "email": ["jane@ex.com", "bob@ex.com", None, "carlos@ex.com", None],
    "phone": ["512-555-0101", None, None, None, "44-20-7123"],
    "city": ["Austin", "Seattle", None, "Miami", "London"],
    "lifetime_value": [384.96, 139.98, 269.97, None, 184.98]
})
completeness_report = measure_completeness(customers)
print(completeness_report.to_string(index=False))

Output:
column          total_rows  null_count  non_null  null_rate  completeness  status
phone                    5           3         2       0.60          0.40  Poor (<90%)
email                    5           2         3       0.40          0.60  Poor (<90%)
city                     5           1         4       0.20          0.80  Poor (<90%)
name                     5           1         4       0.20          0.80  Poor (<90%)
lifetime_value           5           1         4       0.20          0.80  Poor (<90%)
customer_id              5           0         5       0.00          1.00  Good (≥99%)

Completeness problems have different causes and solutions:
- Structurally missing: Field not applicable (phone number for a B2B company) — expected and acceptable
- Missing at random: Genuinely unknown at data entry — imputation may be appropriate
- Systematically missing: Missing for a specific segment (e.g., international customers have no US phone) — must understand the pattern before imputing
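One way to tell random from systematic missingness is to compare null rates across segments. The sketch below is a minimal illustration under stated assumptions: the `null_rate_by_segment` helper, the `country` column, and the sample rows are all hypothetical and are not part of the customer dataset above.

```python
import pandas as pd

def null_rate_by_segment(df: pd.DataFrame, segment_col: str, target_col: str) -> pd.Series:
    """Return the share of missing target_col values within each segment."""
    return (
        df[target_col].isna()
        .groupby(df[segment_col])
        .mean()
        .sort_values(ascending=False)
    )

# Hypothetical sample: phone is missing only for non-US customers
sample = pd.DataFrame({
    "country": ["US", "US", "US", "UK", "UK", "DE"],
    "phone": ["512-555-0101", "206-555-0144", "305-555-0199", None, None, None],
})

rates = null_rate_by_segment(sample, "country", "phone")
print(rates)
```

A null rate that is near zero in one segment and near one in another points to a systematic cause, such as a form field that only exists for one market. In that case, imputing from the populated segment would likely be misleading.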
2. Accuracy
Accuracy measures whether data values correctly reflect the real-world entity or event being described. It’s the hardest dimension to measure because it requires an external ground truth.
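When a trusted reference does exist, even for a small audited sample, accuracy can be estimated directly by comparing observed values against it. The following is a sketch only: the `accuracy_against_reference` helper and both sample tables are hypothetical, and exact equality is assumed to be the right notion of a match.

```python
import pandas as pd

def accuracy_against_reference(df: pd.DataFrame, reference: pd.DataFrame,
                               key_col: str, value_col: str) -> float:
    """Share of records whose value matches a trusted reference,
    computed only over keys present in both tables."""
    merged = df[[key_col, value_col]].merge(
        reference[[key_col, value_col]],
        on=key_col,
        suffixes=("_observed", "_reference"),
    )
    if merged.empty:
        return float("nan")  # no overlapping keys, so nothing to measure
    matches = merged[f"{value_col}_observed"] == merged[f"{value_col}_reference"]
    return float(matches.mean())

# Hypothetical data: the verified sample covers three of the four customers
observed = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003", "C004"],
    "city": ["Austin", "Seattle", "Miami", "Boston"],
})
verified = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "city": ["Austin", "Portland", "Miami"],
})

score = accuracy_against_reference(observed, verified, "customer_id", "city")
print(f"Accuracy on verified sample: {score:.2f}")
```

Without such a reference, the checks that follow (format, range, and cross-field rules) act as proxies: they can show that a value is wrong, but not that it is right.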
import pandas as pd
import re

def check_email_accuracy(series: pd.Series) -> pd.DataFrame:
    """Check email addresses for format validity as a proxy for accuracy."""
    # Regex for basic email format validation
    email_pattern = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'
    results = pd.DataFrame({
        "email": series,
        "is_valid_format": series.apply(
            lambda x: bool(re.match(email_pattern, str(x))) if pd.notna(x) else False
        )
    })
    n_valid = results["is_valid_format"].sum()
    n_total = len(series.dropna())
    n_missing = series.isna().sum()
    # Guard against division by zero when every value is missing
    pct_valid = n_valid / n_total * 100 if n_total > 0 else 0.0
    print("Email accuracy check:")
    print(f"  Valid format: {n_valid} / {n_total} ({pct_valid:.1f}%)")
    print(f"  Missing: {n_missing}")
    print("\nInvalid emails:")
    invalid = results[~results["is_valid_format"] & results["email"].notna()]
    print(invalid["email"].tolist())
    return results

# Accuracy issues in practice
emails = pd.Series([
    "jane.smith@email.com",
    "bob.johnson@",          # Missing domain
    "alice@email.com",
    "not-an-email",          # No @ sign
    "carlos@email.c",        # TLD too short
    "sophie@email.com",
    "wei zhang@email.com",   # Space in local part
    None
])
check_email_accuracy(emails)

# Geographic accuracy check: lat/lon should be within valid ranges
def check_coordinate_accuracy(lat_series: pd.Series,
                              lon_series: pd.Series) -> pd.Series:
    """Flag coordinates outside valid lat/lon ranges."""
    invalid_lat = (lat_series < -90) | (lat_series > 90)
    invalid_lon = (lon_series < -180) | (lon_series > 180)
    return invalid_lat | invalid_lon

# Cross-field accuracy: delivery_date should not precede order_date
def check_date_sequence_accuracy(df: pd.DataFrame,
                                 earlier_col: str,
                                 later_col: str) -> pd.Series:
    """Flag records where the later_col date precedes the earlier_col date."""
    invalid = df[later_col] < df[earlier_col]
    n_invalid = invalid.sum()
    if n_invalid > 0:
        print(f"⚠️ {n_invalid} records where {later_col} < {earlier_col}")
    return invalid

3. Consistency
Consistency measures whether the same fact is represented the same way across different tables, systems, or time periods.
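Consistency also covers units. The introduction's example of revenue recorded in different currencies with no conversion applied can be sketched as below. The `RATES_TO_USD` values are made-up illustration rates, not real exchange rates, and the `sales` table is hypothetical.

```python
import pandas as pd

# Hypothetical conversion rates to USD, for illustration only
RATES_TO_USD = {"USD": 1.0, "EUR": 1.10, "GBP": 1.25}

sales = pd.DataFrame({
    "region": ["US", "DE", "UK"],
    "revenue": [1000.0, 1000.0, 1000.0],
    "currency": ["USD", "EUR", "GBP"],
})

# Fail loudly on currencies we cannot convert instead of silently producing NaN
unknown = ~sales["currency"].isin(list(RATES_TO_USD))
if unknown.any():
    raise ValueError(f"Unknown currencies: {sales.loc[unknown, 'currency'].unique().tolist()}")

sales["revenue_usd"] = sales["revenue"] * sales["currency"].map(RATES_TO_USD)

naive_total = sales["revenue"].sum()          # mixes three currencies
converted_total = sales["revenue_usd"].sum()  # single, consistent unit
print(f"Naive total: {naive_total:.2f} | Converted total: {converted_total:.2f}")
```

The naive sum runs without error, which is what makes this kind of inconsistency hard to spot: nothing fails, the number is simply wrong.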
import pandas as pd

def check_cross_table_consistency(
    df_a: pd.DataFrame,
    df_b: pd.DataFrame,
    key_col: str,
    value_col: str,
    table_a_name: str = "Table A",
    table_b_name: str = "Table B"
) -> pd.DataFrame:
    """
    Check that a value is consistent for the same key across two tables.
    Reports keys where the value differs.
    """
    merged = pd.merge(
        df_a[[key_col, value_col]].rename(columns={value_col: f"{value_col}_a"}),
        df_b[[key_col, value_col]].rename(columns={value_col: f"{value_col}_b"}),
        on=key_col,
        how="inner"
    )
    a = merged[f"{value_col}_a"]
    b = merged[f"{value_col}_b"]
    # NaN != NaN evaluates to True, so exclude pairs where both sides are missing
    inconsistent = merged[(a != b) & ~(a.isna() & b.isna())]
    n_checked = len(merged)
    n_inconsistent = len(inconsistent)
    consistency_rate = 1 - (n_inconsistent / n_checked) if n_checked > 0 else 1.0
    print(f"Consistency check: {value_col} across {table_a_name} vs {table_b_name}")
    print(f"  Records checked: {n_checked:,}")
    print(f"  Inconsistent: {n_inconsistent:,} ({(1-consistency_rate)*100:.1f}%)")
    print(f"  Consistency rate: {consistency_rate:.3f}")
    if n_inconsistent > 0:
        print("\nSample inconsistencies:")
        print(inconsistent.head(10).to_string(index=False))
    return inconsistent

# Example: customer revenue recorded differently in CRM vs Data Warehouse
crm_data = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "annual_revenue": [85000.0, 62000.0, 110000.0]
})
warehouse_data = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "annual_revenue": [85000.0, 65000.0, 110000.0]  # C002 differs!
})
inconsistencies = check_cross_table_consistency(
    crm_data, warehouse_data,
    key_col="customer_id",
    value_col="annual_revenue",
    table_a_name="CRM",
    table_b_name="Data Warehouse"
)

# Within-column consistency: same concept represented differently
def check_value_standardization(series: pd.Series,
                                canonical_values: list = None) -> pd.DataFrame:
    """
    Identify non-standard representations of categorical values.
    Common data quality issue: 'USA', 'US', 'United States', 'U.S.A.' for the same country.
    """
    value_counts = series.value_counts(dropna=False).reset_index()
    value_counts.columns = ["value", "count"]
    value_counts["pct"] = value_counts["count"] / len(series) * 100
    if canonical_values:
        value_counts["is_canonical"] = value_counts["value"].isin(canonical_values)
        non_canonical = value_counts[~value_counts["is_canonical"] &
                                     value_counts["value"].notna()]
        if len(non_canonical) > 0:
            print("Non-canonical values found:")
            print(non_canonical.to_string(index=False))
    return value_counts

# Check country code consistency
country_col = pd.Series([
    "USA", "US", "United States", "USA", "U.S.A.",
    "UK", "United Kingdom", "GB", "USA", "DE"
])
check_value_standardization(country_col, canonical_values=["USA", "UK", "DE", "FR"])

4. Timeliness
Timeliness measures whether data is current enough for its intended use. Data can be perfectly accurate as of when it was captured but stale relative to when it’s being used.
import pandas as pd
from datetime import datetime, timezone

def check_timeliness(
    df: pd.DataFrame,
    timestamp_col: str,
    max_age_hours: float = 24.0,
    reference_time: datetime = None
) -> dict:
    """
    Check whether data records are fresh enough for use.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing the timestamp column.
    timestamp_col : str
        Column containing the data timestamp.
    max_age_hours : float
        Maximum acceptable age in hours.
    reference_time : datetime, optional
        Reference point for age calculation. Defaults to now (UTC).

    Returns
    -------
    dict
        Timeliness statistics and fresh/stale breakdown.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)
    ref_ts = pd.Timestamp(reference_time)
    # Treat a timezone-naive reference time as UTC, matching the column handling below
    if ref_ts.tzinfo is None:
        ref_ts = ref_ts.tz_localize("UTC")
    ts_col = pd.to_datetime(df[timestamp_col])
    # Handle timezone-naive timestamps by assuming UTC
    if ts_col.dt.tz is None:
        ts_col = ts_col.dt.tz_localize("UTC")
    age_hours = (ref_ts - ts_col).dt.total_seconds() / 3600
    is_stale = age_hours > max_age_hours
    n_stale = is_stale.sum()
    n_fresh = (~is_stale).sum()
    # Guard against division by zero on an empty DataFrame
    pct_stale = n_stale / len(df) * 100 if len(df) > 0 else 0.0
    oldest_record = age_hours.max()
    newest_record = age_hours.min()
    median_age = age_hours.median()
    print(f"Timeliness check: {timestamp_col} (max age: {max_age_hours}h)")
    print(f"  Fresh records: {n_fresh:,} ({100-pct_stale:.1f}%)")
    print(f"  Stale records: {n_stale:,} ({pct_stale:.1f}%)")
    print(f"  Oldest record: {oldest_record:.1f} hours ago")
    print(f"  Newest record: {newest_record:.2f} hours ago")
    print(f"  Median age: {median_age:.1f} hours")
    return {
        "n_fresh": n_fresh, "n_stale": n_stale,
        "pct_stale": pct_stale, "oldest_hours": oldest_record,
        "max_allowed_hours": max_age_hours
    }

5. Validity
Validity measures whether data values conform to defined business rules, formats, and constraints — even if they’re plausible at first glance.
import pandas as pd

def validate_column(
    series: pd.Series,
    col_name: str,
    rules: list
) -> pd.DataFrame:
    """
    Apply a list of validation rules to a column.

    Each rule is a tuple: (rule_name, condition_function).
    The condition function takes a Series and returns a boolean mask
    where True = VALID.

    Parameters
    ----------
    series : pd.Series
        Column to validate.
    col_name : str
        Column name for reporting.
    rules : list of (str, callable)
        List of (rule_name, is_valid_fn) tuples.

    Returns
    -------
    pd.DataFrame
        Rule-by-rule validation results.
    """
    results = []
    for rule_name, is_valid_fn in rules:
        try:
            is_valid = is_valid_fn(series)
            n_valid = is_valid.sum()
            n_invalid = (~is_valid).sum()
            pass_rate = n_valid / len(series) if len(series) > 0 else 1.0
            status = "✓ PASS" if pass_rate >= 0.99 else \
                     "⚠ WARN" if pass_rate >= 0.95 else "✗ FAIL"
            results.append({
                "column": col_name,
                "rule": rule_name,
                "n_valid": n_valid,
                "n_invalid": n_invalid,
                "pass_rate": round(pass_rate, 4),
                "status": status
            })
        except Exception as e:
            results.append({
                "column": col_name, "rule": rule_name,
                "n_valid": None, "n_invalid": None,
                "pass_rate": None, "status": f"ERROR: {e}"
            })
    return pd.DataFrame(results)

# Example validation rules for an orders table
orders = pd.DataFrame({
    "order_id": ["ORD_001", "ORD_002", None, "ORD_004", "ORD_004"],  # None + duplicate
    "amount": [149.99, -25.00, 89.99, 0.00, 149.99],                 # Negative + zero
    "customer_id": ["CUST_001", "CUST_002", "CUST_003", "CUST_004", "CUST_004"],
    "status": ["completed", "cancelled", "PENDING", "invalid_status", "completed"],
    "order_date": pd.to_datetime(["2024-09-01", "2024-09-02", "2024-09-03",
                                  "2099-01-01", "2024-09-05"]),      # Future date
    "email": ["a@b.com", "not-email", "c@d.com", "e@f.com", "g@h.com"]
})
VALID_STATUSES = {"completed", "cancelled", "pending", "shipped", "processing"}
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'
TODAY = pd.Timestamp.now()

rules_order_id = [
    ("not_null", lambda s: s.notna()),
    ("unique", lambda s: ~s.duplicated(keep=False)),
    # na=False returns a proper boolean mask; filling NaN afterwards can leave
    # an object-dtype Series, on which ~ does not behave as logical negation
    ("format_ORD", lambda s: s.str.startswith("ORD_", na=False))
]
rules_amount = [
    ("not_null", lambda s: s.notna()),
    ("positive", lambda s: s > 0),
    ("under_100k", lambda s: s < 100_000),
]
rules_status = [
    ("not_null", lambda s: s.notna()),
    ("canonical_value", lambda s: s.str.lower().isin(VALID_STATUSES).fillna(False)),
]
rules_order_date = [
    ("not_null", lambda s: s.notna()),
    ("not_future", lambda s: s <= TODAY),
    ("not_before_2020", lambda s: s >= pd.Timestamp("2020-01-01")),
]

# Run all validations and combine into a single report
all_validations = pd.concat([
    validate_column(orders["order_id"], "order_id", rules_order_id),
    validate_column(orders["amount"], "amount", rules_amount),
    validate_column(orders["status"], "status", rules_status),
    validate_column(orders["order_date"], "order_date", rules_order_date),
])
print(all_validations.to_string(index=False))

6. Uniqueness
Uniqueness measures whether records appear exactly once when they should. Duplicate records are one of the most common and damaging data quality problems.
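Exact-match checks miss records that differ only in formatting, such as the same person entered with different capitalization or punctuation. A minimal sketch of catching those is to compare on a normalized key. The `normalize_name` helper and the `patients` table here are hypothetical, and this approach only handles formatting variants: genuine misspellings would need fuzzy matching, which is not shown.

```python
import re
import pandas as pd

def normalize_name(name: str) -> str:
    """Lowercase, drop punctuation, and collapse repeated whitespace."""
    cleaned = re.sub(r"[^a-z0-9\s]", "", name.lower())
    return re.sub(r"\s+", " ", cleaned).strip()

# Hypothetical records: the first three rows are the same person
patients = pd.DataFrame({
    "patient_id": [1, 2, 3, 4],
    "name": ["Jane Smith", "jane  smith", "JANE SMITH.", "Carlos Garcia"],
})

patients["name_key"] = patients["name"].map(normalize_name)

# keep=False marks every member of a duplicate group, not just the repeats
near_dupes = patients[patients.duplicated(subset=["name_key"], keep=False)]
print(near_dupes)
```

Matching on a name alone will also merge distinct people who share a name, so in practice the normalized key is usually combined with another attribute such as date of birth before records are treated as duplicates.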
import pandas as pd

def detect_duplicates(
    df: pd.DataFrame,
    subset: list = None,
    keep="first"
) -> dict:
    """
    Detect and characterize duplicate records in a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to check.
    subset : list, optional
        Columns to consider for duplicate detection.
        If None, all columns are used.
    keep : str or bool
        Which duplicate to mark as the "original":
        'first', 'last', or False (mark all as duplicates).

    Returns
    -------
    dict
        Duplicate statistics and a DataFrame of duplicate records.
    """
    if subset is None:
        subset = list(df.columns)
    n_total = len(df)
    is_dup = df.duplicated(subset=subset, keep=keep)
    n_dupes = is_dup.sum()
    n_unique = df[subset].drop_duplicates().shape[0]
    dupe_rate = n_dupes / n_total if n_total > 0 else 0.0
    print(f"Uniqueness check on: {subset}")
    print(f"  Total rows: {n_total:,}")
    print(f"  Unique key sets: {n_unique:,}")
    print(f"  Duplicate rows: {n_dupes:,} ({dupe_rate*100:.2f}%)")
    if n_dupes > 0:
        # Show the most-duplicated keys.
        # Relies on pandas >= 2.0, where value_counts() names its result "count".
        dup_counts = (
            df[subset]
            .value_counts()
            .reset_index()
            .query("count > 1")
            .sort_values("count", ascending=False)
            .head(10)
        )
        print("\nMost duplicated keys (top 10):")
        print(dup_counts.to_string(index=False))
    return {
        "n_total": n_total, "n_unique": n_unique,
        "n_duplicates": n_dupes, "duplicate_rate": dupe_rate,
        "duplicate_rows": df[is_dup].copy()
    }

# Detect duplicates in an orders table
orders_with_dupes = pd.DataFrame({
    "order_id": ["ORD_001", "ORD_001", "ORD_002", "ORD_003", "ORD_003", "ORD_003"],
    "customer_id": ["C001", "C001", "C002", "C003", "C003", "C003"],
    "amount": [149.99, 149.99, 89.99, 29.99, 29.99, 29.99],
    "status": ["completed", "completed", "shipped", "pending", "pending", "completed"]
})

# Exact duplicates (all columns match)
exact_result = detect_duplicates(orders_with_dupes)

# Key duplicates (order_id should be unique)
key_result = detect_duplicates(orders_with_dupes, subset=["order_id"])
# ORD_001 appears twice (exact dupe), ORD_003 appears 3 times (status changed)

Building a Comprehensive Data Quality Report
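Now let’s build a unified quality report. It scores five of the six dimensions: completeness, uniqueness, validity, consistency, and timeliness. Accuracy is left out because it cannot be scored without external ground truth.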
import pandas as pd
import numpy as np
from datetime import datetime, timezone

def data_quality_report(
    df: pd.DataFrame,
    dataset_name: str,
    primary_key: list = None,
    timestamp_col: str = None,
    validation_rules: dict = None,
    categorical_cols: dict = None
) -> dict:
    """
    Generate a data quality report covering completeness, uniqueness,
    validity, consistency, and timeliness. Accuracy is not scored because
    it requires external ground truth.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset to assess.
    dataset_name : str
        Human-readable name for reporting.
    primary_key : list, optional
        Columns that should be unique (for uniqueness check).
    timestamp_col : str, optional
        Timestamp column for timeliness check.
    validation_rules : dict, optional
        {column_name: [(rule_name, fn), ...]} for validity checks.
    categorical_cols : dict, optional
        {column_name: [list_of_valid_values]} for consistency checks.

    Returns
    -------
    dict
        Quality scores across the assessed dimensions.
    """
    n = len(df)
    report = {
        "dataset": dataset_name,
        "rows": n,
        "columns": len(df.columns),
        "assessed_at": datetime.now(timezone.utc).isoformat(),
        "dimensions": {}
    }
    print(f"\n{'='*60}")
    print(f"DATA QUALITY REPORT: {dataset_name}")
    print(f"Rows: {n:,} | Columns: {len(df.columns)}")
    print(f"{'='*60}")

    # ── 1. COMPLETENESS ────────────────────────────────────────────
    null_rates = df.isnull().mean()
    completeness_score = 1 - null_rates.mean()
    critical_nulls = null_rates[null_rates > 0.20].to_dict()
    print(f"\n[1] COMPLETENESS: {completeness_score:.3f}")
    for col, rate in null_rates[null_rates > 0].items():
        icon = "✗" if rate > 0.20 else "⚠" if rate > 0.05 else "ℹ"
        print(f"  {icon} {col}: {rate*100:.1f}% null")
    report["dimensions"]["completeness"] = {
        "score": round(completeness_score, 4),
        "critical_cols": critical_nulls
    }

    # ── 2. UNIQUENESS ──────────────────────────────────────────────
    if primary_key:
        n_unique = df[primary_key].drop_duplicates().shape[0]
        uniqueness_score = n_unique / n if n > 0 else 1.0
        n_dupes = n - n_unique
        print(f"\n[2] UNIQUENESS ({primary_key}): {uniqueness_score:.3f}")
        if n_dupes > 0:
            print(f"  ✗ {n_dupes:,} duplicate key values found")
        else:
            print("  ✓ All keys unique")
    else:
        exact_dupes = df.duplicated().sum()
        uniqueness_score = 1 - (exact_dupes / n) if n > 0 else 1.0
        print(f"\n[2] UNIQUENESS (exact rows): {uniqueness_score:.3f}")
        if exact_dupes > 0:
            print(f"  ✗ {exact_dupes:,} exact duplicate rows")
    report["dimensions"]["uniqueness"] = {"score": round(uniqueness_score, 4)}

    # ── 3. VALIDITY ────────────────────────────────────────────────
    if validation_rules:
        validity_scores = []
        print("\n[3] VALIDITY:")
        for col, rules in validation_rules.items():
            if col not in df.columns:
                continue
            for rule_name, rule_fn in rules:
                try:
                    is_valid = rule_fn(df[col])
                    pass_rate = is_valid.sum() / n
                    validity_scores.append(pass_rate)
                    icon = "✓" if pass_rate >= 0.99 else "⚠" if pass_rate >= 0.95 else "✗"
                    print(f"  {icon} {col}.{rule_name}: {pass_rate*100:.1f}% valid")
                except Exception as e:
                    print(f"  ✗ {col}.{rule_name}: ERROR ({e})")
        validity_score = np.mean(validity_scores) if validity_scores else 1.0
    else:
        validity_score = None
        print("\n[3] VALIDITY: No rules defined")
    # Compare against None explicitly so a legitimate score of 0.0 is kept
    report["dimensions"]["validity"] = {
        "score": round(validity_score, 4) if validity_score is not None else None
    }

    # ── 4. CONSISTENCY ─────────────────────────────────────────────
    if categorical_cols:
        consistency_scores = []
        print("\n[4] CONSISTENCY:")
        for col, valid_values in categorical_cols.items():
            if col not in df.columns:
                continue
            non_null = df[col].dropna()
            is_valid = non_null.isin(valid_values)
            rate = is_valid.mean()
            consistency_scores.append(rate)
            icon = "✓" if rate >= 0.99 else "⚠" if rate >= 0.95 else "✗"
            # Count all non-standard values, but print at most five examples
            invalid_vals = non_null[~is_valid].unique()
            print(f"  {icon} {col}: {rate*100:.1f}% canonical "
                  f"({len(invalid_vals)} non-standard values, e.g. {list(invalid_vals[:5])})")
        consistency_score = np.mean(consistency_scores) if consistency_scores else 1.0
    else:
        consistency_score = None
        print("\n[4] CONSISTENCY: No canonical value sets defined")
    report["dimensions"]["consistency"] = {
        "score": round(consistency_score, 4) if consistency_score is not None else None
    }

    # ── 5. TIMELINESS ──────────────────────────────────────────────
    if timestamp_col and timestamp_col in df.columns:
        ts_col = pd.to_datetime(df[timestamp_col], errors="coerce")
        now = pd.Timestamp.now(tz="UTC")
        if ts_col.dt.tz is None:
            ts_col = ts_col.dt.tz_localize("UTC")
        # Age of the newest record; negative if a timestamp lies in the future
        newest_age_hours = (now - ts_col.max()).total_seconds() / 3600
        median_age_hours = (now - ts_col.median()).total_seconds() / 3600
        # Linear decay to 0 at 30 days, clamped to [0, 1]
        timeliness_score = min(1.0, max(0.0, 1 - newest_age_hours / (24 * 30)))
        print(f"\n[5] TIMELINESS ({timestamp_col}):")
        print(f"  Newest record: {newest_age_hours:.1f}h ago")
        print(f"  Median record age: {median_age_hours:.1f}h")
        if newest_age_hours < 0:
            print("  ⚠ Newest timestamp is in the future")
    else:
        timeliness_score = None
        print("\n[5] TIMELINESS: No timestamp column specified")
    report["dimensions"]["timeliness"] = {
        "score": round(timeliness_score, 4) if timeliness_score is not None else None
    }

    # ── OVERALL SCORE ──────────────────────────────────────────────
    scores = [v["score"] for v in report["dimensions"].values()
              if v.get("score") is not None]
    overall = float(np.mean(scores)) if scores else None
    print(f"\n{'─'*40}")
    if overall is not None:
        print(f"OVERALL QUALITY SCORE: {overall:.3f}")
    else:
        print("OVERALL: Partial assessment")
    if overall is None:
        interpretation = "Not assessed"
    elif overall >= 0.95:
        interpretation = "Excellent"
    elif overall >= 0.90:
        interpretation = "Good"
    elif overall >= 0.80:
        interpretation = "Fair"
    else:
        interpretation = "Poor"
    print(f"Assessment: {interpretation}")
    print(f"{'='*60}")
    report["overall_score"] = round(overall, 4) if overall is not None else None
    report["interpretation"] = interpretation
    return report

# Apply to our sample orders table
report = data_quality_report(
    df=orders,
    dataset_name="E-Commerce Orders",
    primary_key=["order_id"],
    timestamp_col="order_date",
    validation_rules={
        "amount": [("positive", lambda s: s > 0),
                   ("under_100k", lambda s: s < 100_000)],
        "status": [("canonical", lambda s: s.str.lower().isin(
            {"completed", "cancelled", "pending", "shipped", "processing"}
        ).fillna(False))],
    },
    categorical_cols={
        "status": ["completed", "cancelled", "pending", "shipped", "processing"]
    }
)

Common Data Quality Problems and How to Fix Them
Problem 1: Inconsistent Categorical Values
import pandas as pd

def standardize_categorical(series: pd.Series,
                            mapping: dict = None,
                            to_lower: bool = True,
                            strip: bool = True) -> pd.Series:
    """
    Standardize categorical values to canonical forms.

    Parameters
    ----------
    series : pd.Series
        Categorical column to standardize.
    mapping : dict, optional
        {raw_value: canonical_value} explicit mapping.
    to_lower : bool
        Lowercase before mapping lookup.
    strip : bool
        Strip whitespace before mapping lookup.

    Returns
    -------
    pd.Series
        Standardized series.
    """
    result = series.copy()
    if strip:
        result = result.str.strip()
    if to_lower:
        result = result.str.lower()
    if mapping:
        # Values without a mapping entry keep their stripped/lowercased form
        result = result.map(mapping).fillna(result)
    return result

# Standardize country codes
country_col = pd.Series([
    "USA", "US", "United States", "us", " USA ",
    "UK", "United Kingdom", "GB", "gb",
    "Germany", "DE", "de", "DEU"
])
# Keys are lowercase because lookup happens after lowercasing. The canonical
# codes themselves ("usa", "gbr", "deu") need entries too; otherwise "USA"
# would come out as "usa" instead of "USA".
country_mapping = {
    "usa": "USA", "us": "USA", "united states": "USA", "u.s.a.": "USA",
    "gbr": "GBR", "uk": "GBR", "united kingdom": "GBR", "gb": "GBR",
    "deu": "DEU", "germany": "DEU", "de": "DEU", "deutschland": "DEU"
}
standardized = standardize_categorical(
    country_col,
    mapping=country_mapping,
    to_lower=True,
    strip=True
)
print(pd.DataFrame({"original": country_col, "standardized": standardized}))

Problem 2: Duplicate Records
import pandas as pd

def deduplicate(
    df: pd.DataFrame,
    key_cols: list,
    tie_breaker_col: str = None,
    tie_breaker_ascending: bool = False,
    log: bool = True
) -> pd.DataFrame:
    """
    Remove duplicate records, keeping the most relevant version.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with potential duplicates.
    key_cols : list
        Columns that define uniqueness.
    tie_breaker_col : str, optional
        Column to sort by when choosing which duplicate to keep
        (e.g., 'updated_at' to keep the most recent).
    tie_breaker_ascending : bool
        Sort ascending (False = keep highest/latest value).
    log : bool
        Whether to print deduplication statistics.

    Returns
    -------
    pd.DataFrame
        Deduplicated DataFrame.
    """
    n_before = len(df)
    if tie_breaker_col:
        df = df.sort_values(tie_breaker_col, ascending=tie_breaker_ascending)
    df_deduped = df.drop_duplicates(subset=key_cols, keep="first")
    n_after = len(df_deduped)
    n_removed = n_before - n_after
    if log:
        # Guard against division by zero on an empty DataFrame
        pct_removed = n_removed / n_before * 100 if n_before > 0 else 0.0
        print(f"Deduplication on {key_cols}:")
        print(f"  Before:  {n_before:,} rows")
        print(f"  After:   {n_after:,} rows")
        print(f"  Removed: {n_removed:,} duplicates ({pct_removed:.2f}%)")
    return df_deduped.reset_index(drop=True)

# Keep most recent record per customer
customer_dupes = pd.DataFrame({
    "customer_id": ["C001", "C001", "C002", "C003", "C003"],
    "name": ["Jane Smith", "Jane Smith-Torres", "Bob", "Alice", "Alice W."],
    "updated_at": pd.to_datetime(["2022-01-01", "2024-06-01",
                                  "2023-03-15", "2022-07-01", "2023-11-20"])
})
clean_customers = deduplicate(
    customer_dupes,
    key_cols=["customer_id"],
    tie_breaker_col="updated_at",
    tie_breaker_ascending=False  # Keep most recent
)

Problem 3: Outliers and Implausible Values
import pandas as pd

def detect_numeric_outliers(
    series: pd.Series,
    method: str = "iqr",
    threshold: float = 3.0
) -> pd.Series:
    """
    Detect outliers in a numeric column.

    Parameters
    ----------
    series : pd.Series
        Numeric column to check.
    method : str
        'iqr' (interquartile range) or 'zscore' (standard deviation).
    threshold : float
        For IQR: multiplier (standard is 1.5 or 3.0).
        For z-score: number of standard deviations.

    Returns
    -------
    pd.Series
        Boolean mask aligned to the input index, where True = outlier.
    """
    # Keep the original series so the mask can be aligned back to its index
    values = series.dropna()
    if method == "iqr":
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - threshold * iqr
        upper = q3 + threshold * iqr
        is_outlier = (values < lower) | (values > upper)
    elif method == "zscore":
        z_scores = (values - values.mean()) / values.std()
        is_outlier = z_scores.abs() > threshold
    else:
        raise ValueError(f"Unknown method {method!r}; expected 'iqr' or 'zscore'")
    n_outliers = is_outlier.sum()
    pct_outliers = n_outliers / len(values) * 100 if len(values) > 0 else 0.0
    print(f"Outlier detection ({method}, threshold={threshold}):")
    print(f"  Outliers: {n_outliers:,} / {len(values):,} ({pct_outliers:.2f}%)")
    if n_outliers > 0:
        print(f"  Range of outliers: [{values[is_outlier].min():.2f}, "
              f"{values[is_outlier].max():.2f}]")
        print(f"  Normal range: [{values[~is_outlier].min():.2f}, "
              f"{values[~is_outlier].max():.2f}]")
    # Rows that were NaN in the input are reported as non-outliers
    return is_outlier.reindex(series.index, fill_value=False)

# Detect outliers in order amounts
amounts = pd.Series([
    150.0, 89.99, 200.0, 45.0, 9999.0, 175.0, 120.0,
    -50.0, 300.0, 85.0, 210.0, 0.001, 195.0
])
outliers = detect_numeric_outliers(amounts, method="iqr", threshold=3.0)
print("\nOutlier values:")
print(amounts[outliers].tolist())  # [9999.0]
# Q1 = 85 and Q3 = 200, so the fences are [-260, 545]. The implausible
# values -50.0 and 0.001 fall inside them and are NOT flagged; catching
# those takes a validity rule (e.g. amount > 0), not outlier detection.

Problem 4: Encoding Sentinel Values
A particularly insidious problem is sentinel values: values that look like data but actually encode “missing” or “unknown”.
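Sentinels have to be found before they can be replaced. One rough heuristic, sketched below, is to look for a column's minimum or maximum value occurring suspiciously often. The `sentinel_candidates` helper, the `min_share` cutoff, and the sample readings are all assumptions made for illustration; a flagged value is only a candidate and still needs confirmation from whoever owns the data source.

```python
import pandas as pd

def sentinel_candidates(series: pd.Series, min_share: float = 0.05) -> pd.Series:
    """Return the column's min/max values that make up at least
    min_share of its non-null rows, with their shares."""
    values = series.dropna()
    if values.empty:
        return pd.Series(dtype=float)
    shares = values.value_counts(normalize=True)
    extremes = [values.min(), values.max()]
    # Keep only values that are both extreme and frequent
    return shares[(shares >= min_share) & shares.index.isin(extremes)]

# Hypothetical sensor readings where 9999.0 encodes "device offline"
readings = pd.Series([21.5, 22.1, 9999.0, 20.8, 9999.0, 21.9, 9999.0, 22.4])

candidates = sentinel_candidates(readings, min_share=0.25)
print(candidates)
```

Real measurements rarely repeat the exact same extreme value many times, which is why frequency at the edge of the range is a useful signal. The cutoff needs tuning per column: on a small sample like this one, a low `min_share` would also flag the genuine minimum.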
import pandas as pd
import numpy as np

def replace_sentinel_values(
    df: pd.DataFrame,
    sentinels: dict = None
) -> pd.DataFrame:
    """
    Replace known sentinel values with proper NaN.

    Sentinel values are values that encode 'unknown' or 'missing'
    but look like real data: 9999, -1, 0, 'N/A', 'NULL', '0000-01-01'.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing sentinel values.
    sentinels : dict, optional
        Overrides keyed by type: {"numeric" | "string" | "datetime":
        [list_of_sentinel_values]}. If None, uses common defaults.

    Returns
    -------
    pd.DataFrame
        DataFrame with sentinels replaced by NaN/NaT.
    """
    df = df.copy()
    default_sentinels = {
        "numeric": [9999, 9999.0, -1, -999, -9999, 999999],
        "string": ["N/A", "n/a", "NA", "NULL", "null", "None", "none",
                   "UNKNOWN", "unknown", "?", "", "0"],
        "datetime": ["1900-01-01", "1970-01-01", "0001-01-01", "9999-12-31"]
    }
    if sentinels:
        default_sentinels.update(sentinels)

    # Replace numeric sentinels
    for col in df.select_dtypes(include=[np.number]).columns:
        for s in default_sentinels["numeric"]:
            df[col] = df[col].replace(s, np.nan)

    # Replace string sentinels (the empty string is already in the list)
    for col in df.select_dtypes(include=["object"]).columns:
        df[col] = df[col].replace(default_sentinels["string"], np.nan)

    # Replace datetime sentinels
    for col in df.select_dtypes(include=["datetime64"]).columns:
        for s in default_sentinels["datetime"]:
            df[col] = df[col].replace(pd.Timestamp(s), pd.NaT)
    return df

# Before sentinel replacement
raw_data = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "age": [34, 9999, -1],                   # 9999 and -1 are sentinels
    "income": [85000, -999, 62000],          # -999 is sentinel
    "city": ["Austin", "NULL", "Chicago"],   # "NULL" is sentinel
    "score": [0.87, 0.0, 0.65]               # 0.0 might be a real value or sentinel
})
print("Before:", raw_data.isnull().sum().to_dict())
clean_data = replace_sentinel_values(raw_data)
print("After: ", clean_data.isnull().sum().to_dict())

Building a Data Quality Monitoring System
For production pipelines, quality checks must be automated and run on every pipeline execution.
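A check worth automating early is a comparison of each run's row count against recent history, since a sudden drop often means an upstream load failed partway. The sketch below is one possible approach, not a prescribed one: the `row_count_alert` helper, the median baseline, and the 50% `max_drop` default are assumptions chosen for illustration.

```python
from statistics import median
from typing import Optional

def row_count_alert(history: list, current: int, max_drop: float = 0.5) -> Optional[str]:
    """Return an alert message if current is more than max_drop below
    the median of previous row counts, else None."""
    if not history:
        return None  # nothing to compare against yet
    baseline = median(history)
    if baseline > 0 and current < baseline * (1 - max_drop):
        drop = 1 - current / baseline
        return f"Row count {current:,} is {drop:.0%} below baseline {baseline:,.0f}"
    return None

# Hypothetical row counts from four earlier runs
previous_runs = [10_200, 9_950, 10_480, 10_100]

print(row_count_alert(previous_runs, 10_050))  # within normal range
print(row_count_alert(previous_runs, 3_000))   # large drop triggers an alert
```

The median is used rather than the mean so that a single earlier bad run does not drag the baseline down and mask the next failure.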
import pandas as pd
import json
from datetime import datetime, timezone
from pathlib import Path
QUALITY_LOG_PATH = "logs/data_quality_history.jsonl"
class DataQualityMonitor:
"""
Automated data quality monitoring with alerting and history tracking.
Runs a configurable set of checks on each pipeline run and
logs results for trend analysis. Raises exceptions or sends
alerts when quality drops below thresholds.
"""
    def __init__(self, dataset_name: str, thresholds: dict = None):
        self.dataset_name = dataset_name
        # Caller-supplied thresholds override these defaults key by key,
        # so a partial dict cannot cause a KeyError in check().
        # Note: check() below does not yet use the "validity" threshold.
        self.thresholds = {
            "completeness": 0.95,
            "uniqueness": 0.999,
            "validity": 0.98,
            "max_null_rate": 0.10,
            **(thresholds or {})
        }
        self.run_results = []

    def check(self, df: pd.DataFrame,
              primary_key: list = None) -> dict:
        """Run all quality checks and return results."""
        results = {
            "dataset": self.dataset_name,
            "run_at": datetime.now(timezone.utc).isoformat(),
            "n_rows": len(df),
            "checks": {},
            "alerts": []
        }
        # Completeness check
        null_rates = df.isnull().mean()
        # Cast to a plain float so the comparison below yields a Python bool;
        # a numpy.bool_ would make json.dumps fail in _log_results().
        completeness = float(1 - null_rates.mean())
        high_null_cols = null_rates[null_rates > self.thresholds["max_null_rate"]].to_dict()
        results["checks"]["completeness"] = {
            "score": round(completeness, 4),
            "passed": completeness >= self.thresholds["completeness"],
            "details": {k: round(float(v), 4) for k, v in high_null_cols.items()}
        }
        if not results["checks"]["completeness"]["passed"]:
            results["alerts"].append(
                f"Completeness {completeness:.3f} < threshold "
                f"{self.thresholds['completeness']}"
            )
        # Uniqueness check
        if primary_key:
            n_unique = df[primary_key].drop_duplicates().shape[0]
            uniqueness = n_unique / len(df) if len(df) > 0 else 1.0
            n_dupes = len(df) - n_unique
            results["checks"]["uniqueness"] = {
                "score": round(uniqueness, 4),
                "n_duplicates": n_dupes,
                "passed": uniqueness >= self.thresholds["uniqueness"]
            }
            if not results["checks"]["uniqueness"]["passed"]:
                results["alerts"].append(
                    f"Uniqueness violation: {n_dupes:,} duplicate "
                    f"keys on {primary_key}"
                )
        # Row count: recorded so that drops can be detected from the history log
        results["checks"]["row_count"] = {"n_rows": len(df)}
        # Zero-row check
        if len(df) == 0:
            results["alerts"].append("CRITICAL: Dataset has zero rows!")
        # Log results
        self._log_results(results)
        # Print summary
        self._print_summary(results)
        return results

    def _log_results(self, results: dict):
        """Append results to the quality history log (one JSON object per line)."""
        Path(QUALITY_LOG_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(QUALITY_LOG_PATH, "a") as f:
            f.write(json.dumps(results) + "\n")

    def _print_summary(self, results: dict):
        """Print a concise quality summary."""
        status = "✓ ALL CHECKS PASSED" if not results["alerts"] else \
            f"⚠ {len(results['alerts'])} ALERTS"
        print(f"\nQuality Check [{self.dataset_name}]: {status}")
        print(f" Rows: {results['n_rows']:,} | Run: {results['run_at'][:19]}")
        for check, data in results["checks"].items():
            if "score" in data:
                icon = "✓" if data.get("passed", True) else "✗"
                print(f" {icon} {check}: {data['score']:.4f}")
        for alert in results["alerts"]:
            print(f" ⚠ ALERT: {alert}")

    def get_history(self) -> pd.DataFrame:
        """Load quality history as a DataFrame for trend analysis."""
        if not Path(QUALITY_LOG_PATH).exists():
            return pd.DataFrame()
        records = []
        with open(QUALITY_LOG_PATH) as f:
            for line in f:
                # Skip blank lines so a stray newline does not break parsing
                if line.strip():
                    records.append(json.loads(line))
        return pd.DataFrame(records)

    def raise_on_alerts(self, results: dict):
        """Raise an exception if any quality alerts were triggered."""
        if results["alerts"]:
            raise ValueError(
                f"Data quality checks failed for {self.dataset_name}:\n" +
                "\n".join(f" • {a}" for a in results["alerts"])
            )

# Usage in a pipeline
monitor = DataQualityMonitor(
    dataset_name="customer_features",
    thresholds={"completeness": 0.95, "uniqueness": 0.999, "max_null_rate": 0.10}
)
results = monitor.check(customers, primary_key=["customer_id"])
# Optionally: fail the pipeline on quality issues
# monitor.raise_on_alerts(results)

The Cost of Poor Data Quality
Data quality problems have measurable business costs that help justify investment in fixing them:
| Problem Type | Business Impact | Example |
|---|---|---|
| Duplicate transactions | Revenue double-counting, wrong KPIs | 5% duplicate orders inflate revenue reports by about 5% |
| Stale customer segments | Targeting wrong customers | Sending premium offers to churned customers |
| Invalid email addresses | Failed campaigns, low deliverability | 30% invalid emails means 30% of email campaign spend is wasted |
| Inconsistent currency | Wrong financial reports | EUR/USD confusion distorts revenue by up to 10% |
| Missing key features | Biased or broken ML models | Models trained without 20% of customers perform poorly in production |
| Incorrect geography | Wrong market analysis | Attributing online sales to wrong region corrupts regional P&L |
| Sentinel values treated as real | Extreme outliers corrupt aggregates | Age=9999 inflates average customer age |
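Two rows of the table can be reproduced on toy data. The sketch below uses made-up numbers (the `orders` and `customers` frames and their column names are illustrative assumptions, not data from this article) to show how one duplicated order inflates total revenue and how a 9999 sentinel inflates a mean.

```python
import pandas as pd

# Hypothetical toy data: 20 orders of 100.0 each, plus one accidental duplicate.
orders = pd.DataFrame({
    "order_id": list(range(1, 21)) + [7],   # order 7 was loaded twice
    "revenue": [100.0] * 21,
})

reported = orders["revenue"].sum()
actual = orders.drop_duplicates(subset="order_id")["revenue"].sum()
inflation = reported / actual - 1
print(f"Reported: {reported:.0f}, actual: {actual:.0f}, inflation: {inflation:.1%}")

# Hypothetical ages where 9999 means "unknown" rather than a real age.
customers = pd.DataFrame({"age": [25, 35, 45, 9999]})

naive_mean = customers["age"].mean()
# mask() turns the sentinel into NaN, which mean() skips by default
clean_mean = customers["age"].mask(customers["age"] == 9999).mean()
print(f"Mean age with sentinel: {naive_mean:.1f}, without: {clean_mean:.1f}")
```

In both cases the code runs without error and returns a plausible-looking number, which is why these problems tend to go unnoticed until someone compares against a trusted source.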
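The 1-10-100 rule of thumb captures how these costs escalate: it costs $1 to verify data quality at entry, $10 to clean it after the fact, and $100 to deal with the consequences of not catching it at all (wrong decisions, rework, reputation damage). The figures are relative orders of magnitude, not measured prices.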
Summary
Data quality is not a one-time cleanup task — it is an ongoing discipline. Good data is complete, accurate, consistent, timely, valid, and unique. Poor data in any of these dimensions silently corrupts analyses, degrades model performance, and erodes trust in data-driven decisions.
The practical workflow is: measure first (quantify quality across all six dimensions before deciding what to fix), prioritize by impact (fix completeness and uniqueness issues first, as they are most likely to produce wrong answers), implement systematic checks (automate quality monitoring so problems surface immediately rather than days later), and make quality visible (report quality scores in dashboards so stakeholders know what confidence to place in the data).
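The "measure first" step can be sketched as a small function that returns one score per dimension. This is a minimal illustration, not this article's implementation: `measure_quality`, its `validators` argument, and the sample columns are hypothetical names, and only completeness, uniqueness, and validity are covered because the other dimensions usually need reference data or timestamps that a single snapshot does not carry.

```python
import pandas as pd

def measure_quality(df, key, validators):
    """Return completeness, uniqueness, and per-column validity scores in [0, 1].

    validators maps a column name to a function that takes a Series and
    returns a boolean Series (True = value is valid). Hypothetical helper.
    """
    n = len(df)
    scores = {
        # Share of non-null cells across the whole frame
        "completeness": float(df.notna().to_numpy().mean()) if df.size else 1.0,
        # Share of rows carrying a distinct key
        "uniqueness": len(df.drop_duplicates(subset=key)) / n if n else 1.0,
    }
    for col, is_valid in validators.items():
        present = df[col].dropna()
        # Validity is measured on present values only; missing values
        # already count against completeness.
        scores[f"validity:{col}"] = (
            float(is_valid(present).mean()) if len(present) else 1.0
        )
    return scores

# Made-up sample with one null email, one duplicate key, and one sentinel age
sample = pd.DataFrame({
    "customer_id": [1, 2, 2, 4],
    "email": ["a@example.com", "not-an-email", None, "d@example.com"],
    "age": [34, 9999, 51, 28],
})

scores = measure_quality(
    sample,
    key=["customer_id"],
    validators={
        "email": lambda s: s.str.contains(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"),
        "age": lambda s: s.between(0, 120),
    },
)
for name, score in scores.items():
    print(f"{name}: {score:.3f}")
```

Keeping the scores in a flat dictionary makes it straightforward to compare them against thresholds or to log them per run for trend analysis.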
The DataQualityMonitor pattern (running checks at pipeline boundaries, logging results, alerting on violations, and raising exceptions when quality drops below thresholds) is a practical, production-oriented way to make sure your data is fit for purpose before it reaches analysis or models.
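Logging every run also enables trend checks, such as flagging an unexpected drop in row count relative to recent runs. The sketch below shows one way to do this on a history frame shaped like the output of get_history(); the function name `detect_row_drop`, the 20% `max_drop` default, the five-run window, and the sample history values are all illustrative assumptions.

```python
import pandas as pd

def detect_row_drop(history, max_drop=0.20, window=5):
    """Return an alert string if the latest run's row count fell more than
    max_drop below the median of the previous `window` runs, else None.

    Assumes `history` has one row per run, oldest first, with an "n_rows" column.
    """
    if len(history) < 2:
        return None  # nothing to compare against yet
    latest = int(history["n_rows"].iloc[-1])
    # Median of the runs before the latest one (up to `window` of them)
    baseline = float(history["n_rows"].iloc[-(window + 1):-1].median())
    if baseline > 0 and latest < baseline * (1 - max_drop):
        drop = 1 - latest / baseline
        return (f"Row count dropped {drop:.0%}: {latest:,} rows "
                f"vs. recent median {baseline:,.0f}")
    return None

# Hypothetical history, as might be loaded from the quality log
history = pd.DataFrame({
    "run_at": pd.date_range("2024-01-01", periods=6, freq="D"),
    "n_rows": [10_000, 10_100, 9_950, 10_050, 10_200, 6_000],
})
alert = detect_row_drop(history)
print(alert or "Row count within expected range")
```

Using the median of several earlier runs rather than only the previous run keeps a single unusual day from shifting the baseline.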
Key Takeaways
- The six dimensions of data quality are completeness (values present), accuracy (values reflect reality), consistency (same facts represented the same way), timeliness (data is current enough), validity (values conform to business rules), and uniqueness (no unintended duplicates) — poor quality on any dimension can silently corrupt analyses
- Measure before fixing: always quantify quality across all dimensions first; the most common problems to prioritize are high null rates (completeness), duplicate primary keys (uniqueness), and sentinel values masquerading as real data (accuracy/validity)
- Validate data at pipeline boundaries — as it enters your system, as it flows between stages, and before it reaches downstream consumers; silent quality degradation discovered in production is far more costly than a failed pipeline check
- Sentinel values (9999 for unknown age, -1 for missing score, “NULL” strings, “1970-01-01” dates) are among the most insidious quality problems because they pass null checks but corrupt aggregations; always explicitly replace them with proper NaN/NaT
- Idempotent quality fixes (standardizing categoricals, deduplicating with a tie-breaker, replacing sentinels) should produce the same result when run multiple times and be documented alongside the data they clean
- The 1-10-100 rule: it costs 1× to prevent a data quality issue at the source, 10× to fix it in the pipeline, and 100× to deal with its consequences (wrong business decisions, model failures, reputation damage)
- Build a DataQualityMonitor class that runs checks automatically with each pipeline run, logs results to a history file for trend analysis, and raises exceptions when quality drops below configured thresholds; quality monitoring should be a first-class part of every data pipeline
- Document quality findings alongside the data: maintain a data quality log that records when issues were discovered, what their root cause was, and how they were resolved; this institutional memory prevents the same problems from recurring
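The sentinel and idempotency takeaways can be combined in a single cleaning function. In this sketch the function name `clean_customers`, the column names, and the sentinel lists are hypothetical; the point it illustrates is that applying the function to its own output changes nothing.

```python
import pandas as pd

AGE_SENTINELS = [9999, -1]                   # assumed "unknown" markers
DATE_SENTINEL = pd.Timestamp("1970-01-01")   # assumed placeholder date

def clean_customers(df):
    """Idempotent cleanup: cleaning already-clean data returns the same frame."""
    out = df.copy()
    # Replace sentinels with proper missing values (NaN / NaT)
    out["age"] = out["age"].astype("float64").mask(out["age"].isin(AGE_SENTINELS))
    out["signup_date"] = pd.to_datetime(out["signup_date"])
    out["signup_date"] = out["signup_date"].mask(out["signup_date"] == DATE_SENTINEL)
    # Standardize a categorical: trim whitespace and upper-case
    out["country"] = out["country"].str.strip().str.upper()
    # Deduplicate with a tie-breaker: keep the most recently updated row per key.
    # The stable sort keeps the original order for rows with equal timestamps.
    out = (out.sort_values(["customer_id", "updated_at"], kind="mergesort")
              .drop_duplicates(subset="customer_id", keep="last")
              .reset_index(drop=True))
    return out

# Made-up raw data with sentinels, messy categories, and a duplicate key
raw = pd.DataFrame({
    "customer_id": [1, 2, 2, 3],
    "age": [34, 9999, 41, -1],
    "signup_date": ["2021-03-01", "1970-01-01", "2020-07-15", "2022-01-10"],
    "country": [" us", "DE", "de ", "Fr"],
    "updated_at": pd.to_datetime(
        ["2024-01-01", "2024-01-01", "2024-02-01", "2024-01-05"]
    ),
})

once = clean_customers(raw)
twice = clean_customers(once)
print(once)
print("Idempotent:", once.equals(twice))
```

A check like `once.equals(twice)` is cheap enough to keep as a unit test, so a later change that breaks idempotency is caught before it reaches a scheduled pipeline.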








