Request Lifecycle
本文档详细描述一个请求从进入 API 到返回结果的完整生命周期。
整体流程
sequenceDiagram
participant Client
participant FastAPI
participant StreamManager
participant Controller
participant Graph
participant Agent
participant Tool
participant DB
Client->>FastAPI: POST /api/v1/chat (+ Bearer Token)
FastAPI->>FastAPI: JWT 验证 + DB 校验用户状态
FastAPI->>DB: 确保 Conversation 存在(校验 ownership)
FastAPI->>StreamManager: create_stream(thread_id, owner_user_id)
FastAPI->>Client: 返回 {thread_id, stream_url}
FastAPI->>Controller: 后台启动 stream_execute()
Controller->>Graph: astream(initial_state)
loop Agent Execution
Graph->>Agent: agent.stream(messages)
Agent->>Agent: LLM 调用
Agent->>Graph: yield StreamEvent
Graph->>StreamManager: push(llm_chunk)
opt Tool Call
Graph->>Tool: execute_tool()
Tool->>Graph: ToolResult
Graph->>StreamManager: push(tool_complete)
end
end
Graph->>Controller: 最终状态
Controller->>DB: 保存响应
Controller->>StreamManager: push(complete)
Client->>FastAPI: GET /stream/{thread_id} (+ Bearer Token)
FastAPI->>FastAPI: JWT 验证
FastAPI->>StreamManager: consume_events(thread_id, user_id)
StreamManager-->>Client: SSE 事件流
Phase 1: 请求接收与初始化
API 层处理
# POST /api/v1/chat
{
"content": "帮我分析一下 Python 异步编程的最佳实践",
"conversation_id": null, # null 表示新对话
"parent_message_id": null
}
依赖注入链(src/api/dependencies.py):
flowchart LR
subgraph Auth["认证"]
Bearer["HTTPBearer"]
CU["get_current_user()<br/>JWT 解码 + DB 校验"]
end
subgraph Request["请求级别"]
Session["get_db_session()<br/>AsyncSession"]
AM["get_artifact_manager()"]
CM["get_conversation_manager()"]
Ctrl["get_controller()"]
end
subgraph Global["全局单例"]
CP["get_checkpointer()<br/>AsyncSqliteSaver"]
SM["get_stream_manager()<br/>StreamManager"]
end
HTTP["HTTP Request"] --> Bearer --> CU
HTTP --> Session
CU --> Session
Session --> AM
Session --> CM
Session --> Ctrl
HTTP --> CP
HTTP --> SM
get_current_user() 在每个受保护端点的入口执行:解码 JWT → 查 DB 校验 is_active 和最新 role(确保禁用/降权即时生效)。/health 和 /auth/login 不需要认证。
Conversation 创建
如果 conversation_id 为空,Controller 会在 _stream_new_message() 中:
- 调用
conversation_manager.start_conversation_async()生成新 ID - 在数据库创建
Conversation记录 - Artifact session ID 与 conversation ID 相同
# src/core/controller.py - _stream_new_message() 中的逻辑
if not conversation_id:
conversation_id = await self.conversation_manager.start_conversation_async()
else:
await self.conversation_manager.ensure_conversation_exists(conversation_id)
注意:用户归属由 API 层(chat.py)负责,不在 Controller 中处理。Router 层在进入 Controller 之前完成:
- 新建会话时,ensure_conversation_exists(conversation_id, user_id=user_id) 绑定 owner
- 已有会话时,_verify_ownership() 校验 conv.user_id == current_user.user_id,不匹配返回 404
Phase 2: 状态准备
对话历史格式化
ConversationManager 从消息树中提取当前分支的历史:
# 树状结构示例
# msg_1 (root)
# / \
# msg_2 msg_3
# |
# msg_4 (current)
# 格式化结果(从 root 到 current 的路径)
# 返回 List[Dict],用于后续构建 LLM messages
conversation_history = [
{"role": "user", "content": "第一条消息"},
{"role": "assistant", "content": "第一条回复"},
{"role": "user", "content": "第二条消息"},
{"role": "assistant", "content": "第二条回复"}
]
初始状态构建
# src/core/engine.py - create_initial_state()
initial_state = {
"current_task": "帮我分析一下 Python 异步编程的最佳实践",
"session_id": conversation_id, # session_id 与 conversation_id 相同,用于对话与Artifact映射
"thread_id": f"thd-{uuid4().hex}",
"conversation_history": [...], # List[Dict],格式化的对话历史
"phase": ExecutionPhase.LEAD_EXECUTING,
"current_agent": "lead_agent",
"subagent_pending": None,
"pending_tool_call": None,
"agent_memories": {},
"compression_level": "normal",
"user_message_id": f"msg-{uuid4().hex}",
"graph_response": None,
"execution_metrics": create_initial_metrics()
}
Phase 3: Graph 执行
执行模式
ArtifactFlow 使用 LangGraph 的 stream_mode="custom" 实现流式输出:
# src/core/controller.py
async for event in self.graph.astream(
initial_state,
config={"configurable": {"thread_id": thread_id}},
stream_mode="custom"
):
yield event # StreamEvent
Agent 节点执行
每个 Agent 节点执行单轮 LLM 调用:
flowchart TD
A[准备 Messages] --> B[LLM 调用]
B --> C{解析响应}
C -->|tool_call| D[设置 TOOL_EXECUTING]
C -->|call_subagent| E[设置 SUBAGENT_EXECUTING]
C -->|完成| F[设置 COMPLETED]
D --> G[更新状态]
E --> G
F --> G
消息构建(src/core/context_manager.py):
ContextManager.build_agent_messages() 按顺序拼装 messages 列表:
# Part 1: System prompt
messages = [{"role": "system", "content": system_prompt}]
# Part 2: Conversation history(仅 Lead Agent)
# 自动压缩,至少保留最近 4 条(2 轮对话)
if agent.config.name == "lead_agent" and conversation_history:
messages.extend(compressed_history)
# Part 3: Current instruction
messages.append({"role": "user", "content": instruction})
# Part 4: Tool interactions(如果有)
# assistant/user 交替的工具调用历史
if tool_interactions:
messages.extend(compressed_tool_interactions)
# Part 5: Pending tool result(如果有)
if pending_tool_result:
messages.append({"role": "user", "content": formatted_result})
状态路由
路由基于 ExecutionPhase 决定下一个节点(定义在 _add_routing_rules() 中):
# src/core/graph.py - route_func 内部函数
def route_func(state: AgentState) -> str:
phase = state["phase"]
if phase == ExecutionPhase.TOOL_EXECUTING:
return "tool_execution"
elif phase == ExecutionPhase.SUBAGENT_EXECUTING:
return state["subagent_pending"]["target"]
elif phase == ExecutionPhase.LEAD_EXECUTING:
return "lead_agent"
elif phase == ExecutionPhase.COMPLETED:
return END
else:
return END
工具执行节点
统一处理所有工具调用,包括权限检查:
flowchart TD
A[获取 pending_tool_call] --> B[查找工具]
B --> C{检查权限}
C -->|AUTO| D[直接执行]
C -->|CONFIRM| E[interrupt 请求确认]
E --> F{用户响应}
F -->|approved| D
F -->|denied| G[返回拒绝结果]
D --> H[记录 metrics]
H --> I[返回原 Agent]
Phase 4: 事件流与 SSE
事件类型
# src/core/events.py
class StreamEventType(Enum):
# Controller 层
METADATA = "metadata"
COMPLETE = "complete"
ERROR = "error"
# Agent 层
AGENT_START = "agent_start"
LLM_CHUNK = "llm_chunk"
LLM_COMPLETE = "llm_complete"
AGENT_COMPLETE = "agent_complete"
# Graph 层
TOOL_START = "tool_start"
TOOL_COMPLETE = "tool_complete"
PERMISSION_REQUEST = "permission_request"
PERMISSION_RESULT = "permission_result"
事件缓冲
由于 POST 请求立即返回,SSE 连接稍后建立,需要缓冲中间事件:
sequenceDiagram
participant Client
participant POST as POST /chat
participant Queue as StreamManager
participant Graph
participant GET as GET /stream
Client->>POST: 发送消息
POST->>Queue: create_stream(thread_id)
Note over Queue: TTL 计时器启动 (30s)
POST->>Client: 返回 thread_id
POST->>Graph: 后台执行
Graph->>Queue: push(agent_start)
Graph->>Queue: push(llm_chunk)
Client->>GET: 连接 SSE
GET->>Queue: consume_events(thread_id)
Note over Queue: 取消 TTL 计时器
Queue->>GET: yield events
GET->>Client: SSE 推送
Graph->>Queue: push(complete)
Queue->>GET: yield complete
GET->>Client: SSE: complete
Note over Queue: 清理队列
SSE 格式
使用标准 SSE event: 字段区分事件类型,data: JSON 内同时保留 type 字段以便统一解析:
event: metadata
data: {"type":"metadata","timestamp":"...","data":{"conversation_id":"xxx","thread_id":"yyy","message_id":"zzz"}}
event: agent_start
data: {"type":"agent_start","timestamp":"...","agent":"lead_agent","data":{"success":true,"content":"","metadata":{...}}}
event: llm_chunk
data: {"type":"llm_chunk","timestamp":"...","agent":"lead_agent","data":{"success":true,"content":"让我","metadata":{...}}}
event: llm_chunk
data: {"type":"llm_chunk","timestamp":"...","agent":"lead_agent","data":{"success":true,"content":"让我来分析","metadata":{...}}}
event: llm_complete
data: {"type":"llm_complete","timestamp":"...","agent":"lead_agent","data":{"success":true,"content":"...","token_usage":{...}}}
event: agent_complete
data: {"type":"agent_complete","timestamp":"...","agent":"lead_agent","data":{"success":true,"content":"...","routing":{"type":"tool_call",...}}}
event: tool_start
data: {"type":"tool_start","timestamp":"...","agent":"lead_agent","tool":"web_search","data":{"params":{"query":"Python async best practices"}}}
event: tool_complete
data: {"type":"tool_complete","timestamp":"...","agent":"lead_agent","tool":"web_search","data":{"success":true,"duration_ms":1234,"error":null,"params":{"query":"Python async best practices"},"result_data":{...}}}
event: complete
data: {"type":"complete","timestamp":"...","data":{"success":true,"interrupted":false,"response":"...","execution_metrics":{...}}}
关键点:
- llm_chunk.data.content 是累积内容,每次事件包含从头开始的完整文本
- tool_complete 包含 params(工具调用参数)和 result_data(工具返回的业务数据),前端可用于追踪工具执行详情
- complete 事件的 interrupted 字段指示是否需要用户确认权限
详细字段说明见 API Reference。
Phase 5: 权限中断与恢复
当工具需要用户确认时:
中断流程
# src/core/graph.py - tool_execution_node
if tool.permission == ToolPermission.CONFIRM:
# 发送权限请求事件
writer({
"type": StreamEventType.PERMISSION_REQUEST.value,
"agent": from_agent,
"tool": tool_name,
"timestamp": datetime.now().isoformat(),
"data": {
"permission_level": tool.permission.value,
"params": params
}
})
# 暂停执行,等待用户确认
is_approved = interrupt({
"type": "tool_permission",
"agent": from_agent,
"tool_name": tool_name,
"params": params,
"permission_level": tool.permission.value,
"message": f"Tool '{tool_name}' requires {tool.permission.value} permission"
})
恢复流程
# POST /api/v1/chat/{conversation_id}/resume
{
"thread_id": "thd-xxx",
"message_id": "msg-yyy", # 要更新的消息 ID
"approved": true
}
# Controller 恢复执行
await graph.astream(
Command(resume=approved),
config={"configurable": {"thread_id": thread_id}},
stream_mode="custom"
)
Phase 6: 结果持久化
数据持久化采用短事务模式:每次写操作在 Repository 层立即 flush() + commit(),而非在 session 退出时统一提交。这避免了长事务持有 SQLite 写锁导致的 database is locked 错误。
# 更新消息的 graph_response(Repository 内部立即 commit)
await self.conversation_manager.update_response_async(
conv_id=conversation_id,
message_id=message_id,
response=final_response
)
# Artifact 变更已在执行过程中通过工具保存(每次工具调用独立 commit)
# 版本历史自动记录
数据流总结
flowchart TB
Client["Client"]
subgraph API["API Layer"]
FastAPI["FastAPI"]
SSE["SSE Stream"]
end
subgraph Core["Core Layer"]
Controller["Controller"]
Graph["Graph"]
SM["StreamManager"]
end
subgraph Agents["Agents"]
Lead["Lead"]
Search["Search"]
Crawl["Crawl"]
end
Tools["Tools"]
subgraph DB["Database"]
Convos["Conversations"]
Artifacts["Artifacts"]
end
Client -->|HTTP Request| FastAPI
FastAPI --> Controller
Controller --> Graph
Graph --> Agents
Agents --> Tools
Tools --> DB
Controller --> DB
Graph -->|Events| SM
SM --> SSE
SSE -->|SSE| Client