Agent 系统
Agent 是 ArtifactFlow 中的智能执行单元,每个 Agent 专注于特定类型的任务。
模块结构
src/agents/
├── base.py # Agent 基类和核心数据结构
├── lead_agent.py # Lead Agent(任务协调)
├── search_agent.py # Search Agent(信息检索)
└── crawl_agent.py # Crawl Agent(内容采集)
核心数据结构 (base.py)
AgentConfig
Agent 配置:
@dataclass
class AgentConfig:
name: str # Agent 名称(唯一标识)
description: str # Agent 描述
# 元信息(用于注册到 Lead、创建 toolkit)
capabilities: List[str] = [] # 能力列表
required_tools: List[str] = [] # 所需工具名称列表
# LLM 配置
model: str = "qwen3.5-flash-no-thinking" # LLM 模型(采样参数跟模型走,配在 MODEL_CONFIGS.extra_params)
max_tool_rounds: int = 3 # 单次执行最大工具调用轮数
streaming: bool = False # 是否默认流式输出
llm_max_retries: int = 3 # LLM 调用最大重试次数
llm_retry_delay: float = 1.0 # 初始重试延迟(秒)
AgentResponse
Agent 单轮执行的响应:
@dataclass
class AgentResponse:
success: bool = True # 执行是否成功
content: str = "" # 回复内容或错误信息
tool_interactions: List[Dict] = [] # assistant-tool 交互历史(用于恢复)
reasoning_content: Optional[str] = None # 思考过程(支持 reasoning model)
metadata: Dict[str, Any] = {} # 元数据
routing: Optional[Dict[str, Any]] = None # 路由信息
token_usage: Optional[Dict[str, Any]] = None # Token 统计
routing 字段结构:
# 工具调用
routing = {
"type": "tool_call",
"tool_name": "web_search",
"params": {"query": "..."}
}
# 调用 SubAgent
routing = {
"type": "subagent",
"target": "search_agent",
"instruction": "搜索相关信息..."
}
# 任务完成:routing 为 None(字段默认值)
# 当 routing 为 None 时表示 Agent 完成执行
BaseAgent 基类
核心职责
- 单轮 LLM 调用:每次执行只调用一次 LLM
- 响应解析:解析 XML 格式的工具调用
- 流式输出:支持 token 级别的 LLM 流式输出
- 错误重试:LLM 调用失败时自动重试(重试次数与初始延迟由 AgentConfig 的 llm_max_retries / llm_retry_delay 配置)
抽象方法
子类必须实现:
class BaseAgent(ABC):
@abstractmethod
def build_system_prompt(self, context: Optional[Dict[str, Any]] = None) -> str:
"""
构建系统提示词(不含工具说明)
Args:
context: 动态上下文(如 artifacts_inventory)
Returns:
系统提示词
"""
pass
@abstractmethod
def format_final_response(self, content: str) -> str:
"""
格式化最终响应
Args:
content: LLM 的最终回复
Returns:
格式化后的响应
"""
pass
工具说明不在 build_system_prompt() 中编写,而是由 build_complete_system_prompt() 自动追加在 build_system_prompt() 的返回结果之后。
执行流程
async def stream(
self,
messages: List[Dict],
is_resuming: bool = False
) -> AsyncGenerator[StreamEvent, None]:
"""
流式执行 Agent(单轮 LLM 调用)
Args:
messages: 完整的消息列表(由 ContextManager 构建)
is_resuming: 是否从工具执行恢复
"""
# 1. 发送 Agent 开始事件
yield StreamEvent(type=AGENT_START, agent=self.config.name, data=current_response)
# 2. 流式调用 LLM(带重试)
stream = await self._call_llm_with_retry(messages, streaming=True)
async for chunk in stream:
if chunk["type"] == "content":
response_content += chunk["content"]
yield StreamEvent(type=LLM_CHUNK, agent=self.config.name, data=current_response)
elif chunk["type"] == "reasoning":
reasoning_content += chunk["content"]
yield StreamEvent(type=LLM_CHUNK, agent=self.config.name, data=current_response)
yield StreamEvent(type=LLM_COMPLETE, agent=self.config.name, data=current_response)
# 3. 解析响应(内联处理)
tool_calls = parse_tool_calls(response_content)
if tool_calls:
tool_call = tool_calls[0] # 每轮只处理一个
if tool_call.name == "call_subagent":
result = await self.toolkit.execute_tool("call_subagent", tool_call.params)
if result.success:
current_response.routing = {"type": "subagent", ...}
else:
current_response.routing = {"type": "tool_call", ...} # 验证失败
else:
current_response.routing = {"type": "tool_call", ...}
# 4. 发送 Agent 完成事件
yield StreamEvent(type=AGENT_COMPLETE, agent=self.config.name, data=current_response)
响应解析
响应解析在 _execute_generator() 中内联处理(对应上述执行流程中的第 3 步"解析响应"),完整逻辑如下:
# 在 _execute_generator 中
tool_calls = parse_tool_calls(response_content)
if tool_calls:
tool_call = tool_calls[0] # 每轮只处理一个工具调用
if tool_call.name == "call_subagent":
# 先调用 execute() 验证参数
result = await self.toolkit.execute_tool("call_subagent", tool_call.params)
if result.success:
# 验证通过,设置 subagent 路由
current_response.routing = {
"type": "subagent",
"target": result.data["agent_name"],
"instruction": result.data["instruction"]
}
else:
# 验证失败,当作普通 tool_call 让 graph 返回错误
current_response.routing = {
"type": "tool_call",
"tool_name": tool_call.name,
"params": tool_call.params
}
else:
current_response.routing = {
"type": "tool_call",
"tool_name": tool_call.name,
"params": tool_call.params
}
yield StreamEvent(type=AGENT_COMPLETE, ...)
return
# 无工具调用 → Agent 完成
# routing 为 None,由 Graph 层决定是 COMPLETED 还是返回 Lead
final_response = self.format_final_response(content)
current_response.content = final_response
yield StreamEvent(type=AGENT_COMPLETE, ...)
注意:工具轮数限制由 Graph 层控制(读取 agent.config.max_tool_rounds),不在 Agent 内部判断。
Lead Agent
角色定位
Lead Agent 是任务的总协调者:
- 理解用户意图,制定任务计划
- 管理 Task Plan Artifact
- 调度 SubAgent 执行具体任务
- 整合结果到 Result Artifact
- 与用户交互,响应反馈
配置
class LeadAgent(BaseAgent):
def __init__(self, config: Optional[AgentConfig] = None, toolkit=None):
if not config:
config = AgentConfig(
name="lead_agent",
description="Task coordinator and information integrator",
required_tools=[
"create_artifact",
"update_artifact",
"rewrite_artifact",
"read_artifact",
"call_subagent"
],
model="qwen3.5-plus", # 使用思考模型
max_tool_rounds=100, # 需要更多轮次协调
streaming=True
)
super().__init__(config, toolkit)
# 注册的子 Agent 配置(用于生成 system prompt)
self.sub_agents: Dict[str, AgentConfig] = {}
def register_subagent(self, config: AgentConfig):
"""注册子 Agent,使其出现在 system prompt 中"""
self.sub_agents[config.name] = config
系统提示词结构
def build_system_prompt(self, context: Optional[Dict[str, Any]] = None) -> str:
"""
构建 Lead Agent 系统提示词
Args:
context: 包含 artifacts_inventory、user_feedback 等上下文
提示词结构(使用 XML 标签组织):
- <system_time>: 当前时间
- <agent_role>: 角色定义和核心职责
- <execution_flow>: 执行流程和指导原则
- <task_planning_strategy>: 任务规划策略
- <artifact_management>: Artifact 管理规范
- <available_subagents>: 可用的子 Agent 列表(动态生成)
- <current_context>: 当前上下文(artifacts_inventory、user_feedback)
"""
提示词核心结构:
<agent_role>
You are lead_agent, the Lead Agent coordinating a multi-agent system.
## Your Role and Responsibilities
1. Task Planning: Analyze user requests and create structured task plans
2. Coordination: Delegate specific tasks to specialized sub-agents
3. Integration: Synthesize information from various sources
4. Quality Control: Ensure quality and completeness
</agent_role>
<artifact_management>
## Artifact Management
### Task Plan Artifact (ID: "task_plan")
- 固定 ID,用于任务跟踪
### Result Artifacts (Flexible IDs)
- 根据用户需求创建,如 "research_report"、"main.py" 等
</artifact_management>
<available_subagents>
## Available Sub-Agents
(动态生成已注册的子 Agent 列表)
</available_subagents>
Search Agent
角色定位
专注于信息检索:
- 优化搜索查询
- 多轮迭代搜索
- 结果筛选与结构化
配置
class SearchAgent(BaseAgent):
def __init__(self, config: Optional[AgentConfig] = None, toolkit=None):
if not config:
config = AgentConfig(
name="search_agent",
description="Web search and information retrieval specialist",
capabilities=["Web search", "Information retrieval"],
required_tools=["web_search"],
model="qwen3.5-flash-no-thinking",
max_tool_rounds=3, # 最多 3 轮搜索优化
streaming=True
)
super().__init__(config, toolkit)
工作模式
flowchart TD
A[收到搜索指令] --> B[分析搜索意图]
B --> C[构造搜索查询]
C --> D[执行 web_search]
D --> E{结果是否充分?}
E -->|否| F[优化查询]
F --> C
E -->|是| G[整理结构化结果]
G --> H[返回给 Lead Agent]
Crawl Agent
角色定位
专注于内容采集:
- 网页内容抓取
- 内容解析与提取
- 处理异常情况
配置
class CrawlAgent(BaseAgent):
def __init__(self, config: Optional[AgentConfig] = None, toolkit=None):
if not config:
config = AgentConfig(
name="crawl_agent",
description="Web content extraction and cleaning specialist",
capabilities=[
"Deep content extraction",
"Web scraping",
"IMPORTANT: Instructions must include a specific URL to crawl"
],
required_tools=["web_fetch"],
model="qwen3.5-flash-no-thinking",
max_tool_rounds=2, # 通常 1-2 轮即可
streaming=True
)
super().__init__(config, toolkit)
Agent 协作模式
sequenceDiagram
participant User
participant Lead
participant Search
participant Crawl
participant Artifact
User->>Lead: 分析 Python 异步编程最佳实践
Lead->>Artifact: 创建 Task Plan
Lead->>Search: 搜索相关资料
Search->>Search: web_search x N
Search->>Lead: 返回搜索结果
Lead->>Crawl: 抓取重点文章
Crawl->>Crawl: web_fetch x N
Crawl->>Lead: 返回文章内容
Lead->>Artifact: 更新 Result
Lead->>User: 返回分析报告
Agent 记忆系统
每个 Agent 维护独立的记忆(NodeMemory),用于跨轮次保持上下文:
# state["agent_memories"] 结构
{
"lead_agent": {
"tool_interactions": [...], # assistant-tool 交互历史
"last_response": {...}, # 最后的 AgentResponse
"tool_round_count": 2 # 当前工具调用轮数
},
"search_agent": {...},
"crawl_agent": {...}
}
记忆在 merge_agent_response_to_state() 中自动更新:
# 自动合并 tool_interactions
if response.tool_interactions:
memory["tool_interactions"].extend(response.tool_interactions)
# 自动更新 last_response
memory["last_response"] = {
"success": response.success,
"content": response.content,
"metadata": response.metadata,
"reasoning": response.reasoning_content
}
注意:可观测性指标(执行时间、token 统计等)已移至 ExecutionMetrics,由 Graph 层记录。
添加新 Agent
参见 Extension Guide。