sequenceDiagram
participant U as User
participant R as Runner
participant L as LLM
participant T as Tool
U->>R: run_stream(prompt)
R->>L: completion(stream=True)
loop For each chunk
L->>R: chunk
R->>U: RawChunkEvent
R->>U: PartialOutputEvent
end
alt Has tool calls
R->>U: ToolCallEvent
R->>T: Execute tool
T->>R: Result
R->>U: ToolResponseEvent
R->>L: Continue with result
end
R->>U: FinalResultEvent
11 Streaming
Real-time responses for better user experience.
Note
Code Reference: code/v0.8/src/agentsilex/
stream_event.py, runner.py
11.1 Why Streaming?
Without streaming:
User: "Tell me about AI"
[... 3 seconds of waiting ...]
Assistant: "Artificial Intelligence is a field of..."
With streaming:
User: "Tell me about AI"
Assistant: "A" "r" "t" "i" "f" "i" "c" "i" "a" "l" ...
Users see responses character by character, making the experience feel faster and more interactive.
11.2 Event Types
We define events for different things that happen during execution (stream_event.py):
from typing import Any
class Event:
    """Base class for all streaming events yielded by the runner.

    Attributes:
        type: Machine-readable event kind (e.g. "partial_output").
        data: Arbitrary payload dict-like object; defaults to {} when omitted.
    """

    def __init__(self, type: str, data: Any = None):
        self.type = type
        # Substitute {} only when the payload was omitted. The previous
        # `data or {}` also discarded legitimate falsy payloads (0, "", []).
        self.data = {} if data is None else data

    def __repr__(self):
        return f"Event(type={self.type}, data={self.data})"
class RawChunkEvent(Event):
    """One raw chunk exactly as received from the LLM stream."""

    def __init__(self, chunk: Any):
        super().__init__("raw_chunk", {"chunk": chunk})
class PartialOutputEvent(Event):
    """A fragment of displayable text produced mid-stream."""

    def __init__(self, content: str):
        super().__init__("partial_output", {"content": content})
class AgentHandoffEvent(Event):
    """Signals that control is being transferred to another agent."""

    def __init__(self, agent_name: str):
        super().__init__("agent_handoff", {"agent_name": agent_name})
class ToolCallEvent(Event):
    """Emitted just before a tool is executed."""

    def __init__(self, func_request: Any):
        super().__init__("tool_call", {"func_request": func_request})
class ToolResponseEvent(Event):
    """Emitted when a tool has returned its result."""

    def __init__(self, func_response: Any):
        super().__init__("tool_response", {"func_response": func_response})
