DOC: spectrum-j
SYSTEM SPECTRUM

JSONB Staging: Keep Raw Events Unmodified

Why the staging table stores event payloads as JSONB, not typed columns.


Every event that arrives at the Spectrum ingestion endpoint lands first in raw_events, untouched. No parsing, no validation beyond structure, no column extraction. Just a JSONB blob, a timestamp, and a status flag. This is build log 02: the staging pattern, and why skipping it was a mistake I made once.

// 01 — THE ORIGINAL APPROACH

The first version parsed events inline at ingestion: pull account_id, user_id, event_type, and properties out of the JSON and insert typed rows directly into fact_events. It was simpler: one step, one table.

The problems showed up quickly:

The pipeline had no separation between “receiving data” and “interpreting data.”

// 02 — THE STAGING TABLE

CREATE TABLE raw_events (
    id           UUID DEFAULT gen_random_uuid() PRIMARY KEY,  -- built in since PostgreSQL 13; older versions need pgcrypto
    payload      JSONB NOT NULL,
    received_at  TIMESTAMPTZ DEFAULT NOW() NOT NULL,
    status       TEXT DEFAULT 'pending' NOT NULL,  -- pending / processing / done / failed
    processed_at TIMESTAMPTZ,
    error_msg    TEXT
);

The ingestion endpoint does exactly one thing: write the raw request body into payload. If the body is well-formed JSON, the row gets written. Interpretation happens later, in the pipeline.
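Here is a minimal sketch of that endpoint logic in Python. It assumes a DB-API driver such as psycopg, which uses %s placeholders. build_raw_insert and INSERT_RAW_SQL are hypothetical names, not Spectrum's code, and the web framework is left out. The only checks are that the body decodes as UTF-8 and parses as JSON. NaN and Infinity are rejected explicitly because Python's json module accepts them and PostgreSQL's JSONB does not.

```python
import json
from typing import Tuple

# Hypothetical statement: the %s::jsonb cast lets PostgreSQL parse the text itself.
INSERT_RAW_SQL = "INSERT INTO raw_events (payload) VALUES (%s::jsonb)"


def _reject_constant(name: str) -> None:
    # json.loads calls this for NaN, Infinity and -Infinity, which JSONB rejects.
    raise ValueError(f"non-standard JSON constant: {name}")


def build_raw_insert(body: bytes) -> Tuple[str, Tuple[str]]:
    """Return (sql, params) that store the request body as-is, or raise ValueError."""
    text = body.decode("utf-8")  # UnicodeDecodeError is a ValueError subclass
    # Well-formedness check only; nothing is extracted or reshaped.
    # PostgreSQL can still reject a few inputs this accepts (e.g. a \u0000 escape).
    json.loads(text, parse_constant=_reject_constant)
    return INSERT_RAW_SQL, (text,)
```

One caveat on "unmodified": JSONB stores a parsed form. It does not keep whitespace or object key order, and when a key is duplicated only the last value survives. If byte-for-byte fidelity ever matters, use the plain json type, which keeps the original text.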

// 03 — THE PIPELINE’S JOB

The pipeline runs on a five-minute tick. It claims pending rows with SELECT ... FOR UPDATE SKIP LOCKED, so overlapping runs never pick up the same row. Then, for each row, it:

  1. Extracts fields from payload with JSONB operators (->, ->>, #>>)
  2. Upserts dimension rows (dim_accounts, dim_users, dim_geography)
  3. Inserts the fact row into fact_events with FK references to the dimensions
  4. Marks the raw_events row as done
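The claim query and the per-row steps can be sketched as follows. This is a minimal sketch under stated assumptions, not the pipeline's actual code. CLAIM_SQL shows one common way to use FOR UPDATE SKIP LOCKED, and the batch-size parameter is my own addition. upsert_dim and insert_fact are hypothetical callables standing in for the real SQL: the first is assumed to return a surrogate key, and the second writes one row. The fact-row column names are illustrative as well.

```python
from typing import Any, Callable, Dict, Optional

# Lock a batch of pending rows for this transaction. SKIP LOCKED makes an
# overlapping run skip rows that are already locked instead of waiting on them.
CLAIM_SQL = """
    SELECT id, payload
    FROM raw_events
    WHERE status = 'pending'
    ORDER BY received_at
    LIMIT %s
    FOR UPDATE SKIP LOCKED
"""

# Hypothetical stand-ins for the real SQL: upsert_dim(table, natural_key) returns
# the dimension's surrogate key; insert_fact(row) writes one fact_events row.
UpsertDim = Callable[[str, Any], int]
InsertFact = Callable[[Dict[str, Any]], None]


def handle_row(payload: Dict[str, Any], upsert_dim: UpsertDim, insert_fact: InsertFact) -> None:
    def key(table: str, natural_key: Any) -> Optional[int]:
        # Skip the upsert when the payload carries no value for this dimension.
        return None if natural_key is None else upsert_dim(table, natural_key)

    # 1. Extract fields: a missing event_type raises KeyError; the rest are optional.
    event_type = payload["event_type"]
    account_id = payload.get("account_id")
    user_id = payload.get("user_id")
    region = (payload.get("geo") or {}).get("region")

    # 2. Upsert dimensions first, so the fact row has keys to reference.
    account_key = key("dim_accounts", account_id)
    user_key = key("dim_users", user_id)
    geo_key = key("dim_geography", region)

    # 3. Insert the fact row with FK references (column names are illustrative).
    insert_fact({
        "event_type": event_type,
        "account_key": account_key,
        "user_key": user_key,
        "geo_key": geo_key,
        "properties": payload.get("properties") or {},
    })
    # 4. The caller marks the raw_events row 'done' once this returns without error.
```

The ordering matters because fact_events holds FK references: the dimension rows have to exist before the fact row can point at them.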

If any step fails, the row stays pending (or is marked failed with error_msg set) and is retried on the next run. No event is silently dropped.
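One way to get the "stays pending or is marked failed" behavior is a savepoint per row, inside the same transaction that holds the FOR UPDATE locks. This is a sketch with hypothetical names, assuming a DB-API cursor and a handle callable that runs the extraction, upserts, and fact insert on that same cursor. The post doesn't say how Spectrum actually implements it. A Python or database error rolls back only that row's work and records error_msg. A crash before commit rolls back the whole batch, so every claimed row goes back to being pending and unlocked for the next run.

```python
from typing import Any, Callable, Dict, Iterable, Tuple

MARK_DONE_SQL = """
    UPDATE raw_events
    SET status = 'done', processed_at = now(), error_msg = NULL
    WHERE id = %s
"""

MARK_FAILED_SQL = """
    UPDATE raw_events
    SET status = 'failed', error_msg = %s
    WHERE id = %s
"""


def process_batch(
    cur: Any,
    rows: Iterable[Tuple[Any, Dict[str, Any]]],
    handle: Callable[[Dict[str, Any]], None],
) -> Tuple[int, int]:
    """Process claimed rows inside the caller's open transaction; return (done, failed)."""
    done = failed = 0
    for row_id, payload in rows:
        # A failed statement aborts a PostgreSQL transaction; rolling back to this
        # savepoint undoes only this row's work and keeps the batch usable.
        cur.execute("SAVEPOINT raw_row")
        try:
            handle(payload)
        except Exception as exc:
            cur.execute("ROLLBACK TO SAVEPOINT raw_row")
            cur.execute(MARK_FAILED_SQL, (f"{type(exc).__name__}: {exc}", row_id))
            failed += 1
        else:
            cur.execute(MARK_DONE_SQL, (row_id,))
            done += 1
        cur.execute("RELEASE SAVEPOINT raw_row")
    return done, failed
```

The caller commits once after the loop. Until then, other runs skip these rows because they are still locked.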

// 04 — THE JSONB OPERATORS

The extraction code in the pipeline reads naturally:

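# payload: the JSONB column as a Python dict (psycopg decodes jsonb automatically)
event_type = payload['event_type']                     # KeyError if missing
account_id = payload.get('account_id')                 # None if missing
region     = (payload.get('geo') or {}).get('region')  # None if geo is missing or null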

And in the PostgreSQL functions themselves:

p_event_type := p_payload->>'event_type';          -- ->> already returns TEXT
p_account_id := (p_payload->>'account_id')::UUID;  -- errors if present but not a valid UUID

JSONB makes it trivial to handle optional, nested, or varying-shape payloads without needing to alter the staging table schema every time the event shape evolves.
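To make the text-returning operators concrete, here is a small pure-Python model of what ->> and #>> give back. It illustrates PostgreSQL's documented behavior and is not a binding to it. The function names are hypothetical, and the model covers only object keys and non-negative array positions. A missing key or a JSON null becomes SQL NULL (None here), strings come back unquoted, and every other value comes back as JSON text.

```python
import json
from typing import Any, Optional, Sequence

_MISSING = object()  # marks "key or index not present", distinct from JSON null


def _as_text(value: Any) -> Optional[str]:
    # Missing or JSON null -> NULL; strings unquoted; numbers, booleans, arrays and
    # objects rendered as JSON text (PostgreSQL may order object keys differently).
    if value is _MISSING or value is None:
        return None
    if isinstance(value, str):
        return value
    return json.dumps(value)


def arrow_text(doc: Any, key: str) -> Optional[str]:
    """Model of  doc ->> 'key'  for an object key."""
    return _as_text(doc.get(key, _MISSING) if isinstance(doc, dict) else _MISSING)


def path_text(doc: Any, path: Sequence[str]) -> Optional[str]:
    """Model of  doc #>> '{a,b,...}': follow object keys or zero-based array indices."""
    value = doc
    for step in path:
        if isinstance(value, dict):
            value = value.get(step, _MISSING)
        elif isinstance(value, list) and step.isdigit() and int(step) < len(value):
            value = value[int(step)]
        else:
            value = _MISSING  # includes stepping into a JSON null or a scalar
        if value is _MISSING:
            return None
    return _as_text(value)
```

This explains why (p_payload->>'account_id')::UUID yields NULL when account_id is absent or null, yet raises an error when it is present and not a valid UUID.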

// 05 — WHAT STAGING BUYS

Durability: every event is persisted before it’s interpreted. A crash mid-pipeline doesn’t lose events. They’re still in raw_events with status = 'pending'.

Reprocessability: if the pipeline logic has a bug (wrong dimension mapping, bad aggregation), I can reset affected rows to pending and reprocess them with the fixed code.

Schema decoupling: the ingestion endpoint and the warehouse schema can evolve independently. Adding a new fact column doesn’t require touching the endpoint, and the endpoint doesn’t need to know what Spectrum does with the data it receives.

Observability: SELECT status, count(*) FROM raw_events GROUP BY 1 immediately shows how many events are pending, processing, done, or failed.
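For reprocessability, a reset can be as small as one UPDATE over a received_at window. This is a sketch with hypothetical names, assuming a DB-API driver with %s placeholders such as psycopg. Only done and failed rows are reset, so pending rows are left alone. One assumption deserves stating: rerunning a done row repeats the dimension upserts and the fact insert. Unless the fact insert is idempotent, or the old fact rows for those events are removed first, fact_events will end up with duplicates.

```python
from datetime import datetime, timezone
from typing import Tuple

# Hypothetical reset statement: send a received_at window back through the pipeline.
RESET_SQL = """
    UPDATE raw_events
    SET status = 'pending', processed_at = NULL, error_msg = NULL
    WHERE received_at >= %s
      AND received_at < %s
      AND status IN ('done', 'failed')
"""


def reset_window(start: datetime, end: datetime) -> Tuple[str, Tuple[datetime, datetime]]:
    """Return (sql, params) for a half-open [start, end) reset window."""
    if start.tzinfo is None or end.tzinfo is None:
        # received_at is TIMESTAMPTZ; naive values would be read in the session time zone.
        raise ValueError("use timezone-aware datetimes")
    if end <= start:
        raise ValueError("end must be after start")
    return RESET_SQL, (start, end)
```

The half-open window lets consecutive resets share a boundary without touching any row twice.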


ANTHONY PENA · @FROGWEBP
I build data systems and write about everything around them, the architecture, the failures, what each one teaches me. Documenting in public since 2021: the process, not just the result.
