Activity Monitor — event-sourcing architecture
Why this exists
The original D2 receiver implementation (commits 1abae1676,
92e515cc2) wrote OCSF-normalised events directly into ClickHouse
gateway_activity_events from the Hono receiver handler, and the
plan for anomaly detection (Option C v0) was a poller worker that
periodically swept active AnomalyRule rows and SELECT-ed against
the CH table.
Per @rchaves’s 2026-04-27 directive — “event sourcing is the one
true way” — and @master_orchestrator’s follow-up (rebase/learn from
PR #3351) we
redesigned the trigger architecture before the eval engine landed.
The receiver now appends an ActivityEventReceived event to
event_log, and a dedicated activity-monitor-processing pipeline
takes over from there. Anomaly detection becomes a reactor that
fires as new events arrive, not a worker that polls.
The pipeline
It mirrors pipelines/trace-processing/ (PR #3351’s
alertTrigger reactor): same definePipeline().withFoldProjection().withMapProjection().withReactor() builder, same
ReactorDefinition<EventShape, FoldState> contract, same
triggerActionDispatch.ts shared helper.
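To make the builder chain concrete, here is a toy stand-in. The real definePipeline() signature is not shown in this doc, so PipelineSketch, its method bodies, and the ActivityEvent shape are all assumptions for illustration only. It shows the order of work on each appended event: fold, map, then react.

```typescript
// Toy stand-in for the builder chain named above. Every type and method
// here is an assumption; this is not the real definePipeline() API.
type ActivityEvent = { tenantId: string; costUsd: number };

class PipelineSketch<S> {
  private state: S;
  private fold: (state: S, event: ActivityEvent) => S = (s) => s;
  private map: (event: ActivityEvent) => void = () => {};
  private react: (event: ActivityEvent, state: S) => void = () => {};

  constructor(initial: S) {
    this.state = initial;
  }

  withFoldProjection(f: (state: S, event: ActivityEvent) => S): this {
    this.fold = f;
    return this;
  }

  withMapProjection(m: (event: ActivityEvent) => void): this {
    this.map = m;
    return this;
  }

  withReactor(r: (event: ActivityEvent, state: S) => void): this {
    this.react = r;
    return this;
  }

  // Appending an event drives everything; nothing polls.
  append(event: ActivityEvent): void {
    this.state = this.fold(this.state, event); // update the running aggregate
    this.map(event);                           // write the per-event row
    this.react(event, this.state);             // side effects on the new state
  }

  current(): S {
    return this.state;
  }
}

const storedRows: ActivityEvent[] = [];
const firedFor: string[] = [];

const pipeline = new PipelineSketch<{ totalUsd: number }>({ totalUsd: 0 })
  .withFoldProjection((s, e) => ({ totalUsd: s.totalUsd + e.costUsd }))
  .withMapProjection((e) => { storedRows.push(e); })
  .withReactor((e, s) => { if (s.totalUsd > 10) firedFor.push(e.tenantId); });

pipeline.append({ tenantId: "t1", costUsd: 6 });
pipeline.append({ tenantId: "t1", costUsd: 6 });
```

The reactor only fires on the second append, once the folded total crosses the (made-up) threshold of 10.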
Why a dedicated pipeline (not bolted onto trace-processing)
Per @master_orchestrator’s call: gateway/activity events have different aggregate semantics from traces. A trace is a multi-span aggregate that folds into a TraceSummaryData over
its lifetime. An activity event is a single completed
observation of upstream platform behaviour — there’s no
multi-event aggregate to fold across; each event already has
final cost/tokens/actor when it arrives.
Bolting them onto trace-processing would force one of:
- Activity events get represented as fake single-span traces (lossy + confusing — trace_summaries would mix gateway-proxied traces and per-event activity rows under the same TenantId).
- trace_summaries grows a discriminator column and the fold projection becomes branchy.
A dedicated activity-monitor-processing pipeline keeps each surface’s
aggregate semantics clean.
Aggregate identity
The fold projection (anomalyWindow) does not aggregate events
into an aggregate — it aggregates across aggregates within a
tenant, keyed by tenant + rolling window. That’s a different shape
from trace-processing’s “fold spans into a trace summary” —
in our case the fold is “tally per-tenant rolling spend / request
count / per-actor breakdown for the past N minutes/hours”. Same
machinery, different aggregate semantics.
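A minimal sketch of such a per-tenant rolling fold, under stated assumptions: the ActivityEvent and WindowState shapes, the foldWindow and applyEvent names, and the in-memory Map are hypothetical, and events are assumed to arrive in timestamp order. The real anomalyWindow projection may keep its state quite differently.

```typescript
// Hypothetical shapes; the real fold state is not specified in this doc.
type ActivityEvent = { tenantId: string; actor: string; costUsd: number; atMs: number };

type WindowState = {
  events: ActivityEvent[];              // events still inside the window
  spendUsd: number;                     // rolling spend
  requestCount: number;                 // rolling request count
  perActorUsd: Record<string, number>;  // per-actor breakdown
};

const emptyWindow = (): WindowState => ({
  events: [],
  spendUsd: 0,
  requestCount: 0,
  perActorUsd: {},
});

// Pure fold step: add the new event, drop events older than the window,
// then recompute the tallies from what remains.
function foldWindow(state: WindowState, event: ActivityEvent, windowMs: number): WindowState {
  const cutoff = event.atMs - windowMs;
  const events = [...state.events, event].filter((e) => e.atMs > cutoff);
  const perActorUsd: Record<string, number> = {};
  let spendUsd = 0;
  for (const e of events) {
    spendUsd += e.costUsd;
    perActorUsd[e.actor] = (perActorUsd[e.actor] ?? 0) + e.costUsd;
  }
  return { events, spendUsd, requestCount: events.length, perActorUsd };
}

// State is keyed by tenant, not by any per-event aggregate id.
const tenantWindows = new Map<string, WindowState>();

function applyEvent(event: ActivityEvent, windowMs: number): WindowState {
  const previous = tenantWindows.get(event.tenantId) ?? emptyWindow();
  const next = foldWindow(previous, event, windowMs);
  tenantWindows.set(event.tenantId, next);
  return next;
}

const WINDOW_MS = 60_000;
applyEvent({ tenantId: "t1", actor: "alice", costUsd: 2, atMs: 0 }, WINDOW_MS);
applyEvent({ tenantId: "t1", actor: "bob", costUsd: 3, atMs: 30_000 }, WINDOW_MS);
const latest = applyEvent({ tenantId: "t1", actor: "alice", costUsd: 4, atMs: 70_000 }, WINDOW_MS);
```

After the third event the first one has aged out of the 60-second window, so the tallies cover only the last two events.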
Slicing the redesign
Per @master_orchestrator’s C0/C1/C2/C3 sequence:
C0 — this doc + spec updates
- This architecture doc.
- specs/ai-gateway/governance/anomaly-detection.feature updated to drop poller language; reactor framing throughout.
- AnomalyAlert Prisma model + migration 20260427020000_add_anomaly_alert/ doc-comment updated to reference the reactor as producer.
- Existing receivers continue to write CH directly until C1 lands. This slice is doc-only so the team can review the architecture before more code moves.
C1 — receiver → event_log → projection reactor
- New event schema: ActivityEventReceived with the OCSF-normalised ActivityEventRow shape.
- New command: RecordActivityEventCommand wired into the pipeline.
- Refactor /api/ingest/otel/:sourceId and /api/ingest/webhook/:sourceId to call the command instead of writing CH directly.
- Map projection activityEventStorage writes to gateway_activity_events (replaces today’s direct insert).
- Dogfood: curl → 202 → row visible in CH (same as today, just via event-sourced path).
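The C1 flow can be sketched as below. This is an illustration under assumptions, not the shipped code: the ActivityEventRow fields, the function signatures, and the in-memory arrays standing in for event_log and the ClickHouse table are all hypothetical, and the real projection runs inside the pipeline rather than being drained by hand.

```typescript
// Hypothetical shapes: the real ActivityEventRow columns are not listed in this doc.
type ActivityEventRow = { tenantId: string; sourceId: string; actor: string; costUsd: number };
type ActivityEventReceived = {
  type: "ActivityEventReceived";
  occurredAt: number;
  data: ActivityEventRow;
};

const eventLog: ActivityEventReceived[] = [];          // stand-in for event_log
const gatewayActivityEvents: ActivityEventRow[] = [];  // stand-in for the CH table

// Map projection: one event in, one row out.
function activityEventStorage(event: ActivityEventReceived): void {
  gatewayActivityEvents.push({ ...event.data });
}

// Command: appends to the log and never touches the table.
function recordActivityEvent(row: ActivityEventRow, now: number): ActivityEventReceived {
  const event: ActivityEventReceived = { type: "ActivityEventReceived", occurredAt: now, data: row };
  eventLog.push(event);
  return event;
}

// Receiver handler reduced to its essentials: record, then acknowledge.
function handleIngest(
  sourceId: string,
  body: { tenantId: string; actor: string; costUsd: number },
): number {
  recordActivityEvent({ ...body, sourceId }, Date.now());
  return 202;
}

// Hand-driven projection runner for the sketch only.
let projectedUpTo = 0;
function drainProjections(): number {
  const pending = eventLog.slice(projectedUpTo);
  for (const event of pending) activityEventStorage(event);
  projectedUpTo = eventLog.length;
  return pending.length;
}

const ackStatus = handleIngest("src_1", { tenantId: "t1", actor: "alice", costUsd: 3 });
const rowsBeforeDrain = gatewayActivityEvents.length; // the receiver wrote no row itself
const projected = drainProjections();
```

The point of the split is that the 202 is returned once the event is appended; the table row is a consequence of the event, produced by the projection.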
C2 — AnomalyAlert + anomaly reactor for one rule type
- Apply the AnomalyAlert migration that’s already drafted but doesn’t ship behaviour yet.
- Add anomalyWindow fold projection (per-tenant rolling totals).
- Add anomalyDetection reactor for spend_spike only first (cleanest mapping to the existing CostUSD field).
- Wire into api.activityMonitor.recentAnomalies (replaces current [] stub).
- Dogfood: create rule in Alexis’s UI → curl violating event → alert appears on /governance within ~30s.
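A minimal sketch of what the spend_spike check could look like. The AnomalyRule, WindowState, and AnomalyAlert fields, the thresholdUsd parameter, and the choice to key the upsert on rule id plus window start are all assumptions made for illustration; the actual rule schema and upsert key live in the Prisma model and the feature spec.

```typescript
// Hypothetical shapes; field names are illustrative only.
type AnomalyRule = { id: string; tenantId: string; type: string; thresholdUsd: number };
type WindowState = { tenantId: string; windowStartMs: number; spendUsd: number };
type AnomalyAlert = { ruleId: string; tenantId: string; windowStartMs: number; observedUsd: number };

// In-memory stand-in for the AnomalyAlert table.
const openAlerts = new Map<string, AnomalyAlert>();

// Upsert keyed on rule + window (an assumption), so repeated violations
// inside one window update a single alert instead of creating duplicates.
function upsertAlert(entry: AnomalyAlert): void {
  openAlerts.set(`${entry.ruleId}:${entry.windowStartMs}`, entry);
}

// Reactor body: runs against the fold state each time a new event lands.
function anomalyDetectionReactor(state: WindowState, rules: AnomalyRule[]): AnomalyAlert[] {
  const fired: AnomalyAlert[] = [];
  for (const rule of rules) {
    if (rule.tenantId !== state.tenantId) continue;  // other tenant's rule
    if (rule.type !== "spend_spike") continue;       // only rule type in C2
    if (state.spendUsd <= rule.thresholdUsd) continue;
    const entry: AnomalyAlert = {
      ruleId: rule.id,
      tenantId: state.tenantId,
      windowStartMs: state.windowStartMs,
      observedUsd: state.spendUsd,
    };
    upsertAlert(entry);
    fired.push(entry);
  }
  return fired;
}

const rules: AnomalyRule[] = [
  { id: "r1", tenantId: "t1", type: "spend_spike", thresholdUsd: 10 },
];
const quiet = anomalyDetectionReactor({ tenantId: "t1", windowStartMs: 0, spendUsd: 8 }, rules);
const first = anomalyDetectionReactor({ tenantId: "t1", windowStartMs: 0, spendUsd: 12 }, rules);
const second = anomalyDetectionReactor({ tenantId: "t1", windowStartMs: 0, spendUsd: 15 }, rules);
```

Because the reactor reads only the fold state and the rules, the integration test in the table below can feed it a violating state directly and assert on the upsert.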
C3 — Dispatch destinations
- Generic webhook + log-only first (matches PR #3351’s triggerActionDispatch shape).
- Slack / PagerDuty / SIEM / email follow as per-destination adapter slices once the reactor pattern is proven.
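One way to picture the per-destination adapter idea, as a sketch only: the Destination union, the Transports interface, and the JSON body below are hypothetical and do not reproduce the triggerActionDispatch helper or the canonical webhook body. Transports are injected and synchronous here to keep the example small; a real webhook adapter would be asynchronous.

```typescript
// Hypothetical shapes; not the real triggerActionDispatch contract.
type AnomalyAlert = { ruleId: string; tenantId: string; observedUsd: number };
type Destination = { kind: "log" } | { kind: "webhook"; url: string };
type Transports = {
  log: (line: string) => void;
  post: (url: string, body: string) => void;
};

// Fan one alert out to every configured destination.
// Adding Slack or PagerDuty later means one more union member and one more case.
function dispatchAlert(entry: AnomalyAlert, destinations: Destination[], t: Transports): number {
  const body = JSON.stringify(entry);
  let sent = 0;
  for (const dest of destinations) {
    switch (dest.kind) {
      case "log":
        t.log(body);
        sent += 1;
        break;
      case "webhook":
        t.post(dest.url, body);
        sent += 1;
        break;
    }
  }
  return sent;
}

// Recording transports so the sketch runs without any network access.
const loggedLines: string[] = [];
const postedBodies: { url: string; body: string }[] = [];

const sentCount = dispatchAlert(
  { ruleId: "r1", tenantId: "t1", observedUsd: 12 },
  [{ kind: "log" }, { kind: "webhook", url: "https://example.invalid/hook" }],
  {
    log: (line) => { loggedLines.push(line); },
    post: (url, body) => { postedBodies.push({ url, body }); },
  },
);
```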
What we keep from the v0 receiver code
- IngestionSourceService (CRUD + auth): unchanged.
- gateway_activity_events CH schema (migration 00019_*): unchanged. The map projection writes the same columns.
- OTel + webhook normalisers (normalizers/otel.ts etc.): unchanged. They get called from the map projection now instead of the receiver handler.
- All receiver auth + sourceId-mismatch + 24h secret rotation grace: unchanged.
What we drop from the v0 receiver code
- The direct ActivityEventRepository.insert(...) call from the receiver handler. The receiver instead enqueues an event into the pipeline; the map projection does the actual CH insert.
- The poller-based AnomalyEvaluatorService design that was sketched but never shipped. Replaced by the anomaly reactor.
Test strategy per slice
| Slice | BDD spec | Integration test | Dogfood |
|---|---|---|---|
| C0 (this) | anomaly-detection.feature updated | n/a (doc + schema) | architecture review in-channel |
| C1 | activity-monitor pipeline scenarios in activity-monitor.feature | pipeline test: append event → projection fires → CH row | curl → 202 → CH SELECT |
| C2 | spend_spike scenario in anomaly-detection.feature | reactor test: violating fold state → AnomalyAlert.upsert called | UI rule + violating event → /governance shows alert |
| C3 | dispatch scenarios in anomaly-detection.feature | reactor test: dispatch helper called with right shape | webhook receives canonical body |
evaluateNow
appends a synthetic event and lets the reactor handle it (test
harness, not parallel code path).
Cross-references
- PR #3351, feat: event-driven trace triggers via reactor (the pattern this redesign learns from).
- anomaly-detection.feature: user-facing contract, updated for event-sourcing.
- anomaly-rules.feature: configuration entity (already shipped, unchanged).
- activity-monitor.feature: admin UI contract (already shipped; pipeline section adds in C1).
- architecture.md: top-level governance architecture; this doc is the activity-monitor deep-dive linked from the “Activity Monitor (Tier C/D)” block.