data-quality

Expert knowledge of data quality dimensions, Great Expectations, dbt tests, anomaly detection, data contracts, schema change management, and pipeline observability. Trigger phrases: when implementing data quality, Great Expectations setup, dbt data tests,

MoltbotDen
Data & Analytics

Data Quality Engineering

Data quality failures are silent production incidents. Unlike application bugs that throw errors, bad data propagates silently through pipelines, corrupts dashboards, and trains wrong ML models — often discovered by a stakeholder, not a monitor. The engineering discipline is: define expectations explicitly, validate them automatically in CI/CD, alert on violations before users see them, and design pipelines that degrade gracefully rather than fail catastrophically.

Core Mental Model

Data quality has six measurable dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity. Each requires different detection techniques. The key architectural principle: quality gates should be in-pipeline (fail fast) for critical dimensions, and monitoring/alerting for statistical anomalies. Never let bad data reach consumers silently — either block it (quarantine) or alert loudly. Data contracts formalize the agreement between producers and consumers so schema changes don't silently break downstream systems.
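The block-vs-alert decision above can be sketched as a small severity gate. This is a minimal sketch, not a library API: the check-result tuples and the `alert_fn` callable are hypothetical names for illustration.

```python
from enum import Enum

class Severity(Enum):
    ERROR = "error"      # block the pipeline (quarantine or fail)
    WARNING = "warning"  # alert loudly, let data through

def apply_quality_gate(check_results, alert_fn) -> bool:
    """Route failed checks: ERROR blocks, WARNING alerts and continues.

    check_results: list of (name, passed, severity) tuples.
    Returns True if the pipeline may proceed.
    """
    blocking = []
    for name, passed, severity in check_results:
        if passed:
            continue
        alert_fn(f"Quality check failed: {name}", severity)
        if severity is Severity.ERROR:
            blocking.append(name)
    return not blocking
```

A statistical check (e.g. mean drift) is typically WARNING, while a broken invariant (duplicate primary keys) is ERROR.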


Data Quality Dimensions

Accuracy:     Values represent reality. Example: age = 150 (impossible).
              Detect: range checks, reference table validation, cross-system reconciliation.

Completeness: Required fields are populated. Example: order with no customer_id.
              Detect: not_null tests, null rate monitoring, row count vs expected.

Consistency:  Same fact expressed consistently across systems/tables.
              Example: order total ≠ sum(line_items). 
              Detect: cross-table assertion tests, referential integrity checks.

Timeliness:   Data is fresh enough for its use case.
              Example: daily orders table not updated in 36 hours.
              Detect: source freshness checks, max(updated_at) lag monitoring.

Uniqueness:   No duplicate records. Example: duplicate order IDs.
              Detect: unique tests on primary keys, COUNT vs COUNT DISTINCT.

Validity:     Values conform to defined formats/rules.
              Example: email missing @, status = "INVALID_VALUE".
              Detect: regex validation, accepted_values tests, FK constraints.
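The detection techniques listed above can each be expressed as a one-line pandas check. A sketch with one lightweight detection per dimension — the column names (`order_id`, `customer_age`, `total`, etc.) and the accepted status set are hypothetical:

```python
import pandas as pd

def profile_quality(orders: pd.DataFrame, line_items: pd.DataFrame) -> dict:
    """One lightweight detection per dimension on hypothetical orders tables."""
    item_totals = line_items.groupby("order_id")["amount"].sum().rename("items_total")
    joined = orders.set_index("order_id")[["total"]].join(item_totals)
    lag = pd.Timestamp.now() - orders["updated_at"].max()
    return {
        # Accuracy: impossible values
        "accuracy_impossible_ages": int((orders["customer_age"] > 120).sum()),
        # Completeness: null rate of a required field
        "completeness_null_rate": round(float(orders["customer_id"].isna().mean()), 4),
        # Consistency: header total vs sum of line items
        "consistency_mismatches": int(
            (joined["total"].round(2) != joined["items_total"].round(2)).sum()
        ),
        # Timeliness: hours since last update
        "timeliness_lag_hours": lag.total_seconds() / 3600,
        # Uniqueness: duplicate primary keys
        "uniqueness_duplicates": int(len(orders) - orders["order_id"].nunique()),
        # Validity: values outside the accepted set
        "validity_bad_statuses": int((~orders["status"].isin({"new", "shipped"})).sum()),
    }
```

In practice these checks would be wired into a framework (Great Expectations, dbt) rather than hand-rolled, but the underlying predicates are the same.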

Great Expectations

# Great Expectations: define expectations, run checkpoints, generate data docs

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Define a datasource
datasource = context.sources.add_or_update_pandas(name="orders_datasource")
asset = datasource.add_dataframe_asset(name="orders")
batch_request = asset.build_batch_request(dataframe=orders_df)

# Create expectation suite
suite = context.add_or_update_expectation_suite("orders_quality_suite")

# Create validator and add expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_quality_suite"
)

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("created_at")

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Validity — format and range
validator.expect_column_values_to_be_of_type("amount", "float64")  # pandas dtype name
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)

# Accepted values
validator.expect_column_values_to_be_in_set(
    "status",
    {"new", "processing", "shipped", "delivered", "cancelled", "refunded"}
)

# Row count (freshness / completeness at table level)
validator.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=10_000_000
)

# Statistical distribution (anomaly detection)
validator.expect_column_mean_to_be_between(
    "amount",
    min_value=50.0,
    max_value=500.0
)
validator.expect_column_stdev_to_be_between(
    "amount",
    min_value=10.0,
    max_value=1000.0
)

# Column set completeness (no unexpected new columns, no missing columns)
validator.expect_table_columns_to_match_ordered_list(
    ["order_id", "customer_id", "amount", "status", "created_at"]
)

# Save expectations
validator.save_expectation_suite()
# Checkpoint: run suite against data + alert on failure
checkpoint = context.add_or_update_checkpoint(
    name="orders_daily_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_quality_suite"
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"}
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"}
        },
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "{{ slack_webhook }}",
                "notify_on": "failure"
            }
        }
    ]
)

class DataQualityException(Exception):
    """Raised when a checkpoint run has failing expectations."""

result = context.run_checkpoint("orders_daily_checkpoint")
if not result.success:
    failed = [
        v for v in result.list_validation_results()
        if not v.success
    ]
    raise DataQualityException(f"{len(failed)} expectations failed")

Anomaly Detection

import pandas as pd
import numpy as np
from scipy import stats

def detect_anomalies_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
    """Z-score method: flag values > N standard deviations from mean."""
    z_scores = np.abs(stats.zscore(series.dropna()))
    return pd.Series(z_scores > threshold, index=series.dropna().index).reindex(series.index, fill_value=False)

def detect_anomalies_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    """IQR method: flag values beyond Q1/Q3 ± multiplier*IQR."""
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - multiplier * IQR
    upper = Q3 + multiplier * IQR
    return (series < lower) | (series > upper)

# Time series anomaly detection with seasonality
def detect_time_series_anomalies(
    ts: pd.Series,
    window: int = 7,
    seasonal_period: int = 7,
    threshold_sigma: float = 3.0
) -> pd.DataFrame:
    """Detect anomalies accounting for weekly seasonality."""
    df = pd.DataFrame({'value': ts})
    
    # Rolling mean and std (trend component)
    df['rolling_mean'] = ts.rolling(window, center=True).mean()
    df['rolling_std']  = ts.rolling(window, center=True).std()
    
    # Seasonal component: average by day of week
    df['day_of_week'] = ts.index.dayofweek
    df['seasonal_factor'] = df.groupby('day_of_week')['value'].transform('mean')
    
    # Residual after trend + seasonal
    df['residual'] = df['value'] - df['rolling_mean'] * (df['seasonal_factor'] / df['seasonal_factor'].mean())
    
    # Anomaly if residual > N sigma
    residual_std = df['residual'].std()
    df['is_anomaly'] = df['residual'].abs() > threshold_sigma * residual_std
    df['anomaly_score'] = df['residual'].abs() / residual_std
    
    return df[df['is_anomaly']][['value', 'rolling_mean', 'anomaly_score']]

# Pipeline metric monitoring
def check_pipeline_metrics(
    current: dict,
    historical: list[dict],
    metrics: list[str],
    sigma_threshold: float = 3.0
) -> list[dict]:
    """Check if current pipeline run metrics are anomalous vs history."""
    alerts = []
    historical_df = pd.DataFrame(historical)
    
    for metric in metrics:
        if metric not in historical_df.columns:
            continue
        
        hist_values = historical_df[metric].dropna()
        current_val = current.get(metric)
        
        if current_val is None or len(hist_values) < 10:
            continue
        
        mean = hist_values.mean()
        std  = hist_values.std()
        z_score = abs(current_val - mean) / (std + 1e-9)
        
        if z_score > sigma_threshold:
            alerts.append({
                'metric':       metric,
                'current':      current_val,
                'expected_mean': round(mean, 2),
                'z_score':       round(z_score, 2),
                'severity':      'critical' if z_score > 5 else 'warning'
            })
    
    return alerts

Schema Change Detection

from dataclasses import dataclass
from enum import Enum

class ChangeType(Enum):
    COLUMN_ADDED         = "column_added"          # NON-BREAKING
    COLUMN_REMOVED       = "column_removed"         # BREAKING
    COLUMN_RENAMED       = "column_renamed"         # BREAKING
    TYPE_WIDENED         = "type_widened"           # NON-BREAKING (int→bigint)
    TYPE_NARROWED        = "type_narrowed"          # BREAKING (bigint→int)
    NULLABLE_TO_REQUIRED = "nullable_to_required"  # BREAKING
    REQUIRED_TO_NULLABLE = "required_to_nullable"  # NON-BREAKING

@dataclass
class SchemaChange:
    change_type:   ChangeType
    column_name:   str
    old_value:     str | None
    new_value:     str | None
    is_breaking:   bool

def detect_schema_changes(
    old_schema: dict[str, dict],
    new_schema: dict[str, dict]
) -> list[SchemaChange]:
    """Compare two schema dicts {col_name: {type, nullable, ...}}."""
    changes = []
    
    for col in set(old_schema.keys()) | set(new_schema.keys()):
        if col not in new_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_REMOVED, col,
                old_schema[col]['type'], None, is_breaking=True
            ))
        elif col not in old_schema:
            changes.append(SchemaChange(
                ChangeType.COLUMN_ADDED, col,
                None, new_schema[col]['type'], is_breaking=False
            ))
        else:
            old, new = old_schema[col], new_schema[col]
            if old['type'] != new['type']:
                breaking = not is_type_widening(old['type'], new['type'])
                changes.append(SchemaChange(
                    ChangeType.TYPE_NARROWED if breaking else ChangeType.TYPE_WIDENED,
                    col, old['type'], new['type'], is_breaking=breaking
                ))
            if old['nullable'] and not new['nullable']:
                changes.append(SchemaChange(
                    ChangeType.NULLABLE_TO_REQUIRED, col, None, None, is_breaking=True
                ))
            elif not old['nullable'] and new['nullable']:
                changes.append(SchemaChange(
                    ChangeType.REQUIRED_TO_NULLABLE, col, None, None, is_breaking=False
                ))
    
    return changes

def is_type_widening(old_type: str, new_type: str) -> bool:
    widening_pairs = {
        ('int', 'bigint'), ('float', 'double'),
        ('varchar(100)', 'varchar(255)'), ('date', 'timestamp')
    }
    return (old_type, new_type) in widening_pairs
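The detector above can back a CI gate that fails a build before a breaking change ships. A self-contained sketch using simplified `{column: type}` snapshots — the widening pairs mirror `is_type_widening` and are illustrative, not exhaustive:

```python
def breaking_schema_changes(old: dict[str, str], new: dict[str, str]) -> list[str]:
    """Flag breaking changes between {column: type} schema snapshots.

    Removals and type narrowing are breaking; additions and widenings are not.
    """
    widening = {("int", "bigint"), ("float", "double"), ("date", "timestamp")}
    problems = []
    for col, old_type in old.items():
        if col not in new:
            problems.append(f"column removed: {col}")
        elif new[col] != old_type and (old_type, new[col]) not in widening:
            problems.append(f"type narrowed/changed: {col} {old_type} -> {new[col]}")
    return problems
```

A CI step would snapshot the current schema, diff against the last deployed snapshot, and exit nonzero when the returned list is non-empty.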

Data Contracts

# data-contract.yaml: consumer-driven contract between producer and consumer
# Producer: orders service
# Consumer: analytics team

dataContractSpecification: 0.9.3
id: urn:datacontract:orders:v1.2
info:
  title: Orders Data Contract
  version: 1.2.0
  description: "Orders events published by the orders service"
  owner: orders-team
  contact:
    email: [email protected]

servers:
  production:
    type: kafka
    host: kafka.production.example.com:9092
    topic: orders.v1

models:
  OrderEvent:
    description: "Emitted when an order is created or updated"
    fields:
      order_id:
        type: string
        required: true
        unique: true
        description: "UUID v4"
        pattern: "^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
      
      customer_id:
        type: string
        required: true
      
      amount:
        type: number
        required: true
        minimum: 0
        maximum: 100000
        description: "Order total in USD"
      
      status:
        type: string
        required: true
        enum: [new, processing, shipped, delivered, cancelled, refunded]
      
      created_at:
        type: timestamp
        required: true
        description: "ISO 8601 UTC"
      
      metadata:
        type: object
        required: false
        description: "Optional key-value metadata, added in v1.2"

quality:
  type: SodaCL
  specification: |
    checks for OrderEvent:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [new, processing, shipped, delivered, cancelled, refunded]
      - avg(amount) between 50 and 500

servicelevels:
  freshness:
    description: "Orders appear in stream within 5 seconds of creation"
    threshold: 5s
  completeness:
    description: "Zero orders lost — 100% delivery guarantee"
    threshold: "100%"
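The field rules in the contract above (pattern, enum, range, required) translate directly into record-level validation on the consumer side. A hand-rolled sketch, assuming events arrive as plain dicts — in practice a schema library or contract tooling would generate this:

```python
import re

ORDER_ID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
)
VALID_STATUSES = {"new", "processing", "shipped", "delivered", "cancelled", "refunded"}

def validate_order_event(event: dict) -> list[str]:
    """Check one event against the contract's field rules; return violations."""
    errors = []
    order_id = event.get("order_id")
    if not isinstance(order_id, str) or not ORDER_ID_RE.match(order_id):
        errors.append("order_id: missing or not a UUID v4")
    if not event.get("customer_id"):
        errors.append("customer_id: required")
    amount = event.get("amount")
    if not isinstance(amount, (int, float)) or not (0 <= amount <= 100000):
        errors.append("amount: must be a number in [0, 100000]")
    if event.get("status") not in VALID_STATUSES:
        errors.append("status: not an accepted value")
    if not event.get("created_at"):
        errors.append("created_at: required")
    return errors
```

Events with a non-empty error list would flow into the quarantine sink described below rather than failing the whole batch.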

Pipeline Observability

from dataclasses import dataclass, field
from datetime import datetime
import time
import uuid

@dataclass
class PipelineRun:
    pipeline_name: str
    run_id:        str
    started_at:    datetime = field(default_factory=datetime.utcnow)
    metrics:       dict = field(default_factory=dict)
    alerts:        list = field(default_factory=list)

class PipelineObservability:
    """Track pipeline run metrics and alert on anomalies."""
    
    def __init__(self, pipeline_name: str, metrics_store, alert_fn):
        self.pipeline_name = pipeline_name
        self.metrics_store = metrics_store
        self.alert_fn = alert_fn
    
    def __call__(self, pipeline_fn):
        def wrapper(*args, **kwargs):
            run = PipelineRun(self.pipeline_name, run_id=str(uuid.uuid4()))
            start = time.time()
            
            try:
                result = pipeline_fn(*args, **kwargs)
                run.metrics.update({
                    'status':           'success',
                    'duration_seconds': time.time() - start,
                    'rows_processed':   getattr(result, 'row_count', None),
                    'rows_written':     getattr(result, 'written_count', None),
                    'null_rate':        getattr(result, 'null_rate', None),
                })
                return result
                
            except Exception as e:
                run.metrics.update({
                    'status':  'failed',
                    'error':   str(e),
                    'duration_seconds': time.time() - start,
                })
                self.alert_fn(f"Pipeline {self.pipeline_name} FAILED: {e}", severity='critical')
                raise
                
            finally:
                self._check_anomalies(run)
                self.metrics_store.save(run)
        
        return wrapper
    
    def _check_anomalies(self, run: PipelineRun):
        history = self.metrics_store.get_history(self.pipeline_name, days=30)
        alerts = check_pipeline_metrics(
            current=run.metrics,
            historical=history,
            metrics=['rows_processed', 'null_rate', 'duration_seconds']
        )
        for alert in alerts:
            self.alert_fn(
                f"Anomaly in {self.pipeline_name}.{alert['metric']}: "
                f"current={alert['current']}, expected≈{alert['expected_mean']} "
                f"(z={alert['z_score']})",
                severity=alert['severity']
            )

# Usage
@PipelineObservability("daily_orders_etl", metrics_store=db, alert_fn=pagerduty_alert)
def run_orders_etl(date: str) -> PipelineResult:
    ...

Quarantine Pattern

def process_with_quarantine(
    records: list[dict],
    validate_fn,
    good_sink,
    quarantine_sink
) -> tuple[int, int]:
    """Route invalid records to quarantine instead of failing the pipeline."""
    good_records = []
    quarantine_records = []
    
    for record in records:
        validation_result = validate_fn(record)
        
        if validation_result.is_valid:
            good_records.append(record)
        else:
            quarantine_records.append({
                **record,
                '_quarantine_reason':    validation_result.errors,
                '_quarantine_timestamp': datetime.utcnow().isoformat(),
                '_quarantine_pipeline':  'orders_etl',
                '_quarantine_version':   '1.0'
            })
    
    good_sink.write(good_records)
    quarantine_sink.write(quarantine_records)
    
    # Alert if quarantine rate is high (guard against an empty batch)
    quarantine_rate = len(quarantine_records) / len(records) if records else 0.0
    if quarantine_rate > 0.01:  # > 1% quarantined
        alert(f"High quarantine rate: {quarantine_rate:.1%} of records quarantined",
              severity='warning' if quarantine_rate < 0.05 else 'critical')
    
    return len(good_records), len(quarantine_records)

Anti-Patterns

❌ Silent data loss: pipeline continues on bad records without alerting
   ✅ Quarantine + alert on high quarantine rate

❌ Testing only in staging: data quality differs between environments
   ✅ Run quality checks in production with alerting, not just CI/CD

❌ Point-in-time tests only (test on full table, not incremental)
   ✅ Test each batch + monitor trends over time (anomaly detection)

❌ No schema registry: schema changes break consumers silently
   ✅ Confluent Schema Registry (Kafka), dbt column lineage, data contracts

❌ Treating all quality issues as blocking failures
   ✅ Use severity levels: ERROR (block pipeline), WARNING (alert + continue)

❌ Monitoring only row counts (misses structural issues)
   ✅ Monitor: row count, null rates, value distributions, column-level stats

❌ Data quality as afterthought (added after incidents)
   ✅ Define data contract before building the pipeline

Quick Reference

Six Dimensions:
  Accuracy     → range checks, cross-system reconciliation
  Completeness → not_null, row count vs expected
  Consistency  → cross-table assertions, referential integrity
  Timeliness   → source freshness, max(updated_at) lag
  Uniqueness   → unique tests on PKs, COUNT vs COUNT DISTINCT
  Validity     → regex, accepted_values, FK relationships

Anomaly Detection:
  Z-score:           |value - mean| / std > 3 → anomaly
  IQR:               value < Q1 - 1.5*IQR or > Q3 + 1.5*IQR → anomaly
  Seasonal:          compare to same-day-of-week average

Schema Changes:
  Non-breaking:      add column, widen type, required→nullable
  Breaking:          remove column, narrow type, nullable→required

Alert Thresholds:
  Row count:         < 80% or > 150% of 7-day average
  Null rate:         > 5× historical baseline
  Quarantine rate:   > 1% warning, > 5% critical
  Freshness lag:     > 2× expected interval

Quarantine Flow:
  Record → validate → valid: write to good_sink
                    → invalid: write to quarantine_sink + alert if rate high
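The alert thresholds above can be applied as one function over a run's metrics. A sketch — the metric key names are hypothetical, and the thresholds are the reference values listed, not universal constants:

```python
def threshold_alerts(m: dict) -> list[tuple[str, str]]:
    """Apply the reference thresholds to a metrics dict; return (metric, severity)."""
    alerts = []
    # Row count: < 80% or > 150% of 7-day average
    ratio = m["row_count"] / m["row_count_7d_avg"]
    if ratio < 0.8 or ratio > 1.5:
        alerts.append(("row_count", "warning"))
    # Null rate: > 5x historical baseline
    if m["null_rate"] > 5 * m["null_rate_baseline"]:
        alerts.append(("null_rate", "warning"))
    # Quarantine rate: > 1% warning, > 5% critical
    q = m["quarantine_rate"]
    if q > 0.05:
        alerts.append(("quarantine_rate", "critical"))
    elif q > 0.01:
        alerts.append(("quarantine_rate", "warning"))
    # Freshness: lag > 2x expected interval
    if m["freshness_lag_s"] > 2 * m["expected_interval_s"]:
        alerts.append(("freshness", "warning"))
    return alerts
```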
