Streaming enables agents to return outputs immediately, significantly improving user experience. Instead of waiting for the entire response to be generated, the user can see the response being generated in real-time.
## Streaming Responses
In Agency Swarm, streaming is handled through the `get_response_stream` method. The framework yields `StreamEvent` objects as they arrive from OpenAI, providing direct access to the underlying streaming events.
```python
async def stream_response(message: str):
    """Stream a response and handle events properly."""
    full_text = ""
    async for event in agency.get_response_stream(message):
        # Handle streaming events with data
        if hasattr(event, "data"):
            data = event.data
            # Only capture actual response text, not tool call arguments
            if hasattr(data, "delta") and hasattr(data, "type"):
                if data.type == "response.output_text.delta":
                    # Stream the actual response text in real time
                    delta_text = data.delta
                    if delta_text:
                        print(delta_text, end="", flush=True)
                        full_text += delta_text
                # Skip tool call deltas (we don't want to show those to users)
                elif data.type == "response.function_call_arguments.delta":
                    continue
        # Handle validation errors
        elif isinstance(event, dict):
            event_type = event.get("event", event.get("type"))
            if event_type == "error":
                print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
                break
    print("\n✅ Stream complete")
    return full_text

# Usage
await stream_response("I want you to build me a website")
```
Tools can inject events into the parent SSE stream while they execute. This is especially useful when a tool runs a sub-agent or a long-running operation and you want the client to receive live progress updates alongside the normal agent output.
## Accessing the Streaming Context
Inside any `BaseTool.run()` method, two properties give you access to the live stream:
| Field | Type | What it is |
|---|---|---|
| `self.context.streaming_context` | `StreamingContext \| None` | The queue that feeds the parent stream. `None` when the request is not streaming. |
| `self.tool_call_id` | `str \| None` | The unique ID of this specific tool invocation. Use it to let clients correlate events with the tool call. |
```python
streaming_ctx = self.context.streaming_context
tool_call_id = self.tool_call_id or ""
```
`streaming_ctx` is `None` when the request is not streaming, so always guard before calling `put_event`.
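To avoid repeating the guard at every call site, you can wrap it in a small helper. This is a sketch: `safe_put` is not part of the framework, and any object with an awaitable `put_event` method works here.

```python
async def safe_put(streaming_ctx, event) -> None:
    """Forward an event to the parent stream; silently drop it when not streaming."""
    if streaming_ctx is not None:
        await streaming_ctx.put_event(event)
```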
Any object passed to `streaming_ctx.put_event(event)` is forwarded into the same stream your client is already consuming. There are three ways to use this, depending on how informative you want the events to be.
### Method 1: Raw values
The simplest option: emit any plain value directly. Useful for quick progress strings during development.
```python
class MyTool(BaseTool):
    """Does something and reports progress."""

    async def run(self):
        streaming_ctx = self.context.streaming_context
        if streaming_ctx:
            await streaming_ctx.put_event("step 1/3: fetching data…")
        data = await _fetch()
        if streaming_ctx:
            await streaming_ctx.put_event("step 2/3: processing…")
        result = await _process(data)
        if streaming_ctx:
            await streaming_ctx.put_event("step 3/3: done.")
        return result
```
On the consumer side:
```python
async for event in agency.get_response_stream(message):
    if isinstance(event, str):
        print(f"[tool] {event}")
```
### Method 2: Existing SDK event types
While neither the OpenAI API nor the Agents SDK defines an event type specifically for streaming output from tool execution, you can construct a `RawResponsesStreamEvent` wrapping a `ResponseTextDeltaEvent` directly inside the tool. The internal tracking fields (`item_id`, `content_index`, etc.) just need placeholder values; the consumer's existing delta-handling code treats the event identically to a normal LLM text delta.
```python
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import Field

from agency_swarm.tools import BaseTool


class MyTool(BaseTool):
    """Does something and streams progress as text deltas."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context

        async def emit_delta(text: str) -> None:
            if streaming_ctx:
                await streaming_ctx.put_event(
                    RawResponsesStreamEvent(
                        data=ResponseTextDeltaEvent(
                            content_index=0,
                            output_index=0,
                            sequence_number=0,
                            item_id="tool_progress",
                            logprobs=[],
                            delta=text,
                            type="response.output_text.delta",
                        )
                    )
                )

        await emit_delta("Starting…")
        result = await _perform_task(self.task)
        await emit_delta(" Done.")
        return result
```
On the consumer side, these events are indistinguishable from regular LLM text deltas unless you inspect the `item_id` you set on the event:
```python
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent

async for event in agency.get_response_stream(message):
    if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
        if event.data.item_id == "tool_progress":
            print(f"[tool] {event.data.delta}", end="", flush=True)
        else:
            print(event.data.delta, end="", flush=True)
```
### Method 3: Custom event model
For the most informative output, you can define your own event class with structured metadata: status, tool call ID for correlation, error details, and so on. No built-in SDK type covers tool execution progress, so this is the recommended approach for production use, but it requires explicit checks on the consumer side to separate these events from standard SDK events.
```python
from typing import Literal

from pydantic import BaseModel, Field

from agency_swarm.tools import BaseTool


class ToolProgressEvent(BaseModel):
    """Streaming progress event emitted by a tool invocation."""

    call_id: str
    status: Literal["in_progress", "completed", "failed"] = "in_progress"
    delta: str | None = None
    error: str | None = None


class LongRunningTool(BaseTool):
    """A tool that reports live progress through the parent stream."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context
        tool_call_id = self.tool_call_id or ""

        async def emit(status: str, delta: str | None = None, error: str | None = None) -> None:
            if streaming_ctx:
                await streaming_ctx.put_event(
                    ToolProgressEvent(call_id=tool_call_id, status=status, delta=delta, error=error)
                )

        await emit("in_progress", delta="Starting…")
        try:
            result = await _perform_task(self.task)
            await emit("completed")
            return result
        except Exception as exc:
            await emit("failed", error=str(exc))
            raise
```
On the consumer side, filter by type:
```python
async for event in agency.get_response_stream(message):
    if isinstance(event, ToolProgressEvent):
        print(f"[{event.status}] {event.delta or ''}")
```
Regardless of the chosen method, events emitted via `put_event()` are visible only to the client consuming the stream; the calling agent never sees them. From the agent's perspective, the tool returns only its final return value, which is what gets added to the conversation history and passed back to the model.
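This split can be illustrated with a toy simulation: no framework calls here, just a fake stream standing in for `get_response_stream`, with placeholder event classes. Progress events accumulate on a client-only side channel, while only the text deltas become the final answer.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToolProgress:
    delta: str  # client-only side channel


@dataclass
class TextDelta:
    delta: str  # model output


async def fake_stream():
    # Interleaved events, as a client would see them while streaming.
    yield ToolProgress("step 1/2…")
    yield TextDelta("Hello")
    yield ToolProgress("step 2/2…")
    yield TextDelta(" world")


async def consume():
    progress, text = [], ""
    async for event in fake_stream():
        if isinstance(event, ToolProgress):
            progress.append(event.delta)  # shown to the user, never to the model
        else:
            text += event.delta  # only this reaches the conversation history
    return progress, text


progress, text = asyncio.run(consume())
# text == "Hello world"; progress holds the two tool updates
```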