11  Streaming

Real-time responses for better user experience.

Note

Code Reference: code/v0.8/src/agentsilex/

  • stream_event.py
  • runner.py

11.1 Why Streaming?

Without streaming:

User: "Tell me about AI"
[... 3 seconds of waiting ...]
Assistant: "Artificial Intelligence is a field of..."

With streaming:

User: "Tell me about AI"
Assistant: "A" "r" "t" "i" "f" "i" "c" "i" "a" "l" ...

Users see responses character by character, making the experience feel faster and more interactive.
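
A toy illustration of the difference, with simulated per-token latency and no LLM involved:

import time

def fake_stream():
    # Simulate a model emitting one token every 200 ms.
    for token in ["Artificial ", "Intelligence ", "is ", "a ", "field..."]:
        time.sleep(0.2)
        yield token

# Streaming: the first token is visible after about 0.2 s.
for token in fake_stream():
    print(token, end="", flush=True)
print()

# Non-streaming: nothing appears until the whole second has elapsed.
print("".join(fake_stream()))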

11.2 Event Types

We define an event class for each kind of thing that can happen during execution (stream_event.py):

from typing import Any


class Event:
    def __init__(self, type: str, data: Any = None):
        self.type = type
        self.data = data or {}

    def __repr__(self):
        return f"Event(type={self.type}, data={self.data})"


class RawChunkEvent(Event):
    def __init__(self, chunk: Any):
        super().__init__(type="raw_chunk", data={"chunk": chunk})


class PartialOutputEvent(Event):
    def __init__(self, content: str):
        super().__init__(type="partial_output", data={"content": content})


class AgentHandoffEvent(Event):
    def __init__(self, agent_name: str):
        super().__init__(type="agent_handoff", data={"agent_name": agent_name})


class ToolCallEvent(Event):
    def __init__(self, func_request: Any):
        super().__init__(
            type="tool_call",
            data={"func_request": func_request},
        )


class ToolResponseEvent(Event):
    def __init__(self, func_response: Any):
        super().__init__(
            type="tool_response",
            data={"func_response": func_response},
        )


class FinalResultEvent(Event):
    def __init__(self, final_output: Any):
        super().__init__(
            type="final_result",
            data={"final_output": final_output},
        )

Event types:

Event               Purpose
RawChunkEvent       Raw LLM response chunk
PartialOutputEvent  Text content to display
AgentHandoffEvent   Agent is transferring control
ToolCallEvent       Tool is being called
ToolResponseEvent   Tool returned result
FinalResultEvent    Execution complete
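
Constructing one shows the shape at runtime:

from agentsilex.stream_event import PartialOutputEvent

evt = PartialOutputEvent("Hello")
print(evt)                  # Event(type=partial_output, data={'content': 'Hello'})
print(evt.data["content"])  # Hello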

11.3 The run_stream Method

Add streaming to Runner (runner.py):

from typing import Generator
import collections
from litellm import stream_chunk_builder

from agentsilex.stream_event import (
    Event,
    PartialOutputEvent,
    RawChunkEvent,
    FinalResultEvent,
    AgentHandoffEvent,
    ToolCallEvent,
    ToolResponseEvent,
)


class Runner:
    # ... existing code ...

    def run_stream(
        self,
        agent: Agent,
        prompt: str,
    ) -> Generator[Event, None, None]:
        with span("workflow_run", run_id=str(uuid.uuid4())):
            span_manager.switch_to(f"agent_{agent.name}", agent=agent.name)

            current_agent = agent

            msg = user_msg(prompt)
            self.session.add_new_messages([msg])

            loop_count = 0
            should_stop = False
            while loop_count < 10 and not should_stop:
                # callbacks before LLM call
                for callback_func in self.before_llm_call_callbacks:
                    callback_func(self.session)

                dialogs = self.session.get_dialogs()

                tools_spec = (
                    current_agent.tools_set.get_specification()
                    + current_agent.handoffs.get_specification()
                )

                complete_dialogs = [current_agent.get_system_prompt()] + dialogs

                # Enable streaming
                stream = completion(
                    model=current_agent.model,
                    messages=complete_dialogs,
                    tools=tools_spec if tools_spec else None,
                    stream=True,  # Key difference!
                )

                chunks = []
                partial_tool_calls = collections.defaultdict(
                    lambda: {"id": "", "name": "", "arguments": ""}
                )

                for chunk in stream:
                    chunks.append(chunk)

                    yield RawChunkEvent(chunk)

                    delta = chunk.choices[0].delta

                    # Yield text content as it arrives
                    delta_msg = delta.content
                    if delta_msg:
                        yield PartialOutputEvent(delta_msg)

                    # Accumulate tool calls from chunks
                    if hasattr(delta, "tool_calls") and delta.tool_calls:
                        for tc_delta in delta.tool_calls:
                            idx = tc_delta.index or 0
                            if hasattr(tc_delta, "id") and tc_delta.id:
                                partial_tool_calls[idx]["id"] = tc_delta.id
                            if tc_delta.function and tc_delta.function.name:
                                partial_tool_calls[idx]["name"] = tc_delta.function.name
                            if tc_delta.function and tc_delta.function.arguments:
                                partial_tool_calls[idx]["arguments"] += tc_delta.function.arguments

                # Rebuild complete response from chunks
                response_restored = stream_chunk_builder(chunks)
                response_message = response_restored.choices[0].message
                self.session.add_new_messages([response_message])

                if not partial_tool_calls:
                    span_manager.end_current()
                    should_stop = True
                    yield FinalResultEvent(final_output=response_message)
                    continue  # nothing left to do; skip tool handling and exit the loop

                tool_calls = self.convert_to_tool_call_spec_list(partial_tool_calls)

                # Execute tools and yield events
                tools_response = []
                for call_spec in tool_calls:
                    if call_spec.function.name.startswith(HANDOFF_TOOL_PREFIX):
                        continue

                    with span(f"function_call_{call_spec.function.name}"):
                        yield ToolCallEvent(func_request=call_spec)
                        call_result = current_agent.tools_set.execute_function_call(
                            self.context, call_spec
                        )
                        tools_response.append(call_result)
                        yield ToolResponseEvent(call_result)

                self.session.add_new_messages(tools_response)

                # Handle handoffs
                handoff_responses = [
                    call_spec for call_spec in tool_calls
                    if call_spec.function.name.startswith(HANDOFF_TOOL_PREFIX)
                ]
                if handoff_responses:
                    agent_spec = handoff_responses[0]
                    agent_name = agent_spec.function.name[len(HANDOFF_TOOL_PREFIX):]
                    span_manager.switch_to(f"agent_{agent_name}", agent=agent_name)

                    yield AgentHandoffEvent(agent_name=agent_name)

                    current_agent, handoff_response = (
                        current_agent.handoffs.handoff_agent(agent_spec)
                    )
                    self.session.add_new_messages([handoff_response])

                loop_count += 1

            span_manager.end_current()

            if loop_count >= 10:
                yield FinalResultEvent(final_output="Error: Exceeded max iterations")

11.4 Helper: Converting Partial Tool Calls

Tool calls arrive as fragments spread across chunks. We accumulate them in partial_tool_calls and then convert the accumulated pieces into proper specs. List comes from typing; ChatCompletionMessageToolCall and Function are litellm's OpenAI-compatible tool-call types (assumed importable from litellm.types.utils):

from typing import List

from litellm.types.utils import ChatCompletionMessageToolCall, Function


def convert_to_tool_call_spec_list(self, items) -> List[ChatCompletionMessageToolCall]:
    ans = []

    for data in items.values():
        func = Function(
            name=data["name"],
            arguments=data["arguments"],
        )
        call_spec = ChatCompletionMessageToolCall(
            id=data["id"], function=func, type="function"
        )
        ans.append(call_spec)

    return ans
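
To see what the accumulation buys us, here is a sketch using hypothetical fragments shaped like the deltas a model might stream for a single call:

import collections

# Hypothetical deltas for one tool call, split across three chunks.
fragments = [
    {"index": 0, "id": "call_abc", "name": "get_weather", "arguments": ""},
    {"index": 0, "id": "", "name": "", "arguments": '{"city": '},
    {"index": 0, "id": "", "name": "", "arguments": '"Tokyo"}'},
]

partial = collections.defaultdict(lambda: {"id": "", "name": "", "arguments": ""})
for f in fragments:
    entry = partial[f["index"]]
    if f["id"]:
        entry["id"] = f["id"]
    if f["name"]:
        entry["name"] = f["name"]
    entry["arguments"] += f["arguments"]  # argument JSON arrives in pieces

assert partial[0]["arguments"] == '{"city": "Tokyo"}'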

11.5 Usage Example

from agentsilex import Agent, Runner, Session, tool
from agentsilex.stream_event import PartialOutputEvent, ToolCallEvent, FinalResultEvent

@tool
def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Weather in {city}: 72°F"

agent = Agent(
    name="assistant",
    model="gpt-4o",
    instructions="You are a helpful weather assistant.",
    tools=[get_weather],
)

session = Session()
runner = Runner(session)

# Stream the response
for event in runner.run_stream(agent, "What's the weather in Tokyo?"):
    if isinstance(event, PartialOutputEvent):
        # Print text as it arrives
        print(event.data["content"], end="", flush=True)
    elif isinstance(event, ToolCallEvent):
        print(f"\n[Calling: {event.data['func_request'].function.name}]")
    elif isinstance(event, FinalResultEvent):
        print("\n[Done]")

Output:

I'll check the weather in Tokyo for you.
[Calling: get_weather]
The weather in Tokyo is 72°F.
[Done]

11.6 CLI Integration

For a CLI app:

import sys

from agentsilex.stream_event import (
    PartialOutputEvent,
    AgentHandoffEvent,
    FinalResultEvent,
)

for event in runner.run_stream(agent, user_input):
    if isinstance(event, PartialOutputEvent):
        sys.stdout.write(event.data["content"])
        sys.stdout.flush()
    elif isinstance(event, AgentHandoffEvent):
        print(f"\n[Transferring to {event.data['agent_name']}...]")
    elif isinstance(event, FinalResultEvent):
        print()  # Newline at end

11.7 Web App Integration

For a FastAPI endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from agentsilex.stream_event import PartialOutputEvent

app = FastAPI()

@app.get("/chat")
async def chat(prompt: str):
    def generate():
        for event in runner.run_stream(agent, prompt):
            if isinstance(event, PartialOutputEvent):
                yield event.data["content"]

    return StreamingResponse(generate(), media_type="text/plain")
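
For browser clients, Server-Sent Events are often more convenient than plain text, since the EventSource API handles message framing for you. A minimal sketch along the same lines (the /chat/sse route and the [DONE] marker are illustrative conventions, not part of the framework):

import json

@app.get("/chat/sse")
async def chat_sse(prompt: str):
    def generate():
        for event in runner.run_stream(agent, prompt):
            if isinstance(event, PartialOutputEvent):
                # One SSE message per text fragment.
                yield f"data: {json.dumps(event.data)}\n\n"
        yield "data: [DONE]\n\n"  # signal end of stream to the client

    return StreamingResponse(generate(), media_type="text/event-stream")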

11.8 Event Flow Diagram

sequenceDiagram
    participant U as User
    participant R as Runner
    participant L as LLM
    participant T as Tool

    U->>R: run_stream(prompt)
    R->>L: completion(stream=True)

    loop For each chunk
        L->>R: chunk
        R->>U: RawChunkEvent
        R->>U: PartialOutputEvent
    end

    alt Has tool calls
        R->>U: ToolCallEvent
        R->>T: Execute tool
        T->>R: Result
        R->>U: ToolResponseEvent
        R->>L: Continue with result
    end

    R->>U: FinalResultEvent

11.9 Stream vs Non-Stream

The relationship between run() and run_stream():

Aspect                      run()                  run_stream()
Return type                 RunResult              Generator[Event]
Waits for complete response Yes                    No
Real-time output            No                     Yes
Use case                    Background processing  Interactive UX
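
The two are closely related: a non-streaming result can be recovered by draining the stream and keeping only the final event. A hypothetical helper (not the framework's actual run() implementation) makes this concrete:

from agentsilex.stream_event import FinalResultEvent

def run_via_stream(runner, agent, prompt):
    final = None
    for event in runner.run_stream(agent, prompt):
        if isinstance(event, FinalResultEvent):
            final = event.data["final_output"]
    return final  # every other event was just progress reporting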

11.10 Handling All Event Types

for event in runner.run_stream(agent, prompt):
    match event.type:
        case "raw_chunk":
            # Raw LLM chunk for debugging
            pass
        case "partial_output":
            print(event.data["content"], end="")
        case "tool_call":
            print(f"\n[Tool: {event.data['func_request'].function.name}]")
        case "tool_response":
            print(f"[Result: {event.data['func_response']['content'][:50]}...]")
        case "agent_handoff":
            print(f"\n[Handoff to: {event.data['agent_name']}]")
        case "final_result":
            break

11.11 Key Design Decisions

Decision             Why
Generator-based      Memory efficient, natural Python pattern
Event objects        Structured, type-safe
Accumulate chunks    Need complete response for session
Same loop structure  Reuses existing logic
Tip

Part III Complete!

cd code/v0.8

The framework now has all production features:

  • Observability — Trace every agent action
  • MCP Client — Connect to any MCP server
  • Callbacks — Customize memory management
  • Streaming — Real-time responses

Total: ~1000 lines. A complete, minimal agent framework!