Every event that arrives at the Spectrum ingestion endpoint lands first in raw_events, untouched. No parsing, no validation beyond structure, no column extraction. Just a JSONB blob, a timestamp, and a status flag. This is build log 02: the staging pattern, and why skipping it was a mistake I made once.
// 01 — THE ORIGINAL APPROACH
The first version parsed events inline at ingestion: pull account_id, user_id, event_type, properties from the JSON and insert typed rows directly into fact_events. Simpler, one step, one table.
The problems showed up quickly:
- A malformed event with a new property shape caused the insert to fail, dropping the event.
- Changing the fact_events schema (adding a column, changing a partition key) required coordinating with the ingestion endpoint.
- There was no way to recover a batch of events that had failed mid-processing without replaying it from the source.
The pipeline had no separation between “receiving data” and “interpreting data.”
// 02 — THE STAGING TABLE
```sql
CREATE TABLE raw_events (
    id           UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    payload      JSONB NOT NULL,
    received_at  TIMESTAMPTZ DEFAULT NOW() NOT NULL,
    status       TEXT DEFAULT 'pending' NOT NULL,  -- pending / processing / done / failed
    processed_at TIMESTAMPTZ,
    error_msg    TEXT
);
```
The ingestion endpoint does exactly one thing: write the raw JSON into payload. That's it. If the body is valid JSON, the row gets written. Interpretation happens later, in the pipeline.
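A minimal sketch of that endpoint contract in Python, assuming the request body arrives as raw bytes. The names `accept_event` and `_reject_constant` are hypothetical, as is returning `None` to signal rejection. One wrinkle worth handling: Python's `json` module accepts `NaN` and `Infinity` by default, which are not valid JSON and which PostgreSQL's JSONB input rejects, so the sketch refuses them up front instead of letting the insert fail later.

```python
import json
from typing import Any, Dict, Optional


def _reject_constant(name: str) -> Any:
    # json.loads would otherwise accept NaN / Infinity / -Infinity,
    # which are not valid JSON and which PostgreSQL's JSONB input rejects.
    raise ValueError(f"non-standard JSON constant: {name}")


def accept_event(body: bytes) -> Optional[Dict[str, Any]]:
    """Return the column values for a new raw_events row, or None to reject.

    The only check is that the body parses as JSON; no field is inspected.
    id, received_at and status are left to the column defaults.
    """
    try:
        payload = json.loads(body, parse_constant=_reject_constant)
    except ValueError:  # also covers JSONDecodeError and UnicodeDecodeError
        return None
    return {"payload": payload}


# The write itself would then be one parameterized statement, e.g.
#   INSERT INTO raw_events (payload) VALUES (%s)
# with the payload serialized back to JSON by the database driver.
```

Nothing about event_type, account_id, or any other field is checked here; a payload with an unexpected shape is still accepted and stored.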
// 03 — THE PIPELINE’S JOB
The pipeline runs on a five-minute tick. It claims pending rows with FOR UPDATE SKIP LOCKED, then for each row:
- Extracts fields from payload with JSONB operators (->, ->>, #>>)
- Upserts dimension rows (dim_accounts, dim_users, dim_geography)
- Inserts the fact row into fact_events with FK references to the dimensions
- Marks the raw_events row as done
If any step fails, the row stays pending (or is marked failed with error_msg set) and is retried on the next run. No event is silently dropped.
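The tick can be sketched in Python with an in-memory list standing in for raw_events. Everything here is hypothetical: `RawEvent`, `run_tick`, `make_processor`, dictionaries in place of the dim_* tables, and an `attempts` counter (not a column in the schema above) that decides when a repeatedly failing row stops going back to pending and is marked failed instead. The real claim step is the FOR UPDATE SKIP LOCKED query quoted in a comment; no locking or transaction behavior is modeled.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional

Payload = Dict[str, Any]


@dataclass
class RawEvent:
    """In-memory stand-in for one raw_events row."""
    id: int
    payload: Payload
    status: str = "pending"
    processed_at: Optional[datetime] = None
    error_msg: Optional[str] = None
    attempts: int = 0  # hypothetical: not a column in the schema above


def run_tick(rows: List[RawEvent], process_one: Callable[[Payload], None],
             batch_size: int = 500, max_attempts: int = 3) -> List[RawEvent]:
    """One pipeline run: claim pending rows, process each, record the outcome."""
    # Claim. In PostgreSQL this would be something like:
    #   SELECT id, payload FROM raw_events WHERE status = 'pending'
    #   ORDER BY received_at LIMIT 500 FOR UPDATE SKIP LOCKED;
    # Here we just take the first pending rows from the list.
    claimed = [r for r in rows if r.status == "pending"][:batch_size]
    for row in claimed:
        row.status = "processing"
    for row in claimed:
        try:
            process_one(row.payload)
        except Exception as exc:
            # Record why, then either leave it for the next tick or give up.
            row.attempts += 1
            row.error_msg = f"{type(exc).__name__}: {exc}"
            row.status = "failed" if row.attempts >= max_attempts else "pending"
        else:
            row.status = "done"
            row.error_msg = None
            row.processed_at = datetime.now(timezone.utc)
    return claimed


def make_processor(dims: Dict[str, Dict[Any, dict]], facts: List[dict]):
    """Hypothetical per-row logic: extract fields, upsert dimensions, insert fact."""
    def process_one(payload: Payload) -> None:
        # Extract everything before writing, so a bad payload fails early.
        event_type = payload["event_type"]                 # KeyError if missing
        account_id = payload.get("account_id")             # None if missing
        user_id = payload.get("user_id")
        region = (payload.get("geo") or {}).get("region")  # tolerates "geo": null
        # Upsert-style dimension writes: insert if absent, keep existing otherwise.
        for table, key in (("dim_accounts", account_id),
                           ("dim_users", user_id),
                           ("dim_geography", region)):
            if key is not None:
                dims[table].setdefault(key, {"key": key})
        facts.append({"event_type": event_type, "account_id": account_id,
                      "user_id": user_id, "region": region})
    return process_one
```

Extracting every field before the first write is a deliberate choice in this sketch: a payload missing event_type fails before any dimension entry is touched, so a retry on the next tick starts from a clean slate.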
// 04 — THE JSONB OPERATORS
The extraction code in the pipeline reads naturally:
```python
event_type = payload['event_type']                  # KeyError if missing
account_id = payload.get('account_id')              # None if missing
region = (payload.get('geo') or {}).get('region')   # None if geo is missing or null
```
And in the PostgreSQL functions themselves:
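```sql
p_event_type := p_payload->>'event_type';          -- ->> already returns text
p_account_id := (p_payload->>'account_id')::UUID;  -- NULL if the key is absent; raises an error if the value is not a valid UUID
```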
JSONB makes it trivial to handle optional, nested, or varying-shape payloads without needing to alter the staging table schema every time the event shape evolves.
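For the nested case, `payload #>> '{geo,region}'` in SQL walks a path and returns text, or NULL if any step is missing. A rough Python analogue is sketched below for when the same extraction has to happen on the application side. `path_text` is a hypothetical helper; it approximates `#>>` (JSON null becomes None, strings come back unquoted, other values come back as JSON text) and handles only non-negative array indexes.

```python
import json
from typing import Any, Optional, Sequence


def path_text(doc: Any, path: Sequence[str]) -> Optional[str]:
    """Rough Python analogue of PostgreSQL's  doc #>> '{k1,k2,...}'.

    Returns None wherever SQL would yield NULL: a missing key, an
    out-of-range index, a path that runs through a scalar, or a JSON null.
    """
    node = doc
    for step in path:
        if isinstance(node, dict):
            if step not in node:
                return None
            node = node[step]
        elif isinstance(node, list):
            # Only non-negative integer indexes are handled in this sketch.
            if not step.isascii() or not step.isdigit():
                return None
            index = int(step)
            if index >= len(node):
                return None
            node = node[index]
        else:
            return None  # cannot descend into a string, number, bool or null
    if node is None:
        return None              # JSON null -> SQL NULL
    if isinstance(node, str):
        return node              # strings come back unquoted, like ->>
    return json.dumps(node)      # numbers, booleans, objects, arrays as JSON text
```

For example, `path_text(payload, ["geo", "region"])` plays the role of `payload #>> '{geo,region}'`, returning the region string when present and None otherwise.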
// 05 — WHAT STAGING BUYS
Durability: every event is persisted before it’s interpreted. A crash mid-pipeline doesn’t lose events. They’re still in raw_events with status = 'pending'.
Reprocessability: if the pipeline logic has a bug (wrong dimension mapping, bad aggregation), I can reset affected rows to pending and reprocess them with the fixed code.
Schema decoupling: the ingestion endpoint and the warehouse schema can evolve independently. Adding a new fact column doesn’t require touching the endpoint, and the endpoint doesn’t need to know what Spectrum does with the data it receives.
Observability: SELECT status, count(*) FROM raw_events GROUP BY 1 immediately shows how many events are pending, processing, done, or failed.
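Reprocessing and the status check can both be exercised against SQLite's in-memory database, used here only as a stand-in because it ships with Python: payload is plain TEXT rather than JSONB, timestamps are ISO-8601 strings, and the rule for "affected rows" (received at or after a cutoff) is a hypothetical example. Apart from the `?` placeholder style, the UPDATE and the GROUP BY 1 query are ordinary SQL that PostgreSQL also accepts. The function names are hypothetical.

```python
import sqlite3

# Simplified stand-in for raw_events (JSONB / TIMESTAMPTZ become TEXT here).
SCHEMA = """
CREATE TABLE raw_events (
    id           INTEGER PRIMARY KEY,
    payload      TEXT NOT NULL,
    received_at  TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending',
    processed_at TEXT,
    error_msg    TEXT
)
"""


def reset_for_reprocess(conn: sqlite3.Connection, since: str) -> int:
    """Send finished rows received at or after `since` back to pending."""
    cur = conn.execute(
        "UPDATE raw_events "
        "SET status = 'pending', processed_at = NULL, error_msg = NULL "
        "WHERE received_at >= ? AND status IN ('done', 'failed')",
        (since,),
    )
    conn.commit()
    return cur.rowcount


def status_counts(conn: sqlite3.Connection) -> dict:
    """The observability query, returned as {status: count}."""
    return dict(conn.execute("SELECT status, count(*) FROM raw_events GROUP BY 1"))
```

The reset skips rows still in processing so it never pulls work out from under a running tick. It also leaves aside what to do with fact rows the buggy run already wrote, which is where idempotency comes in.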
NEXT
- Build log 03: making the ELT pipeline idempotent: FOR UPDATE SKIP LOCKED, phase commits, and the partition DDL bug.
