Streaming

Stream responses with async generators

Stream responses to clients using Python async generators.

How It Works

Use dedicated streaming handlers with yield to stream chunks:

@agent.invoke_stream
async def handle_invoke_stream(input_data: dict):
    yield {"chunk": "Hello "}
    yield {"chunk": "World!"}

The runtime automatically:

  1. Routes stream: true requests to the streaming handler
  2. Converts chunks to Server-Sent Events (SSE)
  3. Sends data: {"chunk": "..."} for each yield
  4. Sends data: [DONE] when complete
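The chunk-to-SSE conversion in steps 2–4 can be sketched with only the standard library (a minimal illustration of the wire format; the runtime's actual implementation may differ):

```python
import json
from typing import Any, AsyncIterator


async def to_sse(chunks: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Convert yielded chunk dicts into Server-Sent Event frames."""
    async for chunk in chunks:
        # Each yield becomes one SSE frame: data: {...}\n\n
        yield f"data: {json.dumps(chunk)}\n\n"
    # A final sentinel frame signals the end of the stream
    yield "data: [DONE]\n\n"
```

Feeding the two-yield handler from above through this function produces three frames: one per chunk, plus the terminating `data: [DONE]` frame.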

Streaming Invoke

import asyncio
from reminix.runtime import Agent, serve

agent = Agent("streamer")

@agent.invoke_stream
async def handle_invoke_stream(input_data: dict):
    text = input_data.get("text", "Hello world")
    words = text.split()

    for word in words:
        yield {"chunk": word + " "}
        await asyncio.sleep(0.1)  # Simulate processing

if __name__ == "__main__":
    serve(agent, port=8080)

Streaming Chat

@agent.chat_stream
async def handle_chat_stream(messages: list):
    last_message = messages[-1]["content"] if messages else ""
    response = f"You said: {last_message}"

    for char in response:
        yield {"chunk": char}
        await asyncio.sleep(0.02)  # Simulate typing

Handler Routing

The server routes to the appropriate handler based on the client's stream flag:

| Client Request  | Handler Used                                  |
|-----------------|-----------------------------------------------|
| `stream: false` | `@agent.invoke` / `@agent.chat`               |
| `stream: true`  | `@agent.invoke_stream` / `@agent.chat_stream` |

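The routing logic can be sketched with a toy dispatcher (hypothetical class and method names for illustration only; the real runtime handles this internally):

```python
import asyncio


class MiniAgent:
    """Toy agent illustrating stream-flag routing (not the real runtime API)."""

    def __init__(self):
        self._invoke = None
        self._invoke_stream = None

    def invoke(self, fn):
        self._invoke = fn
        return fn

    def invoke_stream(self, fn):
        self._invoke_stream = fn
        return fn

    async def dispatch(self, payload: dict):
        # stream: true -> streaming handler; otherwise the plain handler
        if payload.get("stream"):
            return [chunk async for chunk in self._invoke_stream(payload["input"])]
        return await self._invoke(payload["input"])


agent = MiniAgent()

@agent.invoke
async def handle_invoke(input_data: dict):
    return {"result": input_data.get("text", "")}

@agent.invoke_stream
async def handle_invoke_stream(input_data: dict):
    for word in input_data.get("text", "").split():
        yield {"chunk": word + " "}
```

Registering both handlers lets the same endpoint serve streaming and non-streaming clients; the flag in the request body decides which one runs.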
Testing Streams

Request with stream: true:

curl -X POST http://localhost:8080/agent/streamer/invoke \
  -H "Content-Type: application/json" \
  -d '{"input": {"text": "Hello world"}, "stream": true}'

Response:

data: {"chunk": "Hello "}

data: {"chunk": "world "}

data: [DONE]
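A client can reassemble the streamed text by parsing this response body with a few lines of standard-library Python (a sketch; the `data:` prefix and `[DONE]` sentinel match the output shown above):

```python
import json
from typing import Iterator


def parse_sse(body: str) -> Iterator[dict]:
    """Yield chunk dicts from an SSE response body, stopping at [DONE]."""
    for line in body.splitlines():
        if not line.startswith("data: "):
            continue  # skip blank separator lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break  # end-of-stream sentinel
        yield json.loads(payload)


body = 'data: {"chunk": "Hello "}\n\ndata: {"chunk": "world "}\n\ndata: [DONE]\n\n'
text = "".join(chunk["chunk"] for chunk in parse_sse(body))
# text is now "Hello world "
```

For real HTTP requests, read the response line by line instead of buffering the whole body, so chunks can be displayed as they arrive.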
