Data & AnalyticsDocumented
data-pipeline-architect
Modern data pipeline architect. Medallion architecture, Apache Airflow DAGs, Kafka streaming, dbt transformations, Great Expectations data quality, and batch vs streaming decisions. The modern data stack done right.
Share:
Installation
npx clawhub@latest install data-pipeline-architect

View the full skill documentation and source below.
Documentation
Data Pipeline Architecture
Modern Data Stack
Sources → Ingestion → Storage → Transformation → Serving → Consumption
Sources: PostgreSQL, APIs, S3, Kafka, Webhooks, SaaS apps
Ingestion: Fivetran, Airbyte, custom pipelines, Kafka Connect
Storage: Data Lake (S3/GCS) + Data Warehouse (Snowflake/BigQuery/Redshift)
Transform: dbt, Spark, Beam
Serving: BI (Looker/Tableau), APIs, ML models, reverse ETL
Medallion Architecture (Delta Lake / Lakehouse)
Bronze (Raw):
- Exact copy of source data
- Never modified, append-only
- Immutable audit trail
- Schema: source schema + _ingested_at, _source, _batch_id
Silver (Cleaned):
- Deduplicated, validated, standardized
- Type-cast, null-handled
- Business logic applied
- Schema: clean, normalized, well-typed
Gold (Business):
- Aggregated for specific use cases
- Metric definitions embedded
- Optimized for query patterns
- Schema: wide, denormalized for analytics
Apache Airflow DAGs
# Modern Airflow 2.x with TaskFlow API
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import timedelta
import pandas as pd
@dag(
    schedule_interval="0 2 * * *",  # Daily at 2 AM UTC
    start_date=days_ago(1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(hours=1),
        "email_on_failure": True,
        "email": ["[email protected]"],
    },
    tags=["bronze", "users"],
    doc_md="""
## Daily User Data Ingestion
Extracts user data from PostgreSQL (source), applies bronze transformations,
and loads to S3 data lake. Part of the users medallion pipeline.
**Schedule**: Daily at 2 AM UTC
**SLA**: Must complete by 4 AM UTC
""",
)
def users_bronze_pipeline():
    """Bronze-layer ingestion DAG: Postgres -> bronze metadata -> S3 -> catalog."""

    @task
    def extract_from_postgres(logical_date=None) -> dict:
        """Extract one day of incrementally-changed rows from the source DB.

        Uses the half-open window [logical_date - 1 day, logical_date) as the
        watermark so each daily run picks up exactly the rows updated since
        the previous run, with no gaps or overlaps.

        Returns a dict with row_count, JSON-serialized records, and the
        watermark (ISO string) used as the batch id downstream.
        """
        hook = PostgresHook(postgres_conn_id="source_db")
        # Incremental extraction using watermark; parameterized to avoid
        # SQL injection and let the driver handle timestamp formatting.
        query = """
            SELECT
                id, email, name, status, created_at, updated_at
            FROM users
            WHERE updated_at >= %(watermark)s
              AND updated_at < %(end_time)s
            ORDER BY updated_at
        """
        watermark = logical_date - timedelta(days=1)
        end_time = logical_date
        df = hook.get_pandas_df(
            query,
            parameters={"watermark": watermark, "end_time": end_time},
        )
        # Serialize for XCom (use S3 for large datasets — XCom has size limits)
        return {
            "row_count": len(df),
            "data": df.to_json(orient="records"),
            "watermark": watermark.isoformat(),
        }

    @task
    def apply_bronze_transforms(raw_data: dict) -> dict:
        """Stamp bronze metadata columns and validate required fields.

        Bronze keeps the source data as-is, adding only lineage columns
        (_ingested_at, _source, _batch_id, _raw_hash).
        """
        import hashlib
        from datetime import datetime

        df = pd.read_json(raw_data["data"])
        df["_ingested_at"] = datetime.utcnow().isoformat()
        df["_source"] = "postgres_users"
        df["_batch_id"] = raw_data["watermark"]
        # Deterministic per-row fingerprint via sha256. The builtin hash()
        # is salted per interpreter process (PYTHONHASHSEED), so it cannot
        # serve as a stable content hash; also, Series.values is an ndarray
        # attribute, not a method, so the previous hash(tuple(row.values()))
        # raised TypeError at runtime.
        df["_raw_hash"] = df.apply(
            lambda row: hashlib.sha256(row.to_json().encode("utf-8")).hexdigest(),
            axis=1,
        )
        # Validate required fields before the batch reaches the lake.
        null_counts = df[["id", "email"]].isnull().sum()
        if null_counts.any():
            raise ValueError(f"Null values in required fields: {null_counts.to_dict()}")
        return {
            "row_count": len(df),
            # Parquet bytes hex-encoded so the payload survives XCom JSON
            "data": df.to_parquet(index=False).hex(),
            "watermark": raw_data["watermark"],
        }

    @task
    def load_to_s3(transformed_data: dict, logical_date=None) -> dict:
        """Write the bronze batch to S3 as Hive-partitioned Parquet."""
        s3_hook = S3Hook(aws_conn_id="aws_default")
        # Hive-style partitioning (year=/month=/day=) so engines can prune.
        date_str = logical_date.strftime("year=%Y/month=%m/day=%d")
        s3_key = f"bronze/users/{date_str}/data.parquet"
        parquet_bytes = bytes.fromhex(transformed_data["data"])
        s3_hook.load_bytes(
            bytes_data=parquet_bytes,
            key=s3_key,
            bucket_name="my-data-lake",
            replace=True,
        )
        return {
            "s3_path": f"s3://my-data-lake/{s3_key}",
            "row_count": transformed_data["row_count"],
        }

    @task
    def update_metadata_catalog(load_result: dict) -> None:
        """Register the new partition in the Glue/Hive metastore."""
        # Update Glue catalog partition so Athena queries see the new data.
        import boto3

        glue = boto3.client("glue")
        # TODO: glue.create_partition(...) with values derived from
        # load_result["s3_path"]; left as a stub in this skill example.

    # DAG wiring: extract -> transform -> load -> catalog
    raw = extract_from_postgres()
    transformed = apply_bronze_transforms(raw)
    loaded = load_to_s3(transformed)
    update_metadata_catalog(loaded)


