# Streaming

Stream responses to clients using Python async generators.

## The Agent
```python
import asyncio

from reminix.runtime import Agent, serve

agent = Agent("streamer")


@agent.invoke_stream
async def handle_invoke_stream(input_data: dict):
    """Stream the input text word by word."""
    text = input_data.get("text", "Hello world")
    words = text.split()
    for word in words:
        yield {"chunk": word + " "}
        await asyncio.sleep(0.1)  # Simulate processing delay


@agent.chat_stream
async def handle_chat_stream(messages: list):
    """Stream a response to the chat."""
    last_message = messages[-1]["content"] if messages else "Hello"
    response = f"I received your message: '{last_message}'. Let me think..."
    for char in response:
        yield {"chunk": char}
        await asyncio.sleep(0.02)  # Simulate typing


if __name__ == "__main__":
    serve(agent, port=8080)
```

## How Streaming Works
When you register a streaming handler with `@agent.invoke_stream` or `@agent.chat_stream`, the runtime:

- Routes `stream: true` requests to the streaming handler
- Converts chunks to Server-Sent Events (SSE)
- Sends `data: {"chunk": "..."}` for each yielded chunk
- Sends `data: [DONE]` when the stream completes
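On the client side, this means reading `data:` lines until the `[DONE]` sentinel arrives. Here is a minimal parsing sketch; the helper name `parse_sse_stream` is illustrative, not part of the runtime:

```python
import json


def parse_sse_stream(lines):
    """Collect chunk payloads from SSE data lines until [DONE]."""
    chunks = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # skip blank separators and keep-alive comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break  # stream finished
        chunks.append(json.loads(payload)["chunk"])
    return chunks


events = [
    'data: {"chunk": "Hello "}',
    'data: {"chunk": "world "}',
    'data: [DONE]',
]
print(parse_sse_stream(events))  # -> ['Hello ', 'world ']
```

In a real client you would feed this the lines of the HTTP response body as they arrive, concatenating the chunks to rebuild the full text.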
## Handler Routing

| Client Request | Handler Used |
|---|---|
| `stream: false` | `@agent.invoke` / `@agent.chat` |
| `stream: true` | `@agent.invoke_stream` / `@agent.chat_stream` |
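The routing above amounts to a dispatch on the request's `stream` flag. The sketch below models that behavior; `HANDLERS` and `dispatch` are hypothetical names, not the runtime's actual internals:

```python
import asyncio


async def plain_handler(input_data: dict) -> dict:
    # Non-streaming path: return the whole result at once
    return {"output": input_data.get("text", "")}


async def stream_handler(input_data: dict):
    # Streaming path: yield chunks one at a time
    for word in input_data.get("text", "").split():
        yield {"chunk": word + " "}


# Hypothetical registry mirroring @agent.invoke / @agent.invoke_stream
HANDLERS = {"invoke": plain_handler, "invoke_stream": stream_handler}


async def dispatch(request: dict):
    if request.get("stream"):
        # stream: true -> drain the async generator (a server would
        # instead forward each chunk as an SSE event as it is yielded)
        return [chunk async for chunk in HANDLERS["invoke_stream"](request["input"])]
    return await HANDLERS["invoke"](request["input"])


print(asyncio.run(dispatch({"input": {"text": "Hello world"}, "stream": True})))
# -> [{'chunk': 'Hello '}, {'chunk': 'world '}]
```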
## Running

```bash
python agent.py
```

## Testing
```bash
# Streaming invoke
curl -X POST http://localhost:8080/agent/streamer/invoke \
  -H "Content-Type: application/json" \
  -d '{"input": {"text": "Hello world"}, "stream": true}'

# Streaming chat
curl -X POST http://localhost:8080/agent/streamer/chat \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "Hi"}], "stream": true}'
```

## SSE Response Format
```text
data: {"chunk": "Hello "}

data: {"chunk": "world "}

data: [DONE]
```

Each event is separated by a blank line, per the SSE format.

## Next Steps
- Basic Agent - Simple non-streaming agent
- Multi-Agent - Run multiple agents