Data Quality Engineering
Data quality failures are silent production incidents. Unlike application bugs that throw errors, bad data flows quietly through pipelines, corrupts dashboards, and trains ML models on garbage, and it is usually discovered by a stakeholder rather than a monitor. The engineering discipline is: define expectations explicitly, validate them automatically in CI/CD, alert on violations before users see them, and design pipelines that degrade gracefully rather than fail catastrophically.
Core Mental Model
Data quality has six measurable dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. Each requires different detection techniques. The key architectural principle: quality gates should be in-pipeline (fail fast) for critical dimensions, and monitoring/alerting for statistical anomalies. Never let bad data reach consumers silently — either block it (quarantine) or alert loudly. Data contracts formalize the agreement between producers and consumers so schema changes don't silently break downstream systems.
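The block-or-alert decision above can be sketched as a small severity gate. This is an illustrative sketch, not a library API; `CheckResult`, `quality_gate`, and the outcome names are assumptions for the example:

```python
from dataclasses import dataclass
from enum import Enum

class Severity(Enum):
    ERROR = "error"      # critical dimension: block the batch
    WARNING = "warning"  # statistical anomaly: alert, let the batch through

@dataclass
class CheckResult:
    name: str
    passed: bool
    severity: Severity

def quality_gate(results: list[CheckResult]) -> str:
    """Decide a batch's fate: block on any ERROR, alert on WARNING, else publish."""
    failures = [r for r in results if not r.passed]
    if any(r.severity == Severity.ERROR for r in failures):
        return "quarantine"  # never let bad data reach consumers silently
    if failures:
        return "alert_and_continue"
    return "publish"
```

The point of the gate is that a failed check never passes silently: every path either blocks the data or produces an alert.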
Data Quality Dimensions
Accuracy: Values represent reality. Example: age = 150 (impossible).
Detect: range checks, reference table validation, cross-system reconciliation.
Completeness: Required fields are populated. Example: order with no customer_id.
Detect: not_null tests, null rate monitoring, row count vs expected.
Consistency: Same fact expressed consistently across systems/tables.
Example: order total ≠ sum(line_items).
Detect: cross-table assertion tests, referential integrity checks.
Timeliness: Data is fresh enough for its use case.
Example: daily orders table not updated in 36 hours.
Detect: source freshness checks, max(updated_at) lag monitoring.
Uniqueness: No duplicate records. Example: duplicate order IDs.
Detect: unique tests on primary keys, COUNT vs COUNT DISTINCT.
Validity: Values conform to defined formats/rules.
Example: email missing @, status = "INVALID_VALUE".
Detect: regex validation, accepted_values tests, FK constraints.
Great Expectations
# Great Expectations: define expectations, run checkpoints, generate data docs
import great_expectations as gx
# Initialize context
context = gx.get_context()
# Define a datasource
datasource = context.sources.add_or_update_pandas(name="orders_datasource")
asset = datasource.add_dataframe_asset(name="orders")
batch_request = asset.build_batch_request(dataframe=orders_df)  # orders_df: pandas DataFrame loaded upstream
# Create expectation suite
suite = context.add_or_update_expectation_suite("orders_quality_suite")
# Create validator and add expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_quality_suite"
)
# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("created_at")
# Uniqueness
validator.expect_column_values_to_be_unique("order_id")
# Validity — format and range
validator.expect_column_values_to_be_of_type("amount", "float64")  # pandas dtype name
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex(
"email",
r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)
# Accepted values
validator.expect_column_values_to_be_in_set(
"status",
{"new", "processing", "shipped", "delivered", "cancelled", "refunded"}
)
# Row count (freshness / completeness at table level)
validator.expect_table_row_count_to_be_between(
min_value=1000,
max_value=10_000_000
)
# Statistical distribution (anomaly detection)
validator.expect_column_mean_to_be_between(
"amount",
min_value=50.0,
max_value=500.0
)
validator.expect_column_stdev_to_be_between(
"amount",
min_value=10.0,
max_value=1000.0
)
# Column set completeness (no unexpected new columns, no missing columns)
validator.expect_table_columns_to_match_ordered_list(
["order_id", "customer_id", "amount", "status", "created_at"]
)
# Save expectations
validator.save_expectation_suite()
# Checkpoint: run suite against data + alert on failure
checkpoint = context.add_or_update_checkpoint(
name="orders_daily_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": "orders_quality_suite"
}
],
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"}
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"}
},
{
"name": "send_slack_notification",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ slack_webhook }}",
"notify_on": "failure"
}
}
]
)
class DataQualityException(Exception):
    pass

result = context.run_checkpoint(checkpoint_name="orders_daily_checkpoint")
if not result.success:
    failed = [
        v for v in result.list_validation_results()
        if not v.success
    ]
    raise DataQualityException(f"{len(failed)} expectations failed")
Anomaly Detection
import pandas as pd
import numpy as np
from scipy import stats
def detect_anomalies_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
"""Z-score method: flag values > N standard deviations from mean."""
z_scores = np.abs(stats.zscore(series.dropna()))
return pd.Series(z_scores > threshold, index=series.dropna().index).reindex(series.index, fill_value=False)
def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
"""IQR method: flag values beyond Q1/Q3 ± multiplier*IQR."""
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - multiplier * IQR
upper = Q3 + multiplier * IQR
return (series < lower) | (series > upper)
# Time series anomaly detection with seasonality
def detect_time_series_anomalies(
ts: pd.Series,
window: int = 7,
seasonal_period: int = 7,
threshold_sigma: float = 3.0
) -> pd.DataFrame:
"""Detect anomalies accounting for weekly seasonality."""
df = pd.DataFrame({'value': ts})
# Rolling mean and std (trend component)
df['rolling_mean'] = ts.rolling(window, center=True).mean()
df['rolling_std'] = ts.rolling(window, center=True).std()
# Seasonal component: average by day of week
df['day_of_week'] = ts.index.dayofweek
df['seasonal_factor'] = df.groupby('day_of_week')['value'].transform('mean')
    # Residual after removing trend scaled by a multiplicative day-of-week factor
    df['residual'] = df['value'] - df['rolling_mean'] * (df['seasonal_factor'] / df['seasonal_factor'].mean())
# Anomaly if residual > N sigma
residual_std = df['residual'].std()
df['is_anomaly'] = df['residual'].abs() > threshold_sigma * residual_std
df['anomaly_score'] = df['residual'].abs() / residual_std
return df[df['is_anomaly']][['value', 'rolling_mean', 'anomaly_score']]
# Pipeline metric monitoring
def check_pipeline_metrics(
current: dict,
historical: list[dict],
metrics: list[str],
sigma_threshold: float = 3.0
) -> list[dict]:
"""Check if current pipeline run metrics are anomalous vs history."""
alerts = []
historical_df = pd.DataFrame(historical)
for metric in metrics:
if metric not in historical_df.columns:
continue
hist_values = historical_df[metric].dropna()
current_val = current.get(metric)
if current_val is None or len(hist_values) < 10:
continue
mean = hist_values.mean()
std = hist_values.std()
z_score = abs(current_val - mean) / (std + 1e-9)
if z_score > sigma_threshold:
alerts.append({
'metric': metric,
'current': current_val,
'expected_mean': round(mean, 2),
'z_score': round(z_score, 2),
'severity': 'critical' if z_score > 5 else 'warning'
})
return alerts
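The IQR rule above can also be exercised standalone with only the standard library, using `statistics.quantiles` in place of pandas. This mirrors `detect_anomalies_iqr` as a sketch, not a drop-in replacement:

```python
import statistics

def iqr_outliers(values: list[float], multiplier: float = 1.5) -> list[float]:
    """Flag values beyond Q1 - multiplier*IQR or Q3 + multiplier*IQR."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - multiplier * iqr, q3 + multiplier * iqr
    return [v for v in values if v < lower or v > upper]
```

For a daily row-count series like [10, 12, 11, 13, 12, 100], only the 100 is flagged.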
Schema Change Detection
from dataclasses import dataclass
from enum import Enum
class ChangeType(Enum):
COLUMN_ADDED = "column_added" # NON-BREAKING
COLUMN_REMOVED = "column_removed" # BREAKING
COLUMN_RENAMED = "column_renamed" # BREAKING
TYPE_WIDENED = "type_widened" # NON-BREAKING (int→bigint)
TYPE_NARROWED = "type_narrowed" # BREAKING (bigint→int)
NULLABLE_TO_REQUIRED = "nullable_to_required" # BREAKING
REQUIRED_TO_NULLABLE = "required_to_nullable" # NON-BREAKING
@dataclass
class SchemaChange:
change_type: ChangeType
column_name: str
old_value: str | None
new_value: str | None
is_breaking: bool
def detect_schema_changes(
old_schema: dict[str, dict],
new_schema: dict[str, dict]
) -> list[SchemaChange]:
"""Compare two schema dicts {col_name: {type, nullable, ...}}."""
changes = []
for col in set(old_schema.keys()) | set(new_schema.keys()):
if col not in new_schema:
changes.append(SchemaChange(
ChangeType.COLUMN_REMOVED, col,
old_schema[col]['type'], None, is_breaking=True
))
elif col not in old_schema:
changes.append(SchemaChange(
ChangeType.COLUMN_ADDED, col,
None, new_schema[col]['type'], is_breaking=False
))
else:
old, new = old_schema[col], new_schema[col]
if old['type'] != new['type']:
breaking = not is_type_widening(old['type'], new['type'])
changes.append(SchemaChange(
ChangeType.TYPE_NARROWED if breaking else ChangeType.TYPE_WIDENED,
col, old['type'], new['type'], is_breaking=breaking
))
            # Breaking only when a nullable column becomes required
            if old['nullable'] and not new['nullable']:
                changes.append(SchemaChange(
                    ChangeType.NULLABLE_TO_REQUIRED, col,
                    'nullable', 'required', is_breaking=True
                ))
return changes
def is_type_widening(old_type: str, new_type: str) -> bool:
widening_pairs = {
('int', 'bigint'), ('float', 'double'),
('varchar(100)', 'varchar(255)'), ('date', 'timestamp')
}
return (old_type, new_type) in widening_pairs
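A CI hook built on this idea snapshots the production schema and fails the build on breaking changes. This sketch inlines a simplified breaking-change test rather than reusing `detect_schema_changes` above, and the schema dicts are illustrative:

```python
# Simplified stand-in for the fuller detection logic above
WIDENINGS = {("int", "bigint"), ("float", "double"), ("date", "timestamp")}

def breaking_changes(old: dict[str, dict], new: dict[str, dict]) -> list[str]:
    """List human-readable breaking schema changes; empty list = safe to deploy."""
    problems = []
    for col, spec in old.items():
        if col not in new:
            problems.append(f"column removed: {col}")
            continue
        if (spec["type"] != new[col]["type"]
                and (spec["type"], new[col]["type"]) not in WIDENINGS):
            problems.append(f"type narrowed: {col} {spec['type']} -> {new[col]['type']}")
        if spec["nullable"] and not new[col]["nullable"]:
            problems.append(f"nullable -> required: {col}")
    return problems
```

A CI job would call this with the registered production schema and the candidate schema, and exit nonzero if the returned list is non-empty.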
Data Contracts
# data-contract.yaml: consumer-driven contract between producer and consumer
# Producer: orders service
# Consumer: analytics team
dataContractSpecification: 0.9.3
id: urn:datacontract:orders:v1.2
info:
title: Orders Data Contract
version: 1.2.0
description: "Orders events published by the orders service"
owner: orders-team
contact:
email: [email protected]
servers:
production:
type: kafka
host: kafka.production.example.com:9092
topic: orders.v1
models:
OrderEvent:
description: "Emitted when an order is created or updated"
fields:
order_id:
type: string
required: true
unique: true
description: "UUID v4"
pattern: "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
customer_id:
type: string
required: true
amount:
type: number
required: true
minimum: 0
maximum: 100000
description: "Order total in USD"
status:
type: string
required: true
enum: [new, processing, shipped, delivered, cancelled, refunded]
created_at:
type: timestamp
required: true
description: "ISO 8601 UTC"
metadata:
type: object
required: false
description: "Optional key-value metadata, added in v1.2"
quality:
type: SodaCL
specification: |
checks for OrderEvent:
- row_count > 0
- missing_count(order_id) = 0
- duplicate_count(order_id) = 0
- invalid_count(status) = 0:
valid values: [new, processing, shipped, delivered, cancelled, refunded]
- avg(amount) between 50 and 500
servicelevels:
freshness:
description: "Orders appear in stream within 5 seconds of creation"
threshold: 5s
completeness:
description: "Zero orders lost — 100% delivery guarantee"
threshold: "100%"
Pipeline Observability
from dataclasses import dataclass, field
from datetime import datetime
import time
import uuid
@dataclass
class PipelineRun:
pipeline_name: str
run_id: str
started_at: datetime = field(default_factory=datetime.utcnow)
metrics: dict = field(default_factory=dict)
alerts: list = field(default_factory=list)
class PipelineObservability:
"""Track pipeline run metrics and alert on anomalies."""
def __init__(self, pipeline_name: str, metrics_store, alert_fn):
self.pipeline_name = pipeline_name
self.metrics_store = metrics_store
self.alert_fn = alert_fn
def __call__(self, pipeline_fn):
def wrapper(*args, **kwargs):
run = PipelineRun(self.pipeline_name, run_id=str(uuid.uuid4()))
start = time.time()
try:
result = pipeline_fn(*args, **kwargs)
run.metrics.update({
'status': 'success',
'duration_seconds': time.time() - start,
'rows_processed': getattr(result, 'row_count', None),
'rows_written': getattr(result, 'written_count', None),
'null_rate': getattr(result, 'null_rate', None),
})
return result
except Exception as e:
run.metrics.update({
'status': 'failed',
'error': str(e),
'duration_seconds': time.time() - start,
})
self.alert_fn(f"Pipeline {self.pipeline_name} FAILED: {e}", severity='critical')
raise
finally:
self._check_anomalies(run)
self.metrics_store.save(run)
return wrapper
def _check_anomalies(self, run: PipelineRun):
history = self.metrics_store.get_history(self.pipeline_name, days=30)
alerts = check_pipeline_metrics(
current=run.metrics,
historical=history,
metrics=['rows_processed', 'null_rate', 'duration_seconds']
)
for alert in alerts:
self.alert_fn(
f"Anomaly in {self.pipeline_name}.{alert['metric']}: "
f"current={alert['current']}, expected≈{alert['expected_mean']} "
f"(z={alert['z_score']})",
severity=alert['severity']
)
# Usage
@PipelineObservability("daily_orders_etl", metrics_store=db, alert_fn=pagerduty_alert)
def run_orders_etl(date: str) -> PipelineResult:
...
Quarantine Pattern
def process_with_quarantine(
records: list[dict],
validate_fn,
good_sink,
quarantine_sink
) -> tuple[int, int]:
"""Route invalid records to quarantine instead of failing the pipeline."""
good_records = []
quarantine_records = []
for record in records:
validation_result = validate_fn(record)
if validation_result.is_valid:
good_records.append(record)
else:
quarantine_records.append({
**record,
'_quarantine_reason': validation_result.errors,
'_quarantine_timestamp': datetime.utcnow().isoformat(),
'_quarantine_pipeline': 'orders_etl',
'_quarantine_version': '1.0'
})
good_sink.write(good_records)
quarantine_sink.write(quarantine_records)
# Alert if quarantine rate is high
    quarantine_rate = len(quarantine_records) / len(records) if records else 0.0
if quarantine_rate > 0.01: # > 1% quarantined
alert(f"High quarantine rate: {quarantine_rate:.1%} of records quarantined",
severity='warning' if quarantine_rate < 0.05 else 'critical')
return len(good_records), len(quarantine_records)
Anti-Patterns
❌ Silent data loss: pipeline continues on bad records without alerting
✅ Quarantine + alert on high quarantine rate
❌ Testing only in staging: data quality differs between environments
✅ Run quality checks in production with alerting, not just CI/CD
❌ Point-in-time tests only (test on full table, not incremental)
✅ Test each batch + monitor trends over time (anomaly detection)
❌ No schema registry: schema changes break consumers silently
✅ Confluent Schema Registry (Kafka), dbt column lineage, data contracts
❌ Treating all quality issues as blocking failures
✅ Use severity levels: ERROR (block pipeline), WARNING (alert + continue)
❌ Monitoring only row counts (misses structural issues)
✅ Monitor: row count, null rates, value distributions, column-level stats
❌ Data quality as afterthought (added after incidents)
✅ Define data contract before building the pipeline
Quick Reference
Six Dimensions:
Accuracy → range checks, cross-system reconciliation
Completeness → not_null, row count vs expected
Consistency → cross-table assertions, referential integrity
Timeliness → source freshness, max(updated_at) lag
Uniqueness → unique tests on PKs, COUNT vs COUNT DISTINCT
Validity → regex, accepted_values, FK relationships
Anomaly Detection:
Z-score: |value - mean| / std > 3 → anomaly
IQR: value < Q1 - 1.5*IQR or > Q3 + 1.5*IQR → anomaly
Seasonal: compare to same-day-of-week average
Schema Changes:
Non-breaking: add column, widen type, required→nullable
Breaking: remove column, narrow type, nullable→required
Alert Thresholds:
Row count: < 80% or > 150% of 7-day average
Null rate: > 5× historical baseline
Quarantine rate: > 1% warning, > 5% critical
Freshness lag: > 2× expected interval
Quarantine Flow:
Record → validate → valid: write to good_sink
→ invalid: write to quarantine_sink + alert if rate high