The pipeline that moves events from raw_events into fact_events runs every five minutes. It needs to be safe to run concurrently (two instances can’t process the same event) and safe to re-run (a crash mid-batch leaves no corrupted state). This is build log 03: the three-phase design that gives both properties.
// 01 — THE PHASES
The pipeline is explicitly split into three committed phases, each on its own connection that commits before the next phase begins:
Phase 1 ─ Partition provisioning (DDL, committed alone)
Phase 2 ─ Event processing (claim → extract → upsert dims → insert fact)
Phase 3 ─ Analytics refresh (REFRESH MATERIALIZED VIEW CONCURRENTLY × 4)
Each phase is independent. Phase 2 can only see partitions Phase 1 committed. Phase 3 can only see events Phase 2 committed. Nothing crosses a phase boundary inside an uncommitted transaction.
// 02 — PHASE 1: PARTITION PROVISIONING
async with pool.acquire() as conn:
    async with conn.transaction():
        months = await get_pending_event_months(conn)
        for month in months:
            await conn.execute(
                "SELECT warehouse.create_monthly_partition($1)", month
            )
# committed: partitions now visible to all connections
DDL (CREATE TABLE) is only visible to other connections after commit. Processing events in the same transaction as partition creation would mean the insert connections can’t see the new partitions. Phase 1 commits alone to solve this.
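A monthly range partition covers a half-open interval, from the first of the month up to (but not including) the first of the next month. What warehouse.create_monthly_partition does internally is not shown in this log. The helpers below are a hypothetical sketch of the bounds and name such a function might derive, and the fact_events_YYYY_MM naming scheme is an assumption.

```python
from datetime import date

def month_bounds(month: date) -> tuple[date, date]:
    """Return [start, end) for the calendar month containing `month`."""
    start = month.replace(day=1)
    # Roll over to January of the following year after December.
    if start.month == 12:
        end = date(start.year + 1, 1, 1)
    else:
        end = date(start.year, start.month + 1, 1)
    return start, end

def partition_name(month: date, parent: str = "fact_events") -> str:
    # Naming scheme is an assumption, not taken from the pipeline.
    return f"{parent}_{month.year:04d}_{month.month:02d}"
```

Using a half-open range means adjacent partitions never overlap: an event timestamped exactly at midnight on the first belongs to the new month only.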
// 03 — PHASE 2: EVENT PROCESSING
Events are claimed in batches of 100 using FOR UPDATE SKIP LOCKED:
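async with pool.acquire() as conn:
    # Row locks from FOR UPDATE last only as long as the enclosing
    # transaction, so the claim and the processing must share one.
    async with conn.transaction():
        rows = await conn.fetch("""
            SELECT id, payload FROM raw_events
            WHERE status = 'pending'
            ORDER BY received_at
            LIMIT 100
            FOR UPDATE SKIP LOCKED
        """)
        for row in rows:
            await process_event(conn, row['id'], row['payload'])
            # each event marked 'done' inside process_event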
SKIP LOCKED means two concurrent pipeline instances cannot claim the same event. One instance locks the row; the other skips past it to the next available row. If a run crashes mid-batch, its open transaction rolls back: the locks release, any uncommitted 'done' updates are discarded, and the events are still pending for the next run to pick up.
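The claim semantics can be illustrated without a database. The toy model below is not how PostgreSQL implements row locks; it is an in-memory stand-in (the `ToyQueue` class is hypothetical) where a set of locked ids plays the role of the row locks held by open transactions.

```python
from dataclasses import dataclass, field

@dataclass
class ToyQueue:
    """In-memory stand-in for raw_events; a model, not PostgreSQL."""
    pending: list[int]                      # event ids in received_at order
    locked: set[int] = field(default_factory=set)
    done: set[int] = field(default_factory=set)

    def claim(self, limit: int) -> list[int]:
        # Like FOR UPDATE SKIP LOCKED: skip rows another worker holds.
        batch = [i for i in self.pending
                 if i not in self.locked and i not in self.done][:limit]
        self.locked.update(batch)
        return batch

    def commit(self, batch: list[int]) -> None:
        # Commit: status becomes 'done' and the locks are released.
        self.done.update(batch)
        self.locked.difference_update(batch)

    def rollback(self, batch: list[int]) -> None:
        # Crash or rollback: locks released, rows are still pending.
        self.locked.difference_update(batch)

q = ToyQueue(pending=[1, 2, 3, 4, 5])
a = q.claim(2)        # worker A locks the two oldest events
b = q.claim(2)        # worker B skips A's rows and takes the next two
q.rollback(a)         # worker A crashes before committing
q.commit(b)
retry = q.claim(10)   # the next run sees A's events again, plus event 5
```

Worker B never waits on worker A, and nothing worker A touched is lost when it crashes.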
process_event is idempotent: it upserts all dimensions and uses ON CONFLICT DO NOTHING for the fact row, keyed on the deterministic event_hash. Re-running the same event always produces one fact row, not two.
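The idempotency argument rests on the hash being a pure function of the event. How event_hash is actually computed is not shown in this log. The sketch below assumes SHA-256 over canonical JSON, and uses a dict as a stand-in for a table with a unique constraint on event_hash. Both function names are hypothetical.

```python
import hashlib
import json

def event_hash(payload: dict) -> str:
    # Canonical JSON (sorted keys, fixed separators) so key order
    # does not change the hash. The hashing scheme is an assumption.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def insert_fact(facts: dict[str, dict], payload: dict) -> bool:
    """Mimic INSERT ... ON CONFLICT (event_hash) DO NOTHING.

    Returns True if a row was inserted, False if it already existed.
    """
    key = event_hash(payload)
    if key in facts:
        return False
    facts[key] = payload
    return True

facts: dict[str, dict] = {}
first = insert_fact(facts, {"user": 7, "type": "signup"})
# Same event replayed, with keys in a different order.
again = insert_fact(facts, {"type": "signup", "user": 7})
```

The replay is rejected by the key alone, so the pipeline never has to remember which events it has already seen.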
// 04 — PHASE 3: ANALYTICS REFRESH
views = [
    'analytics.mv_conversion_funnels',
    'analytics.mv_retention_cohorts',
    'analytics.mv_revenue_summary',
    'analytics.mv_experiment_results',
]
async with pool.acquire() as conn:
    for v in views:
        await conn.execute(f'REFRESH MATERIALIZED VIEW CONCURRENTLY {v}')
    await conn.execute("""
        INSERT INTO analytics.mv_refresh_log (mv_name, refreshed_at)
        VALUES (unnest($1::text[]), NOW())
        ON CONFLICT (mv_name) DO UPDATE SET refreshed_at = NOW()
    """, views)
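CONCURRENTLY refreshes don’t block Grafana readers: queries keep seeing the old contents until the refresh finishes. This mode requires a unique index on each view, and it works by diffing the old and new contents and applying only the changed rows. The mv_refresh_log upsert replaces the NOW() freshness column that was previously inside each MV. That column changed on every row at every refresh, so the diff rewrote the whole view each time (see the NOW() anomaly log).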
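With freshness tracked in its own table, a staleness check becomes a comparison against refreshed_at. The helper below is a hypothetical sketch, assuming the rows of analytics.mv_refresh_log have already been read into a dict. The 10-minute threshold (two run intervals) is an assumption, not a value from the pipeline.

```python
from datetime import datetime, timedelta, timezone

def stale_views(refresh_log: dict[str, datetime],
                now: datetime,
                max_age: timedelta = timedelta(minutes=10)) -> list[str]:
    """Return names of views whose last refresh is older than max_age.

    refresh_log maps mv_name -> refreshed_at. The default threshold
    is an assumed value of two five-minute run intervals.
    """
    return sorted(name for name, ts in refresh_log.items()
                  if now - ts > max_age)

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
log = {
    "analytics.mv_revenue_summary": now - timedelta(minutes=3),
    "analytics.mv_retention_cohorts": now - timedelta(minutes=25),
}
stale = stale_views(log, now)
```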
// 05 — THE PROPERTIES THIS GIVES
- Concurrent-safe: SKIP LOCKED partitions work between instances with zero coordination beyond the database lock.
- Crash-safe: uncommitted work releases locks and is retried. Committed work is marked 'done' and skipped.
- Idempotent: deterministic event hashes and upsert semantics mean re-running a batch of already-processed events is a no-op.
- Observable: SELECT status, count(*) FROM raw_events GROUP BY 1 shows pipeline health at any moment.
NEXT
- Build log 04: the star schema, table partitioning, and the indexes that make aggregation queries fast.