class FinalResultEvent(Event):
def __init__(self, final_output: Any):
super().__init__(
type="final_result",
data={"final_output": final_output},
)Event types:
| Event | Purpose |
|---|---|
| RawChunkEvent | Raw LLM response chunk |
| PartialOutputEvent | Text content to display |
| AgentHandoffEvent | Agent is transferring control |
| ToolCallEvent | Tool is being called |
| ToolResponseEvent | Tool returned result |
| FinalResultEvent | Execution complete |
11.3 The run_stream Method
Add streaming to Runner (runner.py):
from typing import Generator
import collections
from litellm import stream_chunk_builder
from agentsilex.stream_event import (
Event,
PartialOutputEvent,
RawChunkEvent,
FinalResultEvent,
AgentHandoffEvent,
ToolCallEvent,
ToolResponseEvent,
)
class Runner:
    # ... existing code ...

    def run_stream(
        self,
        agent: Agent,
        prompt: str,
    ) -> Generator[Event, None, None]:
        """Run the agent loop with streaming, yielding Event objects live.

        Yields RawChunkEvent for every raw LLM chunk, PartialOutputEvent for
        displayable text, ToolCallEvent/ToolResponseEvent around tool
        execution, AgentHandoffEvent on agent transfer, and FinalResultEvent
        when the run completes (or the 10-iteration cap is exceeded).
        """
        with span("workflow_run", run_id=str(uuid.uuid4())):
            span_manager.switch_to(f"agent_{agent.name}", agent=agent.name)
            current_agent = agent
            msg = user_msg(prompt)
            self.session.add_new_messages([msg])
            loop_count = 0
            should_stop = False
            # Hard cap of 10 LLM round-trips guards against infinite loops.
            while loop_count < 10 and not should_stop:
                # callbacks before LLM call
                for callback_func in self.before_llm_call_callbacks:
                    callback_func(self.session)
                dialogs = self.session.get_dialogs()
                # Tools plus handoff pseudo-tools are offered in one spec list.
                tools_spec = (
                    current_agent.tools_set.get_specification()
                    + current_agent.handoffs.get_specification()
                )
                complete_dialogs = [current_agent.get_system_prompt()] + dialogs
                # Enable streaming
                stream = completion(
                    model=current_agent.model,
                    messages=complete_dialogs,
                    tools=tools_spec if tools_spec else None,
                    stream=True,  # Key difference!
                )
                chunks = []
                # Tool calls arrive fragmented across chunks, keyed by index;
                # arguments accumulate as string concatenation.
                partial_tool_calls = collections.defaultdict(
                    lambda: {"id": "", "name": "", "arguments": ""}
                )
                for chunk in stream:
                    chunks.append(chunk)
                    yield RawChunkEvent(chunk)
                    delta = chunk.choices[0].delta
                    # Yield text content as it arrives
                    delta_msg = delta.content
                    if delta_msg:
                        yield PartialOutputEvent(delta_msg)
                    # Accumulate tool calls from chunks
                    if hasattr(delta, "tool_calls") and delta.tool_calls:
                        for tc_delta in delta.tool_calls:
                            # NOTE(review): `or 0` treats index None as 0;
                            # confirm providers never emit index=None for a
                            # second tool call.
                            idx = tc_delta.index or 0
                            if hasattr(tc_delta, "id") and tc_delta.id:
                                partial_tool_calls[idx]["id"] = tc_delta.id
                            if tc_delta.function and tc_delta.function.name:
                                partial_tool_calls[idx]["name"] = tc_delta.function.name
                            if tc_delta.function and tc_delta.function.arguments:
                                partial_tool_calls[idx]["arguments"] += tc_delta.function.arguments
                # Rebuild complete response from chunks
                response_restored = stream_chunk_builder(chunks)
                response_message = response_restored.choices[0].message
                self.session.add_new_messages([response_message])
                if not partial_tool_calls:
                    span_manager.end_current()
                    should_stop = True
                    yield FinalResultEvent(final_output=response_message)
                # NOTE(review): no continue/break above — when should_stop is
                # set, the rest of this iteration still runs with an empty
                # tool-call list; confirm add_new_messages([]) is a no-op.
                tool_calls = self.convert_to_tool_call_spec_list(partial_tool_calls)
                # Execute tools and yield events
                tools_response = []
                for call_spec in tool_calls:
                    # Handoff pseudo-tools are handled separately below.
                    if call_spec.function.name.startswith(HANDOFF_TOOL_PREFIX):
                        continue
                    with span(f"function_call_{call_spec.function.name}"):
                        yield ToolCallEvent(func_request=call_spec)
                        call_result = current_agent.tools_set.execute_function_call(
                            self.context, call_spec
                        )
                        tools_response.append(call_result)
                        yield ToolResponseEvent(call_result)
                self.session.add_new_messages(tools_response)
                # Handle handoffs
                handoff_responses = [
                    call_spec for call_spec in tool_calls
                    if call_spec.function.name.startswith(HANDOFF_TOOL_PREFIX)
                ]
                if handoff_responses:
                    # Only the first handoff request is honored.
                    agent_spec = handoff_responses[0]
                    agent_name = agent_spec.function.name[len(HANDOFF_TOOL_PREFIX):]
                    span_manager.switch_to(f"agent_{agent_name}", agent=agent_name)
                    yield AgentHandoffEvent(agent_name=agent_name)
                    current_agent, handoff_response = (
                        current_agent.handoffs.handoff_agent(agent_spec)
                    )
                    self.session.add_new_messages([handoff_response])
                loop_count += 1
            span_manager.end_current()
            if loop_count >= 10:
                yield FinalResultEvent(final_output="Error: Exceeded max iterations")
