
An Idempotent ELT Pipeline With SKIP LOCKED and Phase Commits

Three committed phases that make re-runs produce exactly the right result.


The pipeline that moves events from raw_events into fact_events runs every five minutes. It needs to be safe to run concurrently (two instances can’t process the same event) and safe to re-run (a crash mid-batch leaves no corrupted state). This is build log 03: the three-phase design that gives both properties.

// 01 — THE PHASES

The pipeline is explicitly split into three committed phases, each on its own connection that commits before the next phase begins:

Phase 1  ─  Partition provisioning   (DDL, committed alone)
Phase 2  ─  Event processing         (claim → extract → upsert dims → insert fact)
Phase 3  ─  Analytics refresh        (REFRESH MATERIALIZED VIEW CONCURRENTLY × 4)

The phases commit independently, and each one sees only what the previous phase committed: Phase 2 sees the partitions Phase 1 committed, and Phase 3 sees the events Phase 2 committed. Nothing crosses a phase boundary inside an uncommitted transaction.
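
The ordering rule can be sketched as a small runner. This is a minimal illustration, not the pipeline's actual entry point: `run_phases` and the `Phase` type are hypothetical names, and each phase is assumed to be a coroutine that acquires its own connection and commits before returning.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple

# Assumed shape of a phase: a coroutine function that acquires its own
# connection, does its work, and commits before it returns.
Phase = Callable[[], Awaitable[None]]


async def run_phases(phases: Sequence[Tuple[str, Phase]]) -> List[str]:
    """Run phases strictly in order and return the names that completed.

    An exception in one phase propagates immediately, so a later phase
    never runs on top of work that failed to commit.
    """
    completed: List[str] = []
    for name, phase in phases:
        await phase()  # returns only after the phase has committed
        completed.append(name)
    return completed
```

With three real phases this would be called as `asyncio.run(run_phases([("partitions", p1), ("events", p2), ("analytics", p3)]))`, where `p1`, `p2`, and `p3` stand in for the phase coroutines shown in the sections that follow.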

// 02 — PHASE 1: PARTITION PROVISIONING

async with pool.acquire() as conn:
    async with conn.transaction():
        months = await get_pending_event_months(conn)
        for month in months:
            await conn.execute(
                "SELECT warehouse.create_monthly_partition($1)", month
            )
    # committed — partitions now visible to all connections

PostgreSQL DDL is transactional: a CREATE TABLE becomes visible to other connections only after its transaction commits. If partition creation shared a transaction with event processing, connections doing the inserts could not see the new partitions. Phase 1 commits alone to solve this.
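
The body of warehouse.create_monthly_partition is not shown in this log. As a rough sketch of what a function like it might compute, the helper below derives the month bounds and the DDL for one partition. It assumes fact_events lives in the warehouse schema and is range-partitioned by month; the partition naming scheme and both helper names are hypothetical.

```python
from datetime import date
from typing import Tuple


def month_bounds(month: date) -> Tuple[date, date]:
    """Return [first day of month, first day of next month) for any date."""
    start = month.replace(day=1)
    if start.month == 12:
        end = date(start.year + 1, 1, 1)
    else:
        end = date(start.year, start.month + 1, 1)
    return start, end


def partition_ddl(month: date, parent: str = "warehouse.fact_events") -> str:
    """Build idempotent DDL for one monthly partition (assumed naming)."""
    start, end = month_bounds(month)
    name = f"{parent}_{start:%Y_%m}"
    # IF NOT EXISTS keeps a re-run of Phase 1 from failing on existing partitions.
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}')"
    )
```

The upper bound is exclusive in PostgreSQL range partitioning, which is why each partition ends on the first day of the following month.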

// 03 — PHASE 2: EVENT PROCESSING

Events are claimed in batches of 100 using FOR UPDATE SKIP LOCKED:

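async with pool.acquire() as conn:
    # Row locks last only as long as the transaction that took them,
    # so the claim and the processing must share one transaction.
    async with conn.transaction():
        rows = await conn.fetch("""
            SELECT id, payload FROM raw_events
            WHERE status = 'pending'
            ORDER BY received_at
            LIMIT 100
            FOR UPDATE SKIP LOCKED
        """)
        for row in rows:
            await process_event(conn, row['id'], row['payload'])
        # each event marked 'done' inside process_event
    # committed: locks released, batch is now 'done'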

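SKIP LOCKED means two concurrent pipeline instances cannot claim the same event, provided the claiming transaction stays open while the batch is processed: row locks are released at commit or rollback. One instance locks the row; the other skips it and moves on to the next available row. If a run crashes mid-batch, its transaction rolls back, the locks release, the events are still pending, and the next run picks them up cleanly.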

process_event is idempotent: it upserts all dimensions and uses ON CONFLICT DO NOTHING for the fact row, keyed on the deterministic event_hash. Re-running the same event always produces one fact row, not two.
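
How event_hash is derived is not shown here. One common way to get a deterministic key, sketched below as an assumption rather than the pipeline's actual method, is to hash a canonical JSON encoding of the payload so that key order and whitespace cannot change the result. The function body and the choice of SHA-256 are illustrative.

```python
import hashlib
import json
from typing import Any, Mapping


def event_hash(payload: Mapping[str, Any]) -> str:
    """Hash a canonical JSON form of the payload (assumed scheme).

    sort_keys and fixed separators make the encoding independent of
    dict ordering and formatting, so the same event always hashes the same.
    The payload must be JSON-serializable.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Because the hash depends only on the event's content, a re-run computes the same key and the ON CONFLICT DO NOTHING clause turns the second insert into a no-op.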

// 04 — PHASE 3: ANALYTICS REFRESH

views = [
    'analytics.mv_conversion_funnels',
    'analytics.mv_retention_cohorts',
    'analytics.mv_revenue_summary',
    'analytics.mv_experiment_results',
]
async with pool.acquire() as conn:
    for v in views:
        await conn.execute(f'REFRESH MATERIALIZED VIEW CONCURRENTLY {v}')
    await conn.execute("""
        INSERT INTO analytics.mv_refresh_log (mv_name, refreshed_at)
        VALUES (unnest($1::text[]), NOW())
        ON CONFLICT (mv_name) DO UPDATE SET refreshed_at = NOW()
    """, views)

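CONCURRENTLY refreshes don't block Grafana readers; PostgreSQL requires a unique index on each materialized view for this mode. The mv_refresh_log upsert replaces the NOW() freshness column that previously lived inside each MV. That column changed on every row at every refresh, which defeated the incremental diff (see the NOW() anomaly log).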
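
To see why a per-row timestamp hurts, here is a toy model of a diff between two snapshots of a view. It is only an illustration of the idea, not PostgreSQL's implementation; `changed_keys` and the dict-of-tuples representation are hypothetical.

```python
from typing import Dict, Set, Tuple


def changed_keys(old: Dict[str, Tuple], new: Dict[str, Tuple]) -> Set[str]:
    """Keys whose row was added, removed, or altered between two snapshots."""
    return {k for k in old.keys() | new.keys() if old.get(k) != new.get(k)}


# Two refreshes where only row "b" really changed.
before = {"a": (10,), "b": (20,)}
after = {"a": (10,), "b": (21,)}

# The same data with a refresh timestamp baked into every row.
before_stamped = {k: v + ("t1",) for k, v in before.items()}
after_stamped = {k: v + ("t2",) for k, v in after.items()}

small_diff = changed_keys(before, after)                 # only "b"
full_diff = changed_keys(before_stamped, after_stamped)  # every row
```

Moving the timestamp into a separate log table keeps the view's rows stable, so a refresh only has to touch rows whose data actually changed.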

// 05 — THE PROPERTIES THIS GIVES

ANTHONY PENA · @FROGWEBP
I build data systems and write about everything around them, the architecture, the failures, what each one teaches me. Documenting in public since 2021: the process, not just the result.
