
snowflake-expert

Expert knowledge of Snowflake architecture, virtual warehouses, query profiling, clustering, data sharing, zero-copy cloning, Time Travel, VARIANT type, Tasks/Streams, Snowpark, and RBAC. Trigger phrases: when working with Snowflake, Snowflake virtual warehouse sizing, Snowflake query profiling,

MoltbotDen
Data & Analytics

Snowflake Expert

Snowflake's architecture separates compute (virtual warehouses) from storage (S3/GCS/Azure Blob), which creates a different cost model than traditional data warehouses. Compute is billed per-second when the warehouse is active; storage is billed per-TB per-month. The expert move is to minimize warehouse uptime (auto-suspend aggressively) while maximizing query efficiency (partition pruning, clustering, result cache). Most Snowflake cost problems trace to: warehouses left running, queries scanning entire tables, and poorly designed VARIANT column queries.
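The cost model above can be sketched numerically. This is an illustrative back-of-envelope calculator, not Snowflake's billing engine; the $3/credit and $23/TB-month rates are assumptions that vary by edition, region, and contract:

```python
# Hypothetical cost sketch: compute is billed by warehouse uptime (credits/hr),
# storage per TB-month. Both rates below are assumptions — check your contract.
CREDIT_PRICE_USD = 3.00           # assumed credit price
STORAGE_USD_PER_TB_MONTH = 23.00  # assumed on-demand storage rate

def monthly_cost(credits_per_hour: float, active_hours: float, storage_tb: float) -> float:
    """Estimated compute + storage cost for one month of usage."""
    compute = credits_per_hour * active_hours * CREDIT_PRICE_USD
    storage = storage_tb * STORAGE_USD_PER_TB_MONTH
    return round(compute + storage, 2)

# A Medium warehouse (4 credits/hr) active 2h/day on 10 TB of storage:
print(monthly_cost(4, 2 * 30, 10))  # compute dominates: $720 compute vs $230 storage
```

The point of the sketch: for most workloads compute dwarfs storage, which is why warehouse uptime is the first cost lever to pull.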

Core Mental Model

A virtual warehouse is a cluster of compute nodes that cache remote storage locally. The first query after resume is slow (cold cache); subsequent queries benefit from the warm cache. Snowflake's three-tier caching: local disk cache (hot), result cache (exact query cache, free), and remote storage (always available). Every table is automatically micro-partitioned into ~16MB compressed files with min/max metadata — the query optimizer uses this for partition pruning without any DDL clustering. Explicit clustering keys add a secondary sort that improves pruning for high-cardinality columns that don't align with insertion order.
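The min/max pruning described above can be shown with a toy model (real micro-partitions track min/max metadata for every column; this sketch uses a single date range per partition):

```python
# Toy model of micro-partition pruning: skip any partition whose
# [min, max] range cannot overlap the query's filter range.
partitions = [
    {"min_date": "2024-01-01", "max_date": "2024-01-10"},
    {"min_date": "2024-01-10", "max_date": "2024-01-20"},
    {"min_date": "2024-01-20", "max_date": "2024-01-31"},
]

def prune(parts, lo, hi):
    """Return only the partitions whose range overlaps [lo, hi]."""
    return [p for p in parts if p["max_date"] >= lo and p["min_date"] <= hi]

scanned = prune(partitions, "2024-01-25", "2024-01-31")
print(len(scanned), "of", len(partitions))  # only the last partition survives
```

This is exactly why a clustering key helps: it sorts data so that each partition's min/max range is narrow, letting more partitions be skipped.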


Virtual Warehouse Sizing and Configuration

Warehouse sizes (credit consumption per hour):
  X-Small:  1 credit/hr    (good for: dev, small transforms, < 1GB queries)
  Small:    2 credits/hr   (general ETL, medium queries)
  Medium:   4 credits/hr   (complex queries, 1-10GB scans)
  Large:    8 credits/hr   (large scans, parallel workloads, analytics queries)
  X-Large:  16 credits/hr  (very large joins, 10GB+ scans)
  2X-Large: 32 credits/hr  (data science, large ML training)

Sizing rules:
  Start small → profile → scale up if spilling to disk
  Prefer scale-up over scale-out for single complex queries
  Use multi-cluster for concurrency (many concurrent users, not complex queries)
  
Auto-suspend: ALWAYS set. Default should be 5-10 minutes for prod, 2-5 for dev.
Auto-resume: keep enabled — transparently resumes on query.
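A simplified model of why aggressive auto-suspend pays off — Snowflake bills per second of uptime with a 60-second minimum each time a warehouse resumes (the billing behavior is documented; the workload numbers below are made up):

```python
# Simplified billing model: idle seconds before auto-suspend are billed
# at the same rate as busy ones, subject to a 60-second minimum per resume.
def credits_used(credits_per_hour, busy_seconds, idle_seconds_before_suspend):
    billed = max(busy_seconds + idle_seconds_before_suspend, 60)
    return credits_per_hour * billed / 3600

# Medium warehouse (4 cr/hr), one 30-second query:
lazy  = credits_used(4, 30, 600)  # 10-minute auto-suspend → 630s billed
tight = credits_used(4, 30, 60)   # 60-second auto-suspend → 90s billed
print(round(lazy / tight, 1))     # ~7x the credits for identical work
```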
-- Create optimally configured virtual warehouse
CREATE WAREHOUSE analytics_wh
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND  = 300          -- 5 minutes (300 seconds)
  AUTO_RESUME   = TRUE
  INITIALLY_SUSPENDED = TRUE   -- don't start until first query
  COMMENT = 'Analytics workloads — auto-suspends in 5m';

-- Multi-cluster warehouse for high concurrency
CREATE WAREHOUSE reporting_wh
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND   = 120
  AUTO_RESUME    = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'ECONOMY';  -- conserves credits; 'STANDARD' starts clusters sooner to minimize queuing

-- Check warehouse credit usage
SELECT
  warehouse_name,
  SUM(credits_used)                                  AS total_credits,
  SUM(credits_used) * 3.0                            AS estimated_cost_usd,  -- assumes ~$3/credit; varies by edition and region
  COUNT(DISTINCT DATE_TRUNC('day', start_time))      AS active_days
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY warehouse_name
ORDER BY total_credits DESC;

Query Profiling with Query Profile UI

-- Get query ID for a recent slow query
SELECT
  query_id,
  query_text,
  warehouse_name,
  execution_time / 1000.0    AS execution_seconds,
  bytes_scanned / POW(2, 30) AS gb_scanned,
  partitions_scanned,
  partitions_total,
  ROUND(100.0 * partitions_scanned / NULLIF(partitions_total, 0), 1)
                              AS pct_partitions_scanned,
  bytes_spilled_to_local_storage / POW(2, 30) AS gb_spilled_local,
  bytes_spilled_to_remote_storage / POW(2, 30) AS gb_spilled_remote
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
  AND execution_status = 'SUCCESS'
ORDER BY execution_time DESC
LIMIT 50;
Query Profile UI — what to look for:

Compilation time > 50% of total:
  → Query is too complex (many joins, subqueries)
  → Break into CTEs or intermediate tables
  → Missing statistics (rare in Snowflake)

Execution time with large "TableScan" node:
  → Large partitions_scanned / partitions_total ratio
  → Add clustering key on filter column
  → Ensure filter columns are in WHERE clause

Spill to local/remote disk:
  → Not enough memory for operation (sort, join, aggregation)
  → Increase warehouse size (Scale Up)
  → Reduce data early (filter/project before joins)

"Join" node with large data volumes:
  → Consider query order (filter heavily before join)
  → Check if small table can be broadcast (Snowflake does this automatically)

Pruning ratio (partitions_scanned / partitions_total):
  < 10%: excellent (only reading small slice)
  10-50%: good
  50-100%: poor — clustering or filter issue

Clustering Keys

-- When Snowflake auto-clustering is insufficient:
-- Natural clustering comes from INSERT order (time-based tables are already clustered on time)
-- Only add explicit clustering when queries frequently filter on a non-time column

-- Check clustering depth (lower = better)
SELECT SYSTEM$CLUSTERING_DEPTH('my_database.my_schema.orders', '(customer_id, status)');
-- Depth 1-2: excellent
-- Depth 3-5: good
-- Depth > 5: poor — queries scan many micro-partitions

-- Add clustering key
ALTER TABLE orders CLUSTER BY (customer_id, status);
-- Background automatic reclustering runs continuously (costs credits)

-- For time-partitioned tables, add the date column first
ALTER TABLE events CLUSTER BY (event_date, user_id);
-- event_date first → queries filtering by date get best pruning
-- user_id second → helps when filtering by both date AND user

-- Detailed clustering statistics (SYSTEM$CLUSTERING_INFORMATION returns JSON with
-- average_overlaps, average_depth, partition_depth_histogram, and advisory notes)
SELECT SYSTEM$CLUSTERING_INFORMATION('my_database.my_schema.orders', '(customer_id, status)');

-- Track auto-clustering credit spend per table
SELECT
  table_name,
  SUM(credits_used) AS clustering_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.AUTOMATIC_CLUSTERING_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY table_name
ORDER BY clustering_credits DESC;

Zero-Copy Cloning

-- Zero-copy clone: instant "copy" that shares underlying storage
-- Storage only charged for changes made to the clone
-- Perfect for dev/test environments and branch-based development

-- Clone production database for testing
CREATE DATABASE dev_db CLONE prod_db;
-- Creates immediately, regardless of database size

-- Clone a schema
CREATE SCHEMA dev_db.analytics CLONE prod_db.analytics;

-- Clone a table for safe testing
CREATE TABLE orders_test CLONE orders;

-- Point-in-time clone (AT/BEFORE: clone as of a specific time or before a statement)
CREATE TABLE orders_backup CLONE orders
  AT (TIMESTAMP => '2024-01-15 10:00:00'::TIMESTAMP);

CREATE TABLE orders_before_migration CLONE orders
  BEFORE (STATEMENT => '<query_id>');  -- clone state before a specific DML

-- Zero-copy cloning in CI/CD:
-- 1. dev branch: CREATE DATABASE feature_branch_db CLONE prod_db
-- 2. Run migrations and tests on feature_branch_db
-- 3. PR merged: apply migrations to prod_db
-- 4. DROP DATABASE feature_branch_db

Time Travel and Fail-Safe

-- Time Travel: query historical data (default 1 day; up to 90 days with Enterprise edition)
-- Storage cost scales with churn: each retained day keeps the micro-partitions
-- rewritten that day, so high-churn tables pay far more than append-only ones
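A rough way to reason about Time Travel storage, assuming the cost is driven by how much data is rewritten each day (this is a hypothetical estimator for intuition; real numbers come from TABLE_STORAGE_METRICS):

```python
# Rough estimator: retained history ≈ daily changed bytes × retention window.
# An append-only table has near-zero churn, so Time Travel costs it almost nothing.
def time_travel_gb(daily_churn_gb: float, retention_days: int) -> float:
    """Approximate GB of versioned history kept for Time Travel."""
    return daily_churn_gb * retention_days

# 50 GB of rewritten micro-partitions per day, 7-day retention:
print(time_travel_gb(50, 7))  # ~350 GB of history on top of active storage
```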

-- Query table as it was at a point in time
SELECT * FROM orders AT (TIMESTAMP => '2024-01-15 09:00:00'::TIMESTAMP);

-- Query table before a statement was run
SELECT * FROM orders BEFORE (STATEMENT => '<query_id>');

-- Restore accidentally deleted data
-- 1. Find what was deleted
CREATE TABLE orders_restored AS
SELECT * FROM orders BEFORE (STATEMENT => '<delete_query_id>');

-- 2. Restore the original table
UNDROP TABLE orders;  -- if the table itself was dropped (within the retention window)

-- Or swap a point-in-time clone back into place
CREATE TABLE orders_restored_pt CLONE orders
  AT (TIMESTAMP => '2024-01-15 09:00:00'::TIMESTAMP);
ALTER TABLE orders SWAP WITH orders_restored_pt;  -- atomic name swap

-- Configure per-table Time Travel retention
ALTER TABLE orders SET DATA_RETENTION_TIME_IN_DAYS = 7;  -- override account default

-- Check Time Travel storage cost
SELECT
  table_name,
  TIME_TRAVEL_BYTES / POW(2, 30)   AS time_travel_gb,
  FAILSAFE_BYTES / POW(2, 30)      AS failsafe_gb
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
ORDER BY time_travel_gb DESC
LIMIT 20;

-- Fail-safe: 7 additional days of non-queryable recovery (via Snowflake support only)
-- Reduce storage cost for reloadable staging data with transient tables:
-- no fail-safe, and Time Travel is capped at 1 day (default 1;
-- set DATA_RETENTION_TIME_IN_DAYS = 0 if history isn't needed)
CREATE TRANSIENT TABLE staging_load (
  id BIGINT,
  data VARIANT
);

VARIANT Type for Semi-Structured Data

-- VARIANT: Snowflake's equivalent of JSONB — stores JSON, Avro, ORC, Parquet
CREATE TABLE events (
  event_id   VARCHAR    PRIMARY KEY,
  event_type VARCHAR,
  payload    VARIANT,   -- semi-structured column
  created_at TIMESTAMP
);

-- Insert JSON
INSERT INTO events VALUES (
  'evt_001',
  'order_placed',
  PARSE_JSON('{"order_id":"ord_123","customer_id":"cust_456","items":[{"sku":"ABC","qty":2}]}'),
  CURRENT_TIMESTAMP()
);

-- Query VARIANT fields using dot notation and colon syntax
SELECT
  event_id,
  payload:order_id::VARCHAR     AS order_id,           -- colon = field access + cast
  payload:customer_id::VARCHAR  AS customer_id,
  payload:items[0]:sku::VARCHAR AS first_item_sku,     -- array indexing
  payload:metadata.region::VARCHAR AS region           -- nested dot notation
FROM events
WHERE event_type = 'order_placed'
  AND payload:order_id::VARCHAR = 'ord_123';

-- FLATTEN: explode array/object into rows
SELECT
  e.event_id,
  f.value:sku::VARCHAR   AS sku,
  f.value:qty::INTEGER   AS quantity,
  f.index                AS line_number
FROM events e,
  LATERAL FLATTEN(INPUT => e.payload:items) f   -- LATERAL JOIN + FLATTEN
WHERE e.event_type = 'order_placed';

-- Extract nested object keys dynamically
SELECT
  payload,
  GET_PATH(payload, 'metadata.region')  AS region,  -- programmatic key access
  OBJECT_KEYS(payload)                  AS top_level_keys
FROM events;

-- Build VARIANT from columns
SELECT OBJECT_CONSTRUCT(
  'user_id',    user_id,
  'email',      email,
  'created_at', created_at
) AS user_json
FROM users;

Tasks and Streams for CDC

-- Stream: captures DML changes (INSERT, UPDATE, DELETE) on a table
CREATE STREAM orders_stream ON TABLE orders
  APPEND_ONLY = FALSE;   -- captures inserts + updates + deletes
  -- APPEND_ONLY = TRUE captures only inserts (cheaper for append-only tables)

-- Query the stream
SELECT
  *,
  METADATA$ACTION,     -- INSERT or DELETE (updates = DELETE old + INSERT new)
  METADATA$ISUPDATE,   -- TRUE if this row is part of an update
  METADATA$ROW_ID      -- unique row identifier
FROM orders_stream;
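The DELETE-plus-INSERT representation of updates can be modeled in a few lines (a conceptual sketch of stream semantics, not the Snowpark API):

```python
# Conceptual model of stream rows: an UPDATE surfaces as a DELETE of the old
# row image plus an INSERT of the new one, both flagged METADATA$ISUPDATE=True.
def stream_rows_for_update(old_row: dict, new_row: dict) -> list:
    return [
        {**old_row, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
        {**new_row, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    ]

rows = stream_rows_for_update({"order_id": 1, "status": "open"},
                              {"order_id": 1, "status": "shipped"})
print([r["METADATA$ACTION"] for r in rows])  # ['DELETE', 'INSERT']
```

Consumers that only care about final state can filter to METADATA$ACTION = 'INSERT'; audit consumers keep both halves.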

-- Task: scheduled SQL execution (like cron for Snowflake)
CREATE TASK process_orders_cdc
  WAREHOUSE = 'TRANSFORM_WH'
  SCHEDULE  = '5 MINUTE'                         -- run every 5 minutes
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')   -- only run if stream has data
AS
  INSERT INTO orders_audit (order_id, action, is_update, processed_at)
  SELECT
    order_id,
    METADATA$ACTION,
    METADATA$ISUPDATE,
    CURRENT_TIMESTAMP()
  FROM orders_stream;   -- consuming the stream in DML advances its offset

-- Task chains (DAG of tasks)
CREATE TASK transform_step_2
  WAREHOUSE = 'TRANSFORM_WH'
  AFTER transform_step_1          -- runs after step_1 succeeds (AFTER takes no '=')
AS
  MERGE INTO orders_summary ...;

-- Tasks are created suspended — RESUME to start scheduling
ALTER TASK process_orders_cdc RESUME;
ALTER TASK process_orders_cdc SUSPEND;  -- pause scheduling

-- Monitor task history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(   -- table function in each database's INFORMATION_SCHEMA
  SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
  RESULT_LIMIT => 100
))
ORDER BY SCHEDULED_TIME DESC;

Snowpark

# Snowpark: run Python/Java/Scala code in Snowflake compute
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

# Connect (prefer key-pair auth or environment variables over hardcoded secrets)
import os

session = Session.builder.configs({
    "account":   os.environ["SNOWFLAKE_ACCOUNT"],
    "user":      os.environ["SNOWFLAKE_USER"],
    "password":  os.environ["SNOWFLAKE_PASSWORD"],
    "role":      "ANALYST",
    "warehouse": "ANALYTICS_WH",
    "database":  "ANALYTICS",
    "schema":    "PUBLIC"
}).create()

# DataFrame API (lazy evaluation, runs in Snowflake)
df = session.table("orders")

result = (
    df.filter(F.col("status") == "completed")
      .filter(F.col("created_at") >= "2024-01-01")
      .with_column("revenue_tier",
                   F.when(F.col("amount") > 1000, "high")
                    .when(F.col("amount") > 100, "medium")
                    .otherwise("low"))
      .group_by("region", "revenue_tier")
      .agg(
          F.sum("amount").alias("total_revenue"),
          F.count("*").alias("order_count")
      )
      .sort(F.col("total_revenue").desc())
)

# Show plan without executing
result.explain()

# Execute and fetch (runs in Snowflake warehouse)
pandas_df = result.to_pandas()

# Scalar Python UDF (registered against the active session, executes inside Snowflake)
@F.udf(return_type=FloatType(), input_types=[FloatType(), FloatType()])
def weighted_value(value, weight):
    return value * weight

# Stored procedure: Python running in Snowflake
def my_stored_proc(session: Session, date: str) -> str:
    df = session.sql(
        "SELECT COUNT(*) AS cnt FROM orders WHERE DATE(created_at) = ?",
        params=[date],   # bind parameters instead of interpolating into SQL
    ).collect()
    return f"Processed {df[0]['CNT']} orders for {date}"

session.sproc.register(
    my_stored_proc,
    name="count_daily_orders",
    replace=True,
    packages=["snowflake-snowpark-python"],  # required in the sproc runtime
)

RBAC Design

-- Snowflake role hierarchy
-- ACCOUNTADMIN → SYSADMIN → custom roles

-- Create role hierarchy
CREATE ROLE analyst;
CREATE ROLE senior_analyst;
CREATE ROLE data_engineer;
CREATE ROLE data_scientist;

GRANT ROLE analyst TO ROLE senior_analyst;  -- inheritance
GRANT ROLE senior_analyst TO ROLE data_scientist;

-- Object privileges
GRANT USAGE ON DATABASE analytics TO ROLE analyst;
GRANT USAGE ON SCHEMA analytics.marts TO ROLE analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics.marts TO ROLE analyst;
GRANT SELECT ON FUTURE TABLES IN SCHEMA analytics.marts TO ROLE analyst;

GRANT CREATE TABLE ON SCHEMA analytics.staging TO ROLE data_engineer;
GRANT OPERATE ON WAREHOUSE transform_wh TO ROLE data_engineer;

-- Row-level and column-level security
-- Column masking policy
CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST', 'DATA_ENGINEER')
    THEN REGEXP_REPLACE(val, '.+@', '***@')  -- mask local part
    ELSE val
  END;

ALTER TABLE customers MODIFY COLUMN email
  SET MASKING POLICY email_mask;

-- Row access policy (common pattern: look up the caller's allowed regions in a mapping table)
CREATE ROW ACCESS POLICY region_filter AS (region STRING) RETURNS BOOLEAN ->
  CURRENT_ROLE() = 'SYSADMIN'
  OR EXISTS (
    SELECT 1
    FROM security.role_region_map m   -- mapping table you maintain: (role_name, region)
    WHERE m.role_name = CURRENT_ROLE()
      AND m.region    = region
  );

ALTER TABLE customers ADD ROW ACCESS POLICY region_filter ON (region);

Anti-Patterns

-- ❌ Leaving warehouses running when not in use
-- (auto-suspend solves this — set to 5 minutes for production)

-- ❌ Using X-Large for simple aggregations (huge credit burn)
USE WAREHOUSE extra_large_wh;
SELECT COUNT(*) FROM small_table;  -- overkill
-- ✅ Size warehouse for typical workload, not peak

-- ❌ SELECT * on VARIANT column (reads all nested fields)
SELECT * FROM events;
-- ✅ Select specific VARIANT paths
SELECT payload:order_id::VARCHAR, payload:amount::FLOAT FROM events;

-- ❌ FLATTEN without filter (cartesian explosion)
SELECT e.*, f.value FROM events e, LATERAL FLATTEN(INPUT => e.payload:items) f;
-- without WHERE clause = every row × array length
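The explosion is easy to see in a toy model — output row count equals the sum of the array lengths (plain Python sketch, not Snowflake's FLATTEN):

```python
# FLATTEN semantics in miniature: one output row per (parent row, array element),
# so a 1M-row table with 20-item arrays produces 20M rows before any filter.
events = [
    {"event_id": "e1", "items": [{"sku": "A"}, {"sku": "B"}]},
    {"event_id": "e2", "items": [{"sku": "C"}]},
]

def flatten(rows, key):
    """Return (parent_id, index, value) per array element, like LATERAL FLATTEN."""
    return [(r["event_id"], i, v) for r in rows for i, v in enumerate(r[key])]

print(len(flatten(events, "items")))  # 3 rows: 2 + 1
```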

-- ❌ Treating Snowflake like OLTP (point lookups, small frequent writes)
-- Snowflake is optimized for analytical queries on large tables
-- Use RDS/Aurora for transactional workloads

-- ❌ Too many Time Travel days on staging/transient tables (storage cost)
-- ✅ Use TRANSIENT tables for staging (no fail-safe)
CREATE TRANSIENT TABLE staging_orders (...);

-- ❌ Clustering all tables (wastes auto-clustering credits)
-- ✅ Only cluster when the query profile shows poor partition pruning AND the table is large (hundreds of GB to TB scale)

Quick Reference

Warehouse Sizing:
  Dev/small queries      → X-Small or Small
  Analytics queries      → Medium or Large
  Complex joins, ML      → X-Large or 2X-Large
  High concurrency       → Multi-cluster (not larger size)
  Always: AUTO_SUSPEND=300 (5 min)

Cost Levers:
  1. Auto-suspend warehouses (biggest lever)
  2. Clustering → partition pruning (fewer bytes scanned)
  3. TRANSIENT tables for staging (no fail-safe storage)
  4. Result cache (free) → identical queries within 24h

Query Profile Red Flags:
  Compilation > 50%      → simplify query, use CTEs
  Spill > 0              → scale up warehouse
  Partitions > 50%       → add clustering key
  Large JOIN node        → filter before join

VARIANT Syntax:
  payload:field          → access field
  payload:field::TYPE    → cast to Snowflake type
  payload:arr[0]         → array indexing
  FLATTEN(payload:arr)   → array to rows
  GET_PATH(payload, 'a.b.c') → programmatic path

Time Travel:
  Accidental delete      → UNDROP TABLE or CLONE BEFORE
  Dev/test data          → CLONE ... AT (TIMESTAMP)
  Storage management     → set TRANSIENT for staging tables

Snowpark:
  Python DataFrame API   → lazy, runs in Snowflake warehouse
  UDF registration       → vectorized (Python UDF) or Snowflake native
  Stored procedures      → Python code as SQL-callable