users_bronze = users_bronze_pipeline()
Kafka Streaming Pipeline
import json
import logging
from dataclasses import dataclass
from typing import Iterator, Optional

from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
logger = logging.getLogger(__name__)
@dataclass
class ProcessingResult:
    """Outcome of processing a single Kafka record.

    Identifies the record by (topic, partition, offset) and records whether
    the transform succeeded; `error` carries the failure message otherwise.
    """
    topic: str
    partition: int
    offset: int
    success: bool
    error: Optional[str] = None  # None on success; fixed: a plain `str` annotation with a None default was mistyped
class StreamProcessor:
    """Base class for batch-oriented Kafka stream processing.

    Consumes from `input_topic`, applies transform() to each record, and
    produces results to `output_topic`. Records that fail to transform are
    routed to a dead-letter queue so one poison message cannot stall the
    partition. Offsets are committed only after the producer flush, which
    gives at-least-once delivery (a crash mid-batch replays uncommitted
    records) — not exactly-once.
    """

    def __init__(
        self,
        brokers: list[str],
        input_topic: str,
        output_topic: str,
        consumer_group: str,
        dlq_topic: str = None,  # Dead letter queue; defaults to "<input_topic>-dlq"
    ):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=brokers,
            group_id=consumer_group,
            auto_offset_reset="earliest",
            enable_auto_commit=False,  # Commit manually after flush (at-least-once)
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            max_poll_interval_ms=300000,  # 5 min for slow processing
            session_timeout_ms=10000,
        )
        self.producer = KafkaProducer(
            bootstrap_servers=brokers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            acks="all",  # Wait for all replicas
            retries=3,
            compression_type="snappy",
        )
        self.output_topic = output_topic
        self.dlq_topic = dlq_topic or f"{input_topic}-dlq"

    def transform(self, message: dict) -> dict:
        """Transform one deserialized message. Override this in subclass."""
        raise NotImplementedError

    @staticmethod
    def _encode_key(key) -> bytes:
        """Return a record key as producer-ready bytes (None passes through).

        The consumer is configured without a key_deserializer, so keys arrive
        as raw bytes; the previous str(key).encode() turned b"abc" into the
        corrupted key b"b'abc'". Bytes now pass through untouched.
        """
        if key is None:
            return None
        if isinstance(key, bytes):
            return key
        return str(key).encode("utf-8")

    def run(self, batch_size: int = 100):
        """Poll and process messages in batches until interrupted."""
        while True:
            messages = self.consumer.poll(timeout_ms=1000, max_records=batch_size)
            if not messages:
                continue
            for tp, records in messages.items():
                for record in records:
                    try:
                        transformed = self.transform(record.value)
                        self.producer.send(
                            self.output_topic,
                            value=transformed,
                            key=self._encode_key(record.key),
                        )
                    except Exception as e:
                        # Plain message + structured extra (no f-string: there
                        # is nothing to interpolate, context goes in `extra`).
                        logger.error(
                            "Failed to process message",
                            extra={
                                "topic": tp.topic,
                                "partition": tp.partition,
                                "offset": record.offset,
                                "error": str(e),
                            },
                        )
                        # Send to dead letter queue with enough lineage to replay
                        self.producer.send(
                            self.dlq_topic,
                            value={
                                "original": record.value,
                                "error": str(e),
                                "source_topic": tp.topic,
                                "source_offset": record.offset,
                            },
                        )
            # Flush pending sends, then commit offsets for the whole batch.
            self.producer.flush()
            self.consumer.commit()
