StreamEventType
| Event | Description |
|---|---|
| TEXT | Text content chunk |
| TOOL_CALL_START | Agent calling a tool |
| TOOL_CALL_END | Tool call completed |
| DONE | Stream completed |
| ERROR | Error occurred |
Basic Streaming
Copy
Ask AI
from echo import get_llm, LLMConfig, StreamEventType, ConversationContext
llm = get_llm(LLMConfig(provider="openai", model="gpt-4o-mini"))
context = ConversationContext()
# Add user message
context.add_message(Message(
role=MessageRole.USER,
content=[TextMessage(text="Explain diabetes briefly")],
))
# Stream response
async for event in llm.invoke_stream(context, system_prompt="You are helpful."):
if event.type == StreamEventType.TEXT:
print(event.text, end="", flush=True)
elif event.type == StreamEventType.TOOL_CALL_START:
print(f"\n[Tool: {event.json.get('tool_name')}]", end="")
elif event.type == StreamEventType.TOOL_CALL_END:
print(" ✓")
elif event.type == StreamEventType.DONE:
print("\n--- Done ---")
final_response = event.llm_response
context = event.context
elif event.type == StreamEventType.ERROR:
print(f"\n[Error: {event.error}]")
Streaming with Tools
Copy
Ask AI
from echo_agents.tools.elicitation import SelectionElicitationTool
llm = get_llm(LLMConfig(provider="openai", model="gpt-4o-mini"))
tools = [SelectionElicitationTool()]
async for event in llm.invoke_stream(context, tools=tools, system_prompt=prompt):
if event.type == StreamEventType.TEXT:
print(event.text, end="", flush=True)
elif event.type == StreamEventType.TOOL_CALL_START:
print(f"\n🔧 {event.json.get('tool_name')}")
elif event.type == StreamEventType.TOOL_CALL_END:
print(" ✓ Done")
elif event.type == StreamEventType.DONE:
# Check for elicitations
if event.llm_response.elicitations:
elicit = event.llm_response.elicitations[0]
print(f"\n📋 {elicit.details.input.get('text')}")
for opt in elicit.details.input.get('options', []):
print(f" • {opt}")
DONE Event
The `DONE` event contains:
Copy
Ask AI
elif event.type == StreamEventType.DONE:
response = event.llm_response
# Final text
print(response.text)
# Verbose tool calls
for item in response.verbose:
if item.type == "tool":
print(f"Tool used: {item.tool_name}")
# Elicitations (UI components)
for elicit in response.elicitations:
print(f"UI: {elicit.details.component}")
# Updated context for next turn
context = event.context

