posthog.tracks — source & the incremental MERGE model
TL;DR. Pipeline is PostHog batch export → GCS parquet → load_posthog_tracks.py → posthog.raw_tracks → dbt incremental MERGE (unique_key=event_id) → posthog.tracks. The table is currently degraded / under review: ~28.8M excess rows from duplicated event_ids, surfaced by a source unique test. See 2026-06-12 posthog.tracks duplicate event_id.
Source pipeline
| Layer | Object | File / owner |
|---|---|---|
| Export (PostHog) | batch export → GCS parquet | gs://data-platform-dataflow/{env}/posthog_batch/events/{date}/{hour}/*.parquet.sz |
| Sensor (Airflow) | wait_for_posthog_batch | dag.py · GCSUploadSessionCompleteSensor (deferrable, 30-min inactivity, 65-min timeout) |
| Load (GKE Python) | posthog.raw_tracks | load_posthog_tracks.py · parquet → Pydantic (RawPostHogEvent) → BQ; DELETE-before-APPEND on window_start |
| Freshness (dbt) | source:posthog.raw_tracks | dag.py · dbt source freshness (warn 2h / error 4h on bq_ingested_timestamp) |
| Transform (dbt) | posthog_tracks → posthog.tracks | models/posthog_tracks.sql · incremental MERGE, unique_key=event_id ← the consumer table |
| Test (dbt) | source & model tests on tracks | dbt/sources/sources.yml + models/posthog_tracks.yml · twice daily (06:00 / 18:00 UTC) |
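The loader's DELETE-before-APPEND on window_start is what lets an hourly window be reloaded without doubling its rows. A minimal in-memory sketch of that idea, assuming each raw row carries a window_start value; the function name reload_window and the list standing in for posthog.raw_tracks are hypothetical and not taken from load_posthog_tracks.py:

```python
from datetime import datetime, timezone


def reload_window(table, window_start, new_rows):
    """Replace every row for one window: delete first, then append.

    `table` is a plain list of dicts standing in for posthog.raw_tracks
    (hypothetical; the real loader writes to BigQuery).
    """
    # DELETE: drop whatever a previous run loaded for this window.
    kept = [row for row in table if row["window_start"] != window_start]
    # APPEND: add the freshly parsed rows, tagged with the same window.
    kept.extend({**row, "window_start": window_start} for row in new_rows)
    return kept


window = datetime(2026, 6, 12, 10, tzinfo=timezone.utc)
batch = [{"event_id": "a"}, {"event_id": "b"}]

raw = reload_window([], window, batch)
# Re-running the same window leaves the row count unchanged.
raw = reload_window(raw, window, batch)
```

In this sketch the guard only covers a reload of the same window. An event_id that arrives again in a different window still lands as a second raw row, which is the case the model's dedup and MERGE have to handle.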
Lineage (amber = degraded / under review)
- Main path: bucket (posthog_batch/events) → load_posthog_tracks.py (→ raw_tracks) → posthog.raw_tracks (2026-03-12 → present) → posthog_tracks (dbt, incremental MERGE on event_id) → posthog.tracks (~3.1B rows, all history)
- Backfill path: posthog.backfill_tracks (2025-05 → 2026-03-11) → merge_backfill_to_tracks.py (day-by-day, insert-only) → posthog.tracks
- Amber (degraded / under review): posthog_tracks, posthog.tracks. Grey (frozen): posthog.backfill_tracks, merge_backfill_to_tracks.py.
The posthog_tracks incremental model
models/posthog_tracks.sql is an incremental merge with
unique_key='event_id', full_refresh=false, partitioned
by created_at (DAY) and clustered by event_name,
user_id. It only processes rows with
created_at >= 2026-03-04 (the cutover date when the legacy
Dataflow pipeline stopped); earlier history already lives in tracks
from the completed backfill.
- Source CTE reads source('posthog','raw_tracks'), filters created_at >= 2026-03-04, and on incremental runs only takes rows with bq_ingested_timestamp > max(this) (watermark).
- Within-batch dedup: ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY bq_ingested_timestamp DESC), keeping rn = 1, so a single dbt run never inserts two rows for one event_id.
- MERGE on event_id only (global, no partition predicate in the compiled SQL). This is the key behaviour behind the duplicate report: when an event_id already exists twice in the target, ON event_id matches both rows and BigQuery updates both from the one source row, so the pre-existing duplicate persists.
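The three steps above can be traced with a small in-memory simulation. This is only a sketch of the described behaviour, not the compiled dbt SQL: the function incremental_merge is hypothetical, timestamps are simplified to integers, and the target is a plain list of dicts.

```python
def incremental_merge(target, raw, watermark):
    """Return the target after one simulated incremental run."""
    # Watermark filter: only rows ingested after the previous high-water mark.
    fresh = [r for r in raw if r["bq_ingested_timestamp"] > watermark]

    # Within-batch dedup: keep the latest row per event_id (rn = 1).
    latest = {}
    for row in fresh:
        seen = latest.get(row["event_id"])
        if seen is None or row["bq_ingested_timestamp"] > seen["bq_ingested_timestamp"]:
            latest[row["event_id"]] = row

    merged = list(target)
    for event_id, src in latest.items():
        matches = [i for i, t in enumerate(merged) if t["event_id"] == event_id]
        if matches:
            # WHEN MATCHED: every matching target row is updated.
            for i in matches:
                merged[i] = dict(src)
        else:
            # WHEN NOT MATCHED: insert exactly one row.
            merged.append(dict(src))
    return merged


# The target already holds event "a" twice (a pre-existing duplicate).
target = [
    {"event_id": "a", "bq_ingested_timestamp": 1},
    {"event_id": "a", "bq_ingested_timestamp": 1},
]
raw = [
    {"event_id": "a", "bq_ingested_timestamp": 5},
    {"event_id": "a", "bq_ingested_timestamp": 6},
    {"event_id": "b", "bq_ingested_timestamp": 6},
]
result = incremental_merge(target, raw, watermark=1)
```

After the run, "b" has been inserted once, while both copies of "a" have been overwritten with the newest source row. The run adds no new duplicate, but it also never collapses the one that was already there.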
Columns
| Column | Type | Description |
|---|---|---|
| event_id | STRING | PostHog event UUID, stable across retries. unique_key. Re-emitted by PostHog for months-old events. |
| created_at | TIMESTAMP | Event timestamp (UTC). Partition key. Re-written on re-delivery today (secondary bug; fix = merge_exclude_columns=['created_at']). |
| event_name | STRING | PostHog event name. Cluster key. |
| user_id | STRING | PostHog distinct_id mapped to Moises user. Cluster key. |
| person_id | STRING | PostHog person UUID. |
| session_id | STRING | From session_id or $session_id. |
| active_feature_flags | JSON | Flags active at event time ($active_feature_flags). |
| feature_flags_values | JSON | Flag evaluations from $feature/* properties. |
| lib_values | JSON | PostHog library metadata ($lib*). Growth derives platform from $lib. |
| device_model / device_type / device_os | STRING | Device attributes from event properties. |
| ip_address | STRING | Client IP ($ip). |
| client | STRING | Client/source marker (web / ios / android). |
| geoip_* | STRING | GeoIP-enriched city / continent / country / time zone. |
| app_version / locale | STRING | From $app_version / $locale. |
| event_payload | JSON | Full PostHog properties payload, preserved for downstream analysis. |
| bq_ingested_timestamp | TIMESTAMP | Loader stamp: one datetime.now() per loader run, copied to every row. MERGE dedup tiebreaker. Not an insert time. |
| event_inserted_at | TIMESTAMP | Upstream insert timestamp from the export payload (_inserted_at / inserted_at). |
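The bq_ingested_timestamp semantics are easy to misread, so here is a minimal sketch of a run-level stamp. The helper stamp_rows and its optional now argument are hypothetical, and the use of UTC is an assumption; the table only says the loader calls datetime.now() once per run.

```python
from datetime import datetime, timezone


def stamp_rows(rows, now=None):
    """Attach a single run-level timestamp to every row."""
    # One clock read per run, not per row (UTC is assumed here).
    run_stamp = now or datetime.now(timezone.utc)
    return [{**row, "bq_ingested_timestamp": run_stamp} for row in rows]


stamped = stamp_rows([{"event_id": "a"}, {"event_id": "a"}, {"event_id": "b"}])
distinct_stamps = {row["bq_ingested_timestamp"] for row in stamped}
```

Every row from one run shares the same value, so the column can order rows from different loader runs but cannot separate two rows for the same event_id loaded in the same run.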
Data tests
Two layers of tests reference tracks:
| Where | Test | Scope | Status |
|---|---|---|---|
| dbt/sources/sources.yml | unique on event_id | created_at >= 2026-03-04 | FAIL (pulled into bq-marketing via +tag:marketing on 2026-06-11) |
| dbt/sources/sources.yml | not_null on event_id, created_at | created_at >= 2026-03-04 | PASS |
| dbt/sources/sources.yml | elementary volume anomalies (incl. purchase_flow) | 14-day training | - |
| models/posthog_tracks.yml | model-level unique on event_id | n/a | Disabled (commented out); will be re-enabled (scoped >= 2026-03-04) after dedup. |
| models/posthog_tracks.yml | elementary volume / dimension / freshness anomalies | 28-day training | - |
Because the owning model's unique test is commented out, the
posthog-tracks DAG never caught the duplicates; the
bq-marketing DAG did, once the marketing model added a
source('posthog','tracks') dependency. Re-enabling the model-level
test (gated on dedup) puts the alarm back on the owning DAG.
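For intuition, a scoped uniqueness check and an excess-row count can be sketched as below. The function excess_rows is hypothetical, and defining "excess" as the row count minus one for each duplicated event_id is an assumption about how the ~28.8M figure was derived. The actual dbt test runs as SQL against BigQuery.

```python
from collections import Counter
from datetime import date

CUTOVER = date(2026, 3, 4)  # same scope as the source tests on event_id


def excess_rows(rows, cutoff=CUTOVER):
    """Return duplicated event_ids in scope and the number of surplus rows."""
    counts = Counter(r["event_id"] for r in rows if r["created_at"] >= cutoff)
    # A unique test fails if any event_id appears more than once.
    duplicated = {eid: n for eid, n in counts.items() if n > 1}
    # Surplus rows: everything beyond the first copy of each event_id.
    excess = sum(n - 1 for n in duplicated.values())
    return duplicated, excess


rows = [
    {"event_id": "a", "created_at": date(2026, 6, 1)},
    {"event_id": "a", "created_at": date(2026, 6, 2)},
    {"event_id": "a", "created_at": date(2026, 6, 3)},
    {"event_id": "b", "created_at": date(2026, 6, 1)},
    # Before the cutover, so out of scope for the test.
    {"event_id": "c", "created_at": date(2026, 1, 1)},
    {"event_id": "c", "created_at": date(2026, 1, 2)},
]
duplicated, excess = excess_rows(rows)
```

Here "a" is reported with three copies and two surplus rows, while the pre-cutover duplicate "c" is ignored because of the created_at scope.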
Repo files
| File | Purpose |
|---|---|
| dag.py | Hourly DAG: sensor → loader → freshness → dbt run → (gated) dbt test. |
| load_posthog_tracks.py | Core loader: GCS parquet → Pydantic → raw_tracks. |
| common.py | DAG-layer constants and CLI arg builders; gcs_prefix_template. |
| models/posthog_tracks.sql / .yml | Incremental MERGE model + tests for tracks. |
| merge_backfill_to_tracks.py | Ad-hoc day-by-day insert-only MERGE of backfill_tracks into tracks (legacy dup contributor). |
| dedup_tracks.py | One-time dedup (snapshot + atomic CREATE OR REPLACE). Lives on branch NOREF/posthog-tracks-dedup. |
| dbt/sources/sources.yml | posthog source declaration + event_id tests. |
See the full investigation: 2026-06-12 posthog.tracks duplicate event_id · PostHog Tracks home