Streaming

Stream responses to clients using Python async generators.

The Agent

import asyncio
from reminix.runtime import Agent, serve

agent = Agent("streamer")

@agent.invoke_stream
async def handle_invoke_stream(input_data: dict):
    """Stream the input text word by word."""
    text = input_data.get("text", "Hello world")
    words = text.split()

    for word in words:
        yield {"chunk": word + " "}
        await asyncio.sleep(0.1)  # Simulate processing delay

@agent.chat_stream
async def handle_chat_stream(messages: list):
    """Stream a response to the chat."""
    last_message = messages[-1]["content"] if messages else "Hello"

    response = f"I received your message: '{last_message}'. Let me think..."

    for char in response:
        yield {"chunk": char}
        await asyncio.sleep(0.02)  # Simulate typing

if __name__ == "__main__":
    serve(agent, port=8080)

How Streaming Works

When you register a streaming handler using @agent.invoke_stream or @agent.chat_stream, the runtime:

  1. Routes stream: true requests to the streaming handler
  2. Converts chunks to Server-Sent Events (SSE)
  3. Sends data: {"chunk": "..."} for each yielded chunk
  4. Sends data: [DONE] when complete
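The conversion in steps 2–4 can be sketched in plain Python. This is a hypothetical illustration of what the runtime does with a handler's async generator, not the actual reminix implementation:

```python
import asyncio
import json

async def generate_chunks():
    # Stand-in for a streaming handler's async generator.
    for word in ["Hello", "world"]:
        yield {"chunk": word + " "}

async def to_sse(gen):
    # Each yielded dict becomes a `data:` line (step 3),
    # followed by the `data: [DONE]` sentinel (step 4).
    lines = []
    async for chunk in gen:
        lines.append(f"data: {json.dumps(chunk)}\n\n")
    lines.append("data: [DONE]\n\n")
    return lines

lines = asyncio.run(to_sse(generate_chunks()))
print("".join(lines))
```

Each SSE event is terminated by a blank line, which is why every `data:` line is followed by `\n\n`.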

Handler Routing

Client Request    Handler Used
stream: false     @agent.invoke / @agent.chat
stream: true      @agent.invoke_stream / @agent.chat_stream
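The routing rule amounts to a dispatch on the request's `stream` flag. The helper below is a hypothetical sketch of that dispatch, not the runtime's actual code:

```python
def pick_handler(request: dict, handlers: dict):
    # Hypothetical dispatch: choose the streaming variant when
    # the request carries `"stream": true`. `handlers` maps names
    # like "invoke" / "invoke_stream" to registered callables.
    endpoint = request.get("endpoint", "invoke")  # "invoke" or "chat"
    if request.get("stream"):
        endpoint += "_stream"
    return handlers[endpoint]

handlers = {
    "invoke": lambda data: "plain response",
    "invoke_stream": lambda data: "streaming response",
}
streamed = pick_handler({"endpoint": "invoke", "stream": True}, handlers)({})
plain = pick_handler({"endpoint": "invoke", "stream": False}, handlers)({})
print(streamed)  # → streaming response
print(plain)     # → plain response
```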

Running

python agent.py

Testing

# Streaming invoke
curl -X POST http://localhost:8080/agent/streamer/invoke \
    -H "Content-Type: application/json" \
    -d '{"input": {"text": "Hello world"}, "stream": true}'

# Streaming chat
curl -X POST http://localhost:8080/agent/streamer/chat \
    -H "Content-Type: application/json" \
    -d '{"messages": [{"role": "user", "content": "Hi"}], "stream": true}'

SSE Response Format

data: {"chunk": "Hello "}

data: {"chunk": "world "}

data: [DONE]
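A client consumes this stream by reading `data:` lines, decoding each JSON payload, and stopping at the `[DONE]` sentinel. A minimal parser, shown here against a canned response rather than a live connection:

```python
import json

def parse_sse(raw: str):
    # Collect decoded chunks from `data:` lines until [DONE].
    events = []
    for line in raw.splitlines():
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        events.append(json.loads(payload))
    return events

raw = 'data: {"chunk": "Hello "}\n\ndata: {"chunk": "world "}\n\ndata: [DONE]\n\n'
print(parse_sse(raw))  # → [{'chunk': 'Hello '}, {'chunk': 'world '}]
```

Against a live server, the same loop applies line by line as the response body arrives.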