# Example: User event enrichment processor
class UserEventEnricher(StreamProcessor):
    """Enrich raw user events with profile data looked up from a database.

    `db` must expose get_user(user_id) returning an object with .email,
    .segment and .plan attributes.
    """

    def __init__(self, *args, db, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = db
        # Simple in-process cache to avoid a DB hit per event.
        # NOTE(review): unbounded and never invalidated — consider an LRU/TTL
        # cache if user data is large or changes while the processor runs.
        self._cache = {}

    def transform(self, event: dict) -> dict:
        """Return the event with a `user` profile block and an enrichment timestamp."""
        # Fix: `datetime` was used below but never imported in this module,
        # so every transform raised NameError. Timezone-aware UTC is used so
        # the ISO string is unambiguous.
        from datetime import datetime, timezone

        user_id = event["user_id"]
        if user_id not in self._cache:
            user = self.db.get_user(user_id)
            self._cache[user_id] = {
                "email": user.email,
                "segment": user.segment,
                "plan": user.plan,
            }
        return {
            **event,
            "user": self._cache[user_id],
            "_enriched_at": datetime.now(timezone.utc).isoformat(),
        }
dbt Data Transformation
-- models/silver/users.sql
-- Silver-layer users model: deduplicated, standardized user records built
-- on top of the bronze source table. Incrementally merged on user_id.
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge',
    cluster_by=['created_date'],
    tags=['silver', 'daily'],
    meta={
      'owner': 'data-platform-team',
      'description': 'Cleaned and deduplicated user records'
    }
  )
}}

-- Pull bronze rows; on incremental runs, only rows ingested since the
-- latest _ingested_at already present in this model.
WITH source AS (
    SELECT * FROM {{ source('bronze', 'users') }}
    {% if is_incremental() %}
    -- Only process new/changed records
    WHERE _ingested_at > (SELECT MAX(_ingested_at) FROM {{ this }})
    {% endif %}
),

-- Rank versions of each id (latest update first, ingestion time as
-- tiebreaker) and drop rows missing the required id/email keys.
deduplicated AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY updated_at DESC, _ingested_at DESC
        ) AS rn
    FROM source
    WHERE id IS NOT NULL
      AND email IS NOT NULL
),

