Skip to main content
Streaming enables agents to return outputs immediately, significantly improving user experience. Instead of waiting for the entire response to be generated, the user can see the response being generated in real-time.

Streaming Responses

In Agency Swarm, streaming is handled through the get_response_stream method. The framework returns StreamEvent objects as they are returned by OpenAI, providing direct access to the underlying streaming events.
async def stream_response(message: str):
    """Stream a response and handle events properly."""
    full_text = ""

    async for event in agency.get_response_stream(message):
        # Handle streaming events with data
        if hasattr(event, "data"):
            data = event.data

            # Only capture actual response text, not tool call arguments
            if hasattr(data, "delta") and hasattr(data, "type"):
                if data.type == "response.output_text.delta":
                    # Stream the actual response text in real-time
                    delta_text = data.delta
                    if delta_text:
                        print(delta_text, end="", flush=True)
                        full_text += delta_text
                # Skip tool call deltas (we don't want to show those to users)
                elif data.type == "response.function_call_arguments.delta":
                    continue

        # Handle validation errors
        elif isinstance(event, dict):
            event_type = event.get("event", event.get("type"))
            if event_type == "error":
                print(f"\n❌ Error: {event.get('content', event.get('data', 'Unknown error'))}")
                break

    print("\n✅ Stream complete")
    return full_text

# Usage
await stream_response("I want you to build me a website")

Streaming from Tools

Tools can inject events into the parent SSE stream while they execute. This is especially useful when a tool runs a sub-agent or a long-running operation and you want the client to receive live progress updates alongside the normal agent output.

Accessing the Streaming Context

Inside any BaseTool.run() method, two properties give you access to the live stream:
FieldTypeWhat it is
self.context.streaming_contextStreamingContext | NoneThe queue that feeds the parent stream. None when the request is not streaming.
self.tool_call_idstr | NoneThe unique ID of this specific tool invocation. Use it to let clients correlate events with the tool call.
streaming_ctx = self.context.streaming_context
tool_call_id  = self.tool_call_id or ""
streaming_ctx is None when the request is not streaming, so always guard before calling put_event. Any object passed to streaming_ctx.put_event(event) is forwarded into the same stream your client is already consuming. There are three ways to use this, depending on how informative you want the events to be.

Method 1: Raw values

The simplest option — emit any plain value directly. Useful for quick progress strings during development.
class MyTool(BaseTool):
    """Does something and reports progress."""

    async def run(self):
        streaming_ctx = self.context.streaming_context

        if streaming_ctx:
            await streaming_ctx.put_event("step 1/3: fetching data…")

        data = await _fetch()

        if streaming_ctx:
            await streaming_ctx.put_event("step 2/3: processing…")

        result = await _process(data)

        if streaming_ctx:
            await streaming_ctx.put_event("step 3/3: done.")

        return result
On the consumer side:
async for event in agency.get_response_stream(message):
    if isinstance(event, str):
        print(f"[tool] {event}")

Method 2: Existing SDK event types

While neither the OpenAI API nor the agents SDK contains event types specifically designed for streaming output from tool execution, you can construct a RawResponsesStreamEvent wrapping a ResponseTextDeltaEvent directly inside the tool. The internal tracking fields (item_id, content_index, etc.) just need placeholder values — the consumer’s existing delta-handling code treats it identically to a normal LLM text delta.
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
from agency_swarm.tools import BaseTool
from pydantic import Field


class MyTool(BaseTool):
    """Does something and streams progress as text deltas."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context

        async def emit_delta(text: str) -> None:
            if streaming_ctx:
                await streaming_ctx.put_event(
                    RawResponsesStreamEvent(
                        data=ResponseTextDeltaEvent(
                            content_index=0,
                            output_index=0,
                            sequence_number=0,
                            item_id="tool_progress",
                            logprobs=[],
                            delta=text,
                            type="response.output_text.delta",
                        )
                    )
                )

        await emit_delta("Starting…")
        result = await _perform_task(self.task)
        await emit_delta(" Done.")
        return result
On the consumer side, these events are indistinguishable from regular LLM text deltas unless you inspect the item_id you set on the event:
from agents import RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent

async for event in agency.get_response_stream(message):
    if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
        if event.data.item_id == "tool_progress":
            print(f"[tool] {event.data.delta}", end="", flush=True)
        else:
            print(event.data.delta, end="", flush=True)

Method 3: Custom event model

For the most informative output you can construct your own class that includes structured metadata — status, tool call ID for correlation, error details, etc. No built-in SDK type covers tool execution progress, so this is the recommended approach for production use, but it will require explicit checks on the consumer side to separate these events from standard SDK events.
from typing import Literal
from pydantic import BaseModel, Field
from agency_swarm.tools import BaseTool


class ToolProgressEvent(BaseModel):
    """Streaming progress event emitted by a tool invocation."""

    call_id: str
    status: Literal["in_progress", "completed", "failed"] = "in_progress"
    delta: str | None = None
    error: str | None = None


class LongRunningTool(BaseTool):
    """A tool that reports live progress through the parent stream."""

    task: str = Field(..., description="The task to perform")

    async def run(self):
        streaming_ctx = self.context.streaming_context
        tool_call_id = self.tool_call_id or ""

        async def emit(status: str, delta: str | None = None, error: str | None = None) -> None:
            if streaming_ctx:
                await streaming_ctx.put_event(
                    ToolProgressEvent(call_id=tool_call_id, status=status, delta=delta, error=error)
                )

        await emit("in_progress", delta="Starting…")

        try:
            result = await _perform_task(self.task)
            await emit("completed")
            return result
        except Exception as exc:
            await emit("failed", error=str(exc))
            raise
On the consumer side, filter by type:
async for event in agency.get_response_stream(message):
    if isinstance(event, ToolProgressEvent):
        print(f"[{event.status}] {event.delta or ''}")
Regardless of the chosen method, events emitted via put_event() are only visible to the client consuming the stream — the calling agent never sees them. From the agent’s perspective, the tool returns only its final return value, which is what gets added to the conversation history and passed back to the model.