11.4 Helper: Converting Partial Tool Calls
Tool calls arrive in chunks. We accumulate them and convert to proper specs:
def convert_to_tool_call_spec_list(self, items) -> List[ChatCompletionMessageToolCall]:
ans = []
for data in items.values():
func = Function(
name=data["name"],
arguments=data["arguments"],
)
call_spec = ChatCompletionMessageToolCall(
id=data["id"], function=func, type="function"
)
ans.append(call_spec)
return ans11.5 Usage Example
from agentsilex import Agent, Runner, Session, tool
from agentsilex.stream_event import PartialOutputEvent, ToolCallEvent, FinalResultEvent
@tool
def get_weather(city: str) -> str:
    """Get weather for a city."""
    # Stubbed result for the example; the docstring above doubles as the
    # tool description sent to the LLM, so it is kept unchanged.
    return f"Weather in {city}: 72°F"
# Build a single-agent setup with one tool, then consume the event stream.
agent = Agent(
    name="assistant",
    model="gpt-4o",
    instructions="You are a helpful weather assistant.",
    tools=[get_weather],
)
session = Session()
runner = Runner(session)
# Stream the response
for event in runner.run_stream(agent, "What's the weather in Tokyo?"):
    if isinstance(event, PartialOutputEvent):
        # Print text as it arrives
        print(event.data["content"], end="", flush=True)
    elif isinstance(event, ToolCallEvent):
        print(f"\n[Calling: {event.data['func_request'].function.name}]")
    elif isinstance(event, FinalResultEvent):
        print("\n[Done]")
Output:
I'll check the weather in Tokyo for you.
[Calling: get_weather]
The weather in Tokyo is 72°F.
[Done]
11.6 CLI Integration
For a CLI app:
import sys
for event in runner.run_stream(agent, user_input):
if isinstance(event, PartialOutputEvent):
sys.stdout.write(event.data["content"])
sys.stdout.flush()
elif isinstance(event, AgentHandoffEvent):
print(f"\n[Transferring to {event.data['agent_name']}...]")
elif isinstance(event, FinalResultEvent):
print() # Newline at end11.7 Web App Integration
For a FastAPI endpoint:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/chat")
async def chat(prompt: str):
def generate():
for event in runner.run_stream(agent, prompt):
if isinstance(event, PartialOutputEvent):
yield event.data["content"]
return StreamingResponse(generate(), media_type="text/plain")11.8 Event Flow Diagram
11.9 Stream vs Non-Stream
The relationship between run() and run_stream():
| Aspect | run() | run_stream() |
|---|---|---|
| Return type | RunResult | Generator[Event] |
| Wait for complete response | Yes | No |
| Real-time output | No | Yes |
| Use case | Background processing | Interactive UX |
11.10 Handling All Event Types
for event in runner.run_stream(agent, prompt):
match event.type:
case "raw_chunk":
# Raw LLM chunk for debugging
pass
case "partial_output":
print(event.data["content"], end="")
case "tool_call":
print(f"\n[Tool: {event.data['func_request'].function.name}]")
case "tool_response":
print(f"[Result: {event.data['func_response']['content'][:50]}...]")
case "agent_handoff":
print(f"\n[Handoff to: {event.data['agent_name']}]")
case "final_result":
break11.11 Key Design Decisions
| Decision | Why |
|---|---|
| Generator-based | Memory efficient, natural Python pattern |
| Event objects | Structured, type-safe |
| Accumulate chunks | Need complete response for session |
| Same loop structure | Reuses existing logic |
Tip: Part III Complete!
cd code/v0.8
The framework now has all production features:
- Observability — Trace every agent action
- MCP Client — Connect to any MCP server
- Callbacks — Customize memory management
- Streaming — Real-time responses
Total: ~1000 lines. A complete, minimal agent framework!