-- Standardize fields: lowercase email, split/cap name parts, map raw
-- status variants onto a closed set, cast timestamps. rn = 1 keeps only
-- the newest version of each user.
cleaned AS (
    SELECT
        id AS user_id,
        LOWER(TRIM(email)) AS email,
        TRIM(name) AS full_name,
        INITCAP(SPLIT_PART(TRIM(name), ' ', 1)) AS first_name,
        INITCAP(SPLIT_PART(TRIM(name), ' ', -1)) AS last_name,
        CASE
            WHEN status IN ('active', 'enabled', '1') THEN 'active'
            WHEN status IN ('inactive', 'disabled', '0') THEN 'inactive'
            WHEN status = 'pending' THEN 'pending'
            ELSE 'unknown'
        END AS status,
        created_at::TIMESTAMP AS created_at,
        updated_at::TIMESTAMP AS updated_at,
        DATE_TRUNC('day', created_at) AS created_date,
        _ingested_at,
        _source
    FROM deduplicated
    WHERE rn = 1
)

SELECT * FROM cleaned
# models/silver/schema.yml
# dbt tests and column documentation for the silver users model.
version: 2

models:
  - name: users
    description: "Cleaned and deduplicated user records from all sources"
    columns:
      - name: user_id
        description: "Unique user identifier"
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
          - unique
          - dbt_utils.expression_is_true:
              expression: "LOWER(email) = email" # Must be lowercase
      - name: status
        # 'unknown' is included because the model's CASE falls back to it
        # for unmapped raw status values.
        tests:
          - not_null
          - accepted_values:
              values: ['active', 'inactive', 'pending', 'unknown']
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "created_at <= CURRENT_TIMESTAMP"
# dbt_project.yml
# Project-wide defaults: one materialization + tag per medallion layer.
name: my_analytics
version: '1.0.0'

vars:
  start_date: '2023-01-01'
  # Target name comes from the DBT_TARGET env var, defaulting to 'dev'.
  environment: "{{ env_var('DBT_TARGET', 'dev') }}"

models:
  my_analytics:
    bronze:
      +materialized: table
      +tags: ['bronze']
    silver:
      +materialized: incremental
      +tags: ['silver']
    gold:
      +materialized: table
      +tags: ['gold']
      # Grant read access to downstream consumers at build time.
      +grants:
        select: ['ROLE_ANALYSTS', 'ROLE_BI_TOOLS']
Pipeline Monitoring and Quality
# Great Expectations for data quality
import great_expectations as gx

context = gx.get_context()

# Create an (initially empty) suite and bind a validator to one batch of
# the bronze users asset for the 2024-01-15 partition.
bronze_suite = context.create_expectation_suite("users_bronze")
users_validator = context.get_validator(
    datasource_name="s3_bronze",
    data_connector_name="default_runtime_data_connector",
    data_asset_name="users",
    batch_identifiers={"default_identifier_name": "2024-01-15"},
    expectation_suite=bronze_suite,
)

# Column-level expectations: key integrity, email shape, closed status set.
users_validator.expect_column_values_to_not_be_null("id")
users_validator.expect_column_values_to_be_unique("id")
users_validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")
users_validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
# Table-level expectation: guard against an unexpectedly small batch.
users_validator.expect_table_row_count_to_be_between(min_value=1000, max_value=None)

# Run validation and fail loudly so the orchestrator marks the task failed.
results = users_validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results}")
Batch vs Streaming Decision Matrix
| Factor | Batch | Streaming |
|---|---|---|
| Latency needed | Hours / Daily | Seconds / Minutes |
| Data volume | Any | High throughput |
| Complexity | Lower | Higher |
| Cost | Lower | Higher |
| Use cases | Reports, ML training, DWH loads | Fraud detection, personalization, monitoring |
| Tools | Airflow, Spark, dbt | Kafka, Flink, Spark Streaming |
| Error handling | Retry whole batch | DLQ, per-message retry |