import { setupObservability } from "langwatch/observability/node";
import { getLangWatchTracer } from "langwatch";
import { SpanStatusCode } from "@opentelemetry/api";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
import OpenAI from "openai";
// 1. Setup Observability
// Module-level, side-effectful initialization: this must run before any
// instrumented code executes so the HTTP/Express auto-instrumentations can
// patch their target modules. The returned handle is used for graceful
// shutdown (flushing buffered spans) in main().
const handle = setupObservability({
langwatch: {
// NOTE(review): if LANGWATCH_API_KEY is unset this is `undefined`;
// setupObservability presumably falls back to env/defaults — verify.
apiKey: process.env.LANGWATCH_API_KEY
},
serviceName: "rag-service",
instrumentations: [
new HttpInstrumentation(),
new ExpressInstrumentation()
]
});
// Tracer scoped to this service; all spans below are created through it.
const tracer = getLangWatchTracer("rag-service");
// OpenAI client reads OPENAI_API_KEY from the environment by default.
const client = new OpenAI();
// 2. Define RAG Function
/**
 * Simulates retrieval of context documents for a user query, recorded as a
 * LangWatch "rag" span.
 *
 * @param query - The user's search query.
 * @returns The content strings of the retrieved chunks.
 * @throws Rethrows any error after recording it on the span.
 */
async function retrieveDocuments(query: string): Promise<string[]> {
  return tracer.startActiveSpan("rag", async (span) => {
    try {
      span.setType("rag");
      span.setInput({ query });
      console.log(`Retrieving documents for: ${query}`);
      // Simulate RAG retrieval
      const chunks = [
        { document_id: "doc-abc", content: "LangWatch uses OpenTelemetry." },
        { document_id: "doc-def", content: "Express integrates with OpenTelemetry." }
      ];
      span.setAttributes({
        "rag.chunks_count": chunks.length,
        "rag.query": query
      });
      // Simulate processing time
      await new Promise(resolve => setTimeout(resolve, 100));
      const results = chunks.map(c => c.content);
      span.setOutput({ documents: results });
      span.setStatus({ code: SpanStatusCode.OK });
      return results;
    } catch (error) {
      // Under strict mode `error` is `unknown`; narrow before reading .message.
      const err = error instanceof Error ? error : new Error(String(error));
      span.recordException(err);
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: err.message
      });
      throw error;
    } finally {
      // startActiveSpan (unlike withActiveSpan) does NOT auto-end the span;
      // without this the span never finishes and is never exported.
      span.end();
    }
  });
}
// 3. Define Background Task
/**
 * Simulates asynchronous post-processing of an LLM result, recorded as a
 * LangWatch "background_job" span. Intended to be fired-and-forgotten by the
 * request handler.
 *
 * @param resultId - Identifier correlating this job with the originating request.
 * @param llmOutput - The LLM completion text to process.
 * @returns A short summary string of the processed output.
 * @throws Rethrows any error after recording it on the span.
 */
async function processResultBackground(resultId: string, llmOutput: string): Promise<string> {
  return tracer.startActiveSpan("background-processing", async (span) => {
    try {
      span.setType("background_job");
      span.setInput({ resultId, llmOutput });
      console.log(`[Background] Processing result ${resultId}...`);
      // Simulate background processing
      await new Promise(resolve => setTimeout(resolve, 1000));
      const result = `Processed: ${llmOutput.substring(0, 10)}...`;
      span.setOutput({ result });
      span.setStatus({ code: SpanStatusCode.OK });
      console.log(`[Background] Finished processing ${resultId}`);
      return result;
    } catch (error) {
      // Under strict mode `error` is `unknown`; narrow before reading .message.
      const err = error instanceof Error ? error : new Error(String(error));
      span.recordException(err);
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: err.message
      });
      throw error;
    } finally {
      // startActiveSpan does not auto-end the span; end it on every path so
      // the background span is actually exported.
      span.end();
    }
  });
}
// 4. Define Main Request Handler
/**
 * Handles one user query end-to-end: retrieves RAG context, calls OpenAI,
 * and enqueues background post-processing. Recorded as a "request" span;
 * the RAG and background spans become children because they are started
 * inside this active span's context.
 *
 * @param userQuery - The raw user question.
 * @returns The LLM's answer text (or "No response").
 * @throws Rethrows any error after recording it on the span.
 */
async function handleRequest(userQuery: string): Promise<string> {
  return tracer.startActiveSpan("handle-user-query", async (span) => {
    try {
      span.setType("request");
      span.setInput({ userQuery });
      // Get context documents
      const contextDocs = await retrieveDocuments(userQuery);
      // Call OpenAI
      const completion = await client.chat.completions.create({
        model: "gpt-5-mini",
        messages: [
          { role: "system", content: `Use this context: ${contextDocs.join(" ")}` },
          { role: "user", content: userQuery }
        ],
        temperature: 0.5,
      });
      const llmResult = completion.choices[0].message.content || "No response";
      // Trigger background processing. Deliberately fire-and-forget: the
      // request returns without awaiting it; `void` marks that intent and
      // the .catch prevents an unhandled rejection.
      const resultId = `res_${Date.now()}`;
      void processResultBackground(resultId, llmResult).catch(console.error);
      console.log(`Enqueued background processing task ${resultId}`);
      span.setOutput({ result: llmResult });
      span.setStatus({ code: SpanStatusCode.OK });
      return llmResult;
    } catch (error) {
      // Under strict mode `error` is `unknown`; narrow before reading .message.
      const err = error instanceof Error ? error : new Error(String(error));
      span.recordException(err);
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: err.message
      });
      throw error;
    } finally {
      // startActiveSpan does not auto-end the span; end it on every path so
      // the request span (and its children) are exported.
      span.end();
    }
  });
}
// 5. Simulate Request
/**
 * Entry point: simulates a single web request, waits for the fire-and-forget
 * background task, then shuts down the observability pipeline.
 */
async function main(): Promise<void> {
  console.log("Simulating web request...");
  try {
    const finalAnswer = await handleRequest("How does LangWatch work with Express?");
    console.log(`\nFinal Answer returned to user: ${finalAnswer}`);
    // Allow time for background task
    await new Promise(resolve => setTimeout(resolve, 2000));
  } finally {
    // Graceful shutdown must run even when the request fails, otherwise
    // buffered spans are never flushed to LangWatch.
    await handle.shutdown();
  }
}
main().catch((error: unknown) => {
  console.error(error);
  // Signal failure to the caller (CI, shell) instead of exiting 0.
  process.exitCode = 1;
});