Streaming Response Pattern
Test agents that stream responses in chunks using HTTP chunked transfer encoding. This provides progressive output as the LLM generates text.
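At its core, the server side of this pattern is an HTTP handler that writes the response body in pieces instead of all at once. Here is a minimal sketch of that idiom with aiohttp (the handler name and hard-coded chunks are illustrative; the complete example below does the same thing with a real LLM stream):

```python
from aiohttp import web

async def minimal_stream_handler(request: web.Request) -> web.StreamResponse:
    # StreamResponse lets us send the body progressively; with no
    # Content-Length set, aiohttp delivers it using chunked transfer encoding.
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/plain"
    await response.prepare(request)  # send headers, open the stream
    for piece in ["Hello, ", "streamed ", "world"]:  # stand-in for LLM deltas
        await response.write(piece.encode("utf-8"))
    await response.write_eof()  # close the chunked body
    return response
```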
Use Cases
- Long-form responses where you want progressive output
- Better perceived performance for users
- Agents that use streaming LLM APIs
- Real-time feel for generated content
Complete Working Example
This example demonstrates:
- Creating a server that streams real LLM responses
- Building an adapter that collects chunks into a complete response
- Proper stream reading with aiohttp's content iterator
- Full scenario test with streaming
test_testing_remote_agents_streaming
# Source: https://github.com/langwatch/scenario/blob/main/python/examples/test_testing_remote_agents_streaming.py
"""
Example: Testing an agent that returns streaming responses
This test demonstrates handling agents that stream their responses in chunks
rather than returning a complete message at once. The server uses real LLM streaming.
"""
import asyncio
import json
from aiohttp import web
import aiohttp
import pytest
import pytest_asyncio
import scenario
from openai import AsyncOpenAI
# Base URL for the test server (set during server startup)
base_url = ""
class StreamingAgentAdapter(scenario.AgentAdapter):
"""
Adapter for testing agents that stream responses in chunks.
This adapter:
1. Makes an HTTP POST request to the streaming endpoint
2. Collects all chunks as they arrive
3. Returns the complete response after streaming completes
"""
async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
# Request streaming response from your agent
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/chat/stream",
json={"messages": input.messages},
) as response:
# Collect all chunks into a single response
full_response = ""
# Read stream chunk by chunk
async for chunk in response.content.iter_any():
# Decode chunk and append to full response
full_response += chunk.decode("utf-8")
# Return complete response after all chunks received
return full_response
# OpenAI client for LLM
client = AsyncOpenAI()
async def stream_handler(request: web.Request) -> web.StreamResponse:
"""
HTTP endpoint that streams LLM responses chunk by chunk.
This uses chunked transfer encoding to send the response progressively.
"""
data = await request.json()
messages = data["messages"]
# Determine last user message content
last_msg = messages[-1]
content = last_msg["content"]
if not isinstance(content, str):
content = ""
# Set up streaming response
response = web.StreamResponse()
response.headers["Content-Type"] = "text/plain"
response.headers["Transfer-Encoding"] = "chunked"
await response.prepare(request)
# Stream response using real LLM
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a helpful weather assistant. Provide brief, friendly responses, immediately. Pretend like you have access to a weather API and make up the weather.",
},
{"role": "user", "content": content},
],
temperature=0.7,
stream=True,
)
# Stream chunks to client
async for chunk in stream:
if chunk.choices[0].delta.content:
await response.write(chunk.choices[0].delta.content.encode("utf-8"))
await response.write_eof()
return response
@pytest_asyncio.fixture
async def test_server():
"""
Start a test HTTP server before tests and shut it down after.
This server simulates a deployed agent endpoint with streaming.
"""
global base_url
# Create web application
app = web.Application()
app.router.add_post("/chat/stream", stream_handler)
# Start server on random available port
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 0)
await site.start()
# Get the actual port assigned
server = site._server
assert server is not None
port = server.sockets[0].getsockname()[1] # type: ignore[union-attr]
base_url = f"http://localhost:{port}"
yield
# Cleanup: stop server
await runner.cleanup()
@pytest.mark.asyncio
async def test_streaming_response(test_server):
"""
Test agent via HTTP endpoint with streaming response.
This test verifies:
- Adapter correctly handles streaming chunks
- Complete response is assembled from chunks
- Agent provides relevant weather information
- Full scenario flow works with streaming
"""
result = await scenario.run(
name="Streaming weather response",
description="User asks about weather and receives streamed response",
agents=[
scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
StreamingAgentAdapter(),
scenario.JudgeAgent(
model="openai/gpt-4o-mini",
criteria=[
"Agent should provide weather information",
"Response should be complete and coherent",
],
),
],
script=[
scenario.user("What's the weather forecast in Amsterdam?"),
scenario.agent(),
scenario.judge(),
],
set_id="python-examples",
)
    assert result.success
Key Points
- Server streams chunks: Uses a real LLM streaming API (streamText in TypeScript, the OpenAI streaming client in Python)
- Transfer-Encoding: chunked: HTTP header for streaming
- Collect all chunks: Adapter reads the stream completely before returning
- TextDecoder: Properly decode binary chunks to text
- Return complete response: Scenario expects full text, not chunks
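The TextDecoder point applies to the TypeScript adapter shown below. In Python, the analogous concern is that decoding each raw chunk independently with bytes.decode("utf-8") can raise if a multi-byte character happens to be split across chunk boundaries. A hedged sketch of a variant adapter that decodes incrementally (the class name is illustrative; it reuses the module-level base_url from the example above):

```python
import codecs

import aiohttp
import scenario


class SafeStreamingAgentAdapter(scenario.AgentAdapter):
    """Like StreamingAgentAdapter, but decodes chunks incrementally."""

    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        # The incremental decoder buffers partial multi-byte sequences between
        # chunks, mirroring TextDecoder's { stream: true } behaviour.
        decoder = codecs.getincrementaldecoder("utf-8")()
        full_response = ""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/chat/stream",
                json={"messages": input.messages},
            ) as response:
                async for chunk in response.content.iter_any():
                    full_response += decoder.decode(chunk)
        full_response += decoder.decode(b"", final=True)  # flush any buffered tail
        return full_response
```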
Adapter Pattern
The adapter collects all streamed chunks:
const streamingAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
// Request streaming response
const response = await fetch(`${baseUrl}/chat/stream`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages: input.messages }),
});
// Collect chunks
let fullResponse = "";
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (reader) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
fullResponse += decoder.decode(value, { stream: true });
}
}
return fullResponse;
},
};
Why Not Return Chunks?
Scenario expects complete responses because:
- Judge needs full context: Can't evaluate partial responses
- Message history: Stores complete messages for conversation context
- Simplicity: One response per turn
If you need to test streaming behavior specifically (latency, chunk timing), consider integration tests outside Scenario.
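For example, a plain pytest against the same endpoint can assert on chunk behaviour directly. A hedged sketch (it reuses the test_server fixture and base_url from the example above; the 2-second threshold and the message text are arbitrary illustrations):

```python
import time

import aiohttp
import pytest


@pytest.mark.asyncio
async def test_first_chunk_arrives_early(test_server):
    """Integration-style check on streaming behaviour itself, outside Scenario."""
    start = time.monotonic()
    chunk_arrival_times = []
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{base_url}/chat/stream",
            json={"messages": [{"role": "user", "content": "Weather in Amsterdam?"}]},
        ) as response:
            async for chunk in response.content.iter_any():
                chunk_arrival_times.append(time.monotonic() - start)

    assert chunk_arrival_times, "expected at least one chunk"
    # The first chunk should land well before the full response would finish.
    assert chunk_arrival_times[0] < 2.0
```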
Testing Your Own Agent
To test your streaming agent:
const myStreamingAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
const response = await fetch("https://my-agent.com/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query: input.messages }),
});
let result = "";
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (reader) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
result += decoder.decode(value, { stream: true });
}
}
return result;
},
};
See Also
- SSE Pattern - For Server-Sent Events streaming format
- JSON Pattern - For complete non-streaming responses
- Stateful Pattern - For server-side conversation state
- Testing Remote Agents Overview - All HTTP adapter patterns
- Blackbox Testing - Testing philosophy
- Agent Integration - Core adapter interface
