What is Streaming?

Streaming delivers responses incrementally as they’re generated, instead of waiting for the complete response. This provides:
  • Better UX: Users see text appearing in real-time
  • Lower latency: First token arrives quickly
  • No timeouts: Streaming bypasses the 60-second timeout

Enabling Streaming

Use the stream=True parameter with invoke():
from reminix import Reminix

client = Reminix()

# Streaming invoke
for chunk in client.agents.invoke(
    "my-agent",
    prompt="Write a story",
    stream=True
):
    print(chunk.chunk, end="", flush=True)

# Streaming chat
for chunk in client.agents.invoke(
    "chat-assistant",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True
):
    print(chunk.chunk, end="", flush=True)

Stream Format

Streaming uses Server-Sent Events (SSE). Each event contains a JSON chunk:
data: {"chunk": "Once"}

data: {"chunk": " upon"}

data: {"chunk": " a time"}

data: [DONE]
The final [DONE] message indicates the stream is complete.
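
The SDK parses these events for you. If you call the HTTP API directly, you can parse the SSE stream yourself. The sketch below uses the requests library; the endpoint URL and auth header are illustrative assumptions, not the documented API surface.
import json
import requests

# Hypothetical endpoint; consult the API reference for the real URL
response = requests.post(
    "https://api.reminix.com/v1/agents/my-agent/invoke",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"prompt": "Write a story", "stream": True},
    stream=True,  # keep the connection open and read incrementally
)

for line in response.iter_lines(decode_unicode=True):
    if not line or not line.startswith("data: "):
        continue  # skip blank separator lines between events
    payload = line[len("data: "):]
    if payload == "[DONE]":
        break  # sentinel: the stream is complete
    print(json.loads(payload)["chunk"], end="", flush=True)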

Collecting Full Response

To get the complete response from a stream:
chunks = []

for chunk in client.agents.invoke("my-agent", prompt="Generate", stream=True):
    chunks.append(chunk.chunk)
    print(chunk.chunk, end="", flush=True)

full_response = "".join(chunks)
print(f"\n\nFull response: {full_response}")

Use Cases

ChatGPT-like Interface

Build real-time chat experiences:
def stream_chat(agent: str, message: str):
    messages = [{"role": "user", "content": message}]
    
    print("Assistant: ", end="")
    for chunk in client.agents.invoke(
        agent,
        messages=messages,
        stream=True
    ):
        print(chunk.chunk, end="", flush=True)
    print()  # Newline after response

stream_chat("assistant", "Explain quantum computing")

Long-Running Tasks

Avoid timeouts for lengthy operations:
# Without streaming: may time out after 60 seconds
response = client.agents.invoke("analyzer", prompt="Process large dataset")

# With streaming: no timeout concerns
for chunk in client.agents.invoke(
    "analyzer",
    prompt="Process large dataset",
    stream=True
):
    print(chunk.chunk, end="")

Progress Updates

Agents can stream progress updates:
data: {"chunk": "Analyzing data..."}
data: {"chunk": "\nProcessing 1000 records..."}
data: {"chunk": "\nCompleted 50%..."}
data: {"chunk": "\nCompleted 100%"}
data: {"chunk": "\n\nResults: ..."}
data: [DONE]

Streaming vs Non-Streaming

Feature              Non-Streaming       Streaming
Response delivery    All at once         Incremental
Timeout              60 seconds          No timeout
Idempotency          Yes (cached)        No caching
Use case             Quick operations    Long tasks, real-time UI

Error Handling

Errors during streaming are delivered as exceptions:
try:
    for chunk in client.agents.invoke("agent", prompt="Generate", stream=True):
        print(chunk.chunk, end="")
except Exception as e:
    print(f"\nStream error: {e}")
If an error occurs mid-stream, you’ll receive partial content before the error.
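
Because streaming responses aren't cached (see the comparison table above), retrying after a failure re-runs the agent from the start. A hedged retry sketch, assuming you're fine discarding partial content:
import time

def invoke_with_retry(agent: str, prompt: str, retries: int = 3) -> str:
    for attempt in range(retries):
        parts: list[str] = []
        try:
            for chunk in client.agents.invoke(agent, prompt=prompt, stream=True):
                parts.append(chunk.chunk)
            return "".join(parts)
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries; surface the error
            time.sleep(2 ** attempt)  # simple exponential backoff
    return ""  # unreachable; satisfies type checkers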

Async Streaming

For async applications:
import asyncio
from reminix import AsyncReminix

async def main() -> None:
    client = AsyncReminix()

    async for chunk in await client.agents.invoke(
        "my-agent",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    ):
        print(chunk.chunk, end="", flush=True)
    print()

asyncio.run(main())
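
Because the client is async, you can consume several streams concurrently. The collect() helper below is illustrative; it reuses the async iteration pattern from the example above.
import asyncio
from reminix import AsyncReminix

async def collect(client: AsyncReminix, agent: str, prompt: str) -> str:
    # Drain one stream into a single string
    parts = []
    async for chunk in await client.agents.invoke(agent, prompt=prompt, stream=True):
        parts.append(chunk.chunk)
    return "".join(parts)

async def main() -> None:
    client = AsyncReminix()
    # Fan out two invocations and wait for both to finish
    story, poem = await asyncio.gather(
        collect(client, "my-agent", "Write a story"),
        collect(client, "my-agent", "Write a poem"),
    )
    print(story, poem, sep="\n---\n")

asyncio.run(main())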