Streaming enables agents to return outputs immidiately, significantly improving user experience. Instead of waiting for the entire response to be generated, the user can see the response being generated in real-time.
Streaming Responses
To stream the conversation between agents, you can use the get_completion_stream
method with your own event handler. The process follows a similar pattern to the official OpenAI documentation.
The only difference is that you must extend the AgencyEventHandler
class, which has 2 additional properties: agent_name
and recipient_agent_name
, to get the names of the agents communicating with each other. (See the on_text_created
below.)
from typing_extensions import override
from agency_swarm import AgencyEventHandler
class EventHandler(AgencyEventHandler):
@override
def on_text_created(self, text) -> None:
# Get the name of the agent that is sending the message
print(f"\n{self.recipient_agent_name} @ {self.agent_name} > ", end="", flush=True)
@override
def on_text_delta(self, delta, snapshot):
print(delta.value, end="", flush=True)
def on_tool_call_created(self, tool_call):
print(f"\n{self.recipient_agent_name} > {tool_call.type}\n", flush=True)
def on_tool_call_delta(self, delta, snapshot):
if delta.type == 'code_interpreter':
if delta.code_interpreter.input:
print(delta.code_interpreter.input, end="", flush=True)
if delta.code_interpreter.outputs:
print(f"\n\noutput >", flush=True)
for output in delta.code_interpreter.outputs:
if output.type == "logs":
print(f"\n{output.logs}", flush=True)
@classmethod
def on_all_streams_end(cls):
print("\n\nAll streams have ended.") # Conversation is over and message is returned to the user.
response = agency.get_completion_stream("I want you to build me a website", event_handler=EventHandler)
The on_all_streams_end
class method is called when all streams have ended. This is particularly important since your
event handler might be called multiple times and possibly by multiple agents, unlike in the official OpenAI
documentation.