Some AI platforms push events to LangWatch via webhook or OTLP (see ingestion sources). Others expose a paginated audit-log REST API or drop NDJSON files in an S3 bucket on a schedule, and you are expected to pull. Pull-mode connectors are LangWatch's universal abstraction for those sources: you declare the source shape, and the framework handles polling, pagination, parsing, and cursor-based resume.
Pairs with: Ingestion sources (the broader substrate that pull-mode lands events into) and CLI debugging (langwatch ingest tail shows pulled events alongside pushed ones).
Available on Enterprise plans. Pull-mode connectors ship as part of the Enterprise multi-source governance ingestion stack. Apache 2.0 deployments use single-source otel_generic push only. See Open-core licensing.
Inspired by Singer Tap, Airbyte CDK, Apache Camel, Kafka Connect (pull side). The goal is the same: one declarative config → one universal runner → no per-platform adapter code unless absolutely required.

Two universal adapters + reference implementations

| Adapter | What it polls | When to use |
| --- | --- | --- |
| http_polling | Paginated REST APIs with cursor-based pagination | Microsoft Copilot Studio audit logs; Anthropic compliance API; Workato job-history endpoint; any custom enterprise audit-log API |
| s3_polling | S3 buckets containing NDJSON, CSV, JSON-array files | Anthropic compliance dump; OpenAI enterprise audit export; custom S3-to-archive pipelines; on-prem object storage with S3-compatible interface (MinIO, etc.) |
Reference implementations using http_polling ship for:
  • Microsoft Copilot Studio: one-click admin enable, credentials only
  • OpenAI compliance API: one-click admin enable, credentials only
  • Anthropic compliance API: one-click admin enable, credentials only

How the framework works

┌──────────────────────┐     ┌─────────────────────┐
│  Admin creates       │     │  governancePuller   │
│  IngestionSource     │────▶│  BullMQ worker      │
│  with pullConfig     │     │  (cron schedule)    │
└──────────────────────┘     └──────────┬──────────┘
                                        │ runOnce({ cursor })
                                        ▼
                             ┌─────────────────────┐
                             │  PullerAdapter      │
                             │  (http or s3)       │
                             │                     │
                             │  - fetch / list     │
                             │  - parse            │
                             │  - map per event-   │
                             │    Mapping config   │
                             └──────────┬──────────┘
                                        │ NormalizedEvent[]
                                        │ + new cursor
                                        ▼
                             ┌─────────────────────┐
                             │  Unified ingest     │
                             │  path → trace store │
                             └─────────────────────┘
The runner is restart-safe: the cursor lives on IngestionSource.lastCursor in PostgreSQL and is persisted only after a successful runOnce. If the worker crashes mid-run, the next run resumes from the last persisted cursor. The worst case is a small re-pull window, so delivery is at-least-once.
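The persist-after-success rule can be sketched as follows. This is a minimal illustration, not the framework's actual worker code: `CursorStore`, `MemoryCursorStore`, `runSourceOnce`, and the `ingest` callback are hypothetical names, and the in-memory map stands in for the IngestionSource.lastCursor column.

```typescript
// Hypothetical sketch: only `runOnce` and the cursor concept come from the docs.
type Cursor = string | null;

interface PullResult<E> {
  events: E[];
  cursor: Cursor;
}

interface CursorStore {
  load(sourceId: string): Promise<Cursor>;
  save(sourceId: string, cursor: Cursor): Promise<void>;
}

// In-memory stand-in for the database column that holds the cursor.
class MemoryCursorStore implements CursorStore {
  private cursors = new Map<string, Cursor>();

  async load(sourceId: string): Promise<Cursor> {
    return this.cursors.get(sourceId) ?? null;
  }

  async save(sourceId: string, cursor: Cursor): Promise<void> {
    this.cursors.set(sourceId, cursor);
  }
}

async function runSourceOnce<E>(
  sourceId: string,
  store: CursorStore,
  runOnce: (args: { cursor: Cursor }) => Promise<PullResult<E>>,
  ingest: (events: E[]) => Promise<void>,
): Promise<number> {
  const cursor = await store.load(sourceId);
  // If runOnce or ingest throws, we never reach save(), so the stored
  // cursor stays where it was and the next run re-pulls from there.
  const result = await runOnce({ cursor });
  await ingest(result.events);
  await store.save(sourceId, result.cursor);
  return result.events.length;
}
```

Because the cursor is written last, a crash between ingest and save replays the same page on the next run, which is why a stable per-event ID matters for deduplication.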

http_polling config shape

{
  "adapter": "http_polling",
  "url": "https://api.acme.com/v1/audit-log",
  "method": "GET",
  "headers": {
    "Authorization": "Bearer ${{credentials.token}}",
    "X-Org": "${{ingestionSource.organizationId}}"
  },
  "authMode": "bearer",
  "credentialRef": "acme_audit_log_creds",
  "cursorJsonPath": "$.next_cursor",
  "eventsJsonPath": "$.events",
  "schedule": "*/5 * * * *",
  "eventMapping": {
    "source_event_id": "$.id",
    "event_timestamp": "$.created_at",
    "actor": "$.user.email",
    "action": "$.event_type",
    "target": "$.model",
    "cost_usd": "$.usage.cost",
    "tokens_input": "$.usage.input_tokens",
    "tokens_output": "$.usage.output_tokens"
  }
}
  • headers supports template substitution: ${{credentials.<key>}} pulls from the credential ref; ${{ingestionSource.<field>}} pulls from the IngestionSource row (organizationId, name, etc.).
  • cursorJsonPath + eventsJsonPath use JSONPath to locate the next-page cursor + the events array in the response body.
  • schedule is a standard cron expression.
  • eventMapping declares how to extract LangWatch’s normalized event shape from each event in the array.
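To make the header templates and the two JSONPath fields concrete, here is a rough sketch of how a runner might resolve them for one response page. The helper names (`getPath`, `renderTemplate`, `readPage`) are assumptions for illustration, and `getPath` handles only simple dotted paths such as `$.next_cursor`; a real JSONPath engine supports far more.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Resolves simple dotted paths only ("$.a.b"); not a full JSONPath implementation.
function getPath(doc: Json, path: string): Json | undefined {
  if (!path.startsWith("$")) throw new Error(`unsupported path: ${path}`);
  let current: Json | undefined = doc;
  for (const key of path.slice(1).split(".").filter((k) => k.length > 0)) {
    if (current === null || typeof current !== "object" || Array.isArray(current)) {
      return undefined;
    }
    current = current[key];
  }
  return current;
}

// Replaces ${{scope.key}} placeholders, e.g. ${{credentials.token}}.
function renderTemplate(
  template: string,
  scopes: Record<string, Record<string, string>>,
): string {
  return template.replace(
    /\$\{\{\s*(\w+)\.(\w+)\s*\}\}/g,
    (_match, scope: string, key: string) => {
      const value = scopes[scope]?.[key];
      if (value === undefined) {
        throw new Error(`unresolved template variable ${scope}.${key}`);
      }
      return value;
    },
  );
}

// Pulls the events array and the next-page cursor out of one response body.
function readPage(
  body: Json,
  config: { cursorJsonPath: string; eventsJsonPath: string },
): { events: Json[]; nextCursor: string | null } {
  const events = getPath(body, config.eventsJsonPath);
  const cursor = getPath(body, config.cursorJsonPath);
  return {
    events: Array.isArray(events) ? events : [],
    nextCursor: typeof cursor === "string" ? cursor : null,
  };
}
```

For example, `renderTemplate("Bearer ${{credentials.token}}", { credentials: { token: "abc" } })` yields `"Bearer abc"`. Failing loudly on an unresolved placeholder is a design choice of this sketch; the framework's actual behavior is not documented here.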

s3_polling config shape

{
  "adapter": "s3_polling",
  "bucket": "acme-audit-logs",
  "prefix": "anthropic/compliance/",
  "region": "us-east-1",
  "credentialRef": "acme_aws_creds",
  "parser": "ndjson",
  "schedule": "0 * * * *",
  "eventMapping": { "...same JSON-path shape as http_polling..." }
}
  • Cursor = lexicographic-max key seen so far. New runs LIST the bucket + prefix and only read keys lexicographically AFTER the cursor.
  • parser: "ndjson" | "csv" | "json-array", applied per-file.
  • Credential rotation is honored on every run (no in-process caching).
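The lexicographic cursor rule and the per-file NDJSON parser can be illustrated with two small helpers. `selectNewKeys` and `parseNdjson` are hypothetical names, and the sketch takes an already-listed array of keys rather than calling S3. Note that JavaScript compares strings by UTF-16 code unit, which matches S3's ordering for ASCII keys but can differ for other characters.

```typescript
// Keeps only keys that sort strictly after the cursor and reports the new cursor.
function selectNewKeys(
  listedKeys: string[],
  cursor: string | null,
): { keys: string[]; nextCursor: string | null } {
  const keys = listedKeys
    .filter((key) => cursor === null || key > cursor)
    .sort();
  // With no new keys, the cursor stays where it was.
  const nextCursor = keys[keys.length - 1] ?? cursor;
  return { keys, nextCursor };
}

// Parses one NDJSON file body: one JSON document per non-empty line.
function parseNdjson(text: string): unknown[] {
  return text
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line));
}
```

This scheme only sees new data if object keys sort after earlier ones, for example keys that embed a date or timestamp after the prefix.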

Normalized event shape

Every adapter returns events in the same canonical shape, so the worker handoff to the trace-store ingest path doesn’t care which source produced them:
| Field | Type | Notes |
| --- | --- | --- |
| source_event_id | string | Adapter-specific stable ID; used for deduplication on re-pull |
| event_timestamp | ISO 8601 string | When the event happened in the source |
| actor | string | Typically user email or service-account name |
| action | string | Event type (e.g. "completion", "audit.update") |
| target | string | Model name or resource being acted on |
| cost_usd | number | 0 if the source doesn't expose cost |
| tokens_input | number | 0 if unknown |
| tokens_output | number | 0 if unknown |
| raw_payload | string | Full original event for audit, debugging |
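As a sketch of how an eventMapping could produce this shape, the snippet below declares the fields as a TypeScript interface and applies a mapping to one raw event. The interface mirrors the table above; `lookup`, `mapEvent`, and the choice to default missing string fields to an empty string are assumptions of this example, and `lookup` supports only simple dotted paths.

```typescript
interface NormalizedEvent {
  source_event_id: string;
  event_timestamp: string;
  actor: string;
  action: string;
  target: string;
  cost_usd: number;
  tokens_input: number;
  tokens_output: number;
  raw_payload: string;
}

// One path per mapped field; raw_payload is derived from the whole event.
type EventMapping = Record<Exclude<keyof NormalizedEvent, "raw_payload">, string>;

// Resolves simple dotted paths such as "$.user.email" (not full JSONPath).
function lookup(event: unknown, path: string): unknown {
  return path
    .replace(/^\$\.?/, "")
    .split(".")
    .filter(Boolean)
    .reduce<unknown>(
      (node, key) =>
        typeof node === "object" && node !== null
          ? (node as Record<string, unknown>)[key]
          : undefined,
      event,
    );
}

// Assumption: missing strings become "", missing numbers become 0.
const asString = (v: unknown): string => (v === undefined || v === null ? "" : String(v));
const asNumber = (v: unknown): number => (typeof v === "number" && Number.isFinite(v) ? v : 0);

function mapEvent(raw: unknown, mapping: EventMapping): NormalizedEvent {
  return {
    source_event_id: asString(lookup(raw, mapping.source_event_id)),
    event_timestamp: asString(lookup(raw, mapping.event_timestamp)),
    actor: asString(lookup(raw, mapping.actor)),
    action: asString(lookup(raw, mapping.action)),
    target: asString(lookup(raw, mapping.target)),
    cost_usd: asNumber(lookup(raw, mapping.cost_usd)),
    tokens_input: asNumber(lookup(raw, mapping.tokens_input)),
    tokens_output: asNumber(lookup(raw, mapping.tokens_output)),
    raw_payload: JSON.stringify(raw),
  };
}
```

With the http_polling mapping shown earlier, an event that has no `usage.cost` field maps to `cost_usd: 0`, matching the table's "0 if the source doesn't expose cost" rule.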

Writing a custom puller (TypeScript module)

If neither http_polling nor s3_polling fits your source, you can ship a custom adapter:
// langwatch/ee/governance/services/pullers/myCustom.puller.ts
import { PullerAdapter, PullResult } from "./PullerAdapter";
import { z } from "zod";

const ConfigSchema = z.object({
  endpoint: z.string().url(),
  apiKey: z.string(),
  // ... your config shape
});

export class MyCustomPullerAdapter extends PullerAdapter {
  id = "my_custom";

  validateConfig(config: unknown) {
    return ConfigSchema.parse(config);
  }

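  async runOnce({ cursor }: { cursor: string | null }): Promise<PullResult> {
    // 1. Fetch from your source, resuming from `cursor`
    // 2. Map to NormalizedEvent[]
    // 3. Return { events, cursor: nextCursor, errorCount }
    // Placeholder so the skeleton type-checks until the steps above are implemented.
    throw new Error("MyCustomPullerAdapter.runOnce is not implemented yet");
  }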
}
Register it in the framework’s adapter registry. The worker + admin UI auto-discover it; no other glue code needed.
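The registry's real API is not shown here, so the following is only a guess at the general pattern: a map keyed by adapter `id` that the worker consults when a source's config names an adapter. `AdapterRegistry`, `register`, and `resolve` are hypothetical names, not LangWatch exports.

```typescript
// Hypothetical minimal adapter contract; the real PullerAdapter has more to it.
interface RegisteredAdapter {
  id: string;
  validateConfig(config: unknown): unknown;
}

class AdapterRegistry {
  private adapters = new Map<string, RegisteredAdapter>();

  // Rejects duplicate ids so two adapters cannot claim the same config value.
  register(adapter: RegisteredAdapter): void {
    if (this.adapters.has(adapter.id)) {
      throw new Error(`adapter "${adapter.id}" is already registered`);
    }
    this.adapters.set(adapter.id, adapter);
  }

  // Looks up the adapter named by a source's "adapter" config field.
  resolve(id: string): RegisteredAdapter {
    const adapter = this.adapters.get(id);
    if (adapter === undefined) {
      throw new Error(`no adapter registered for "${id}"`);
    }
    return adapter;
  }

  ids(): string[] {
    return Array.from(this.adapters.keys());
  }
}
```

Under this pattern, a config with `"adapter": "my_custom"` would resolve to the class above by its `id`, which is why the `id` value must be unique across adapters.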

Reference impl: Microsoft Copilot Studio

The Copilot Studio reference puller wraps http_polling with a locked-down config matching Microsoft's documented audit-log API shape. Admins enable it at Settings → Governance → Ingestion sources → Add → Microsoft Copilot Studio and enter their tenant credentials; the framework handles the rest. The URL and auth shape are fixed, and only credentials are admin-editable. The same one-click pattern applies to openai_compliance and claude_compliance once those reference implementations land alongside.

Where to next

  • Ingestion sources: the broader substrate; push-mode and pull-mode both feed the same trace store
  • CLI debugging: langwatch ingest tail for live inspection of pulled events
  • Compliance architecture: how pulled events flow through the unified observability substrate