Basic Agent
Copy
Ask AI
import asyncio
from echo import (
GenericAgent, AgentConfig, LLMConfig, PersonaConfig, TaskConfig,
ConversationContext, Message, MessageRole, TextMessage
)
async def main():
agent = GenericAgent(
agent_config=AgentConfig(
persona=PersonaConfig(
role="Health Educator",
goal="Explain health topics simply",
backstory="A friendly health educator",
),
task=TaskConfig(
description="Answer health questions clearly",
expected_output="Simple, accurate explanations",
),
),
llm_config=LLMConfig(provider="openai", model="gpt-4o-mini"),
tools=[],
)
context = ConversationContext()
context.add_message(Message(
role=MessageRole.USER,
content=[TextMessage(text="What is cholesterol?")],
))
result = await agent.run(context)
print(result.llm_response.text)
asyncio.run(main())
Streaming with Elicitation
Copy
Ask AI
import asyncio
from echo import get_llm, LLMConfig, StreamEventType, ConversationContext
from echo_agents.tools.elicitation import SelectionElicitationTool
async def main():
llm = get_llm(LLMConfig(provider="openai", model="gpt-4o-mini"))
tools = [SelectionElicitationTool()]
context = ConversationContext()
system_prompt = """You are a medical assistant collecting symptoms.
Use elicit_selection to present symptom options as buttons."""
context.add_message(Message(
role=MessageRole.USER,
content=[TextMessage(text="I'm not feeling well")],
))
print("Assistant: ", end="", flush=True)
async for event in llm.invoke_stream(context, tools=tools, system_prompt=system_prompt):
if event.type == StreamEventType.TEXT:
print(event.text, end="", flush=True)
elif event.type == StreamEventType.TOOL_CALL_START:
print(f"\n[Tool: {event.json.get('tool_name')}]")
elif event.type == StreamEventType.DONE:
print()
if event.llm_response.elicitations:
elicit = event.llm_response.elicitations[0]
print(f"\n📋 {elicit.details.input.get('text')}")
for opt in elicit.details.input.get('options', []):
print(f" • {opt}")
asyncio.run(main())
MCP Agent
Copy
Ask AI
import asyncio
import os
from echo import (
GenericAgent, AgentConfig, LLMConfig, PersonaConfig, TaskConfig,
MCPConnectionManager, MCPServerConfig, MCPTransport,
ConversationContext, Message, MessageRole, TextMessage
)
async def main():
# Connect to Eka MCP
manager = MCPConnectionManager(MCPServerConfig(
transport=MCPTransport.STREAMABLE_HTTP,
url="https://mcp.eka.care/mcp",
headers={"x-eka-jwt-payload": os.getenv("EKA_JWT_PAYLOAD")},
))
mcp_tools = await manager.get_tools()
print(f"Tools: {[t.name for t in mcp_tools]}")
agent = GenericAgent(
agent_config=AgentConfig(
persona=PersonaConfig(
role="Medical Assistant",
goal="Help with patient queries using tools",
backstory="A helpful medical AI with access to patient data",
),
task=TaskConfig(
description="Use tools to answer queries accurately",
expected_output="Information from tools when appropriate",
),
),
tools=mcp_tools,
llm_config=LLMConfig(provider="openai", model="gpt-4o-mini", max_iterations=5),
)
context = ConversationContext()
context.add_message(Message(
role=MessageRole.USER,
content=[TextMessage(text="Search for patient Kumar")],
))
result = await agent.run(context)
# Show tool calls
for item in result.llm_response.verbose:
if item.type == "tool":
print(f"[Tool] {item.tool_name}")
print(f"\nAssistant: {result.llm_response.text}")
await MCPConnectionManager.cleanup_all()
asyncio.run(main())
Multi-Turn Conversation
Copy
Ask AI
async def conversation():
agent = GenericAgent(
agent_config=agent_config,
llm_config=llm_config,
tools=tools,
)
context = ConversationContext()
for i in range(10):
user_input = input(f"[{i+1}] You: ").strip()
if user_input.lower() == "quit":
break
context.add_message(Message(
role=MessageRole.USER,
content=[TextMessage(text=user_input)],
))
result = await agent.run(context)
print(f"Assistant: {result.llm_response.text}\n")
# Update context for next turn
context = result.context

