
Core Module

The Core module is the central engine of ArtifactFlow, responsible for state management, workflow orchestration, and execution control.

Module Structure

src/core/
├── engine.py             # Pi-style execution engine (includes state creation and ExecutionMetrics)
├── events.py             # Event type definitions (StreamEventType + ExecutionEvent)
├── controller.py         # Execution controller
├── context_manager.py    # Context management
└── conversation_manager.py # Conversation management

State System (state.py)

ExecutionPhase

An enum of execution phases that drives workflow routing:

class ExecutionPhase(Enum):
    LEAD_EXECUTING = "lead_executing"          # Lead Agent is executing
    SUBAGENT_EXECUTING = "subagent_executing"  # SubAgent is executing
    TOOL_EXECUTING = "tool_executing"          # A tool is executing
    WAITING_PERMISSION = "waiting_permission"  # Waiting for permission confirmation
    COMPLETED = "completed"                    # Task completed

AgentState

The global LangGraph state, threaded through the entire execution flow:

class AgentState(TypedDict):
    # Basic info
    current_task: str                           # Description of the current task
    session_id: str                             # Artifact session ID
    thread_id: str                              # LangGraph thread ID

    # Conversation context
    conversation_history: Optional[List[Dict]]  # Formatted conversation history

    # Execution control
    phase: ExecutionPhase                       # Current execution phase
    current_agent: str                          # Name of the current agent

    # Routing info
    subagent_pending: Optional[Dict]            # {"target", "instruction", "subagent_result"}
    pending_tool_call: Optional[Dict]           # {"tool_name", "params", "from_agent", "tool_result"}

    # Memory system
    agent_memories: Dict[str, NodeMemory]       # Per-agent memory (includes tool_round_count)

    # Context management
    compression_level: str                      # Compression level

    # User interaction layer
    user_message_id: str                        # Current user message ID
    graph_response: Optional[str]               # Final response from the graph

    # Observability
    execution_metrics: ExecutionMetrics         # Execution metrics

The NodeMemory structure:

class NodeMemory(TypedDict):
    tool_interactions: List[Dict]      # assistant-tool interaction history
    last_response: Optional[Dict]      # The most recent AgentResponse
    tool_round_count: int              # Tool-call count for the current node
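
The controller later calls create_initial_state() to build this structure. A minimal sketch of what that factory might look like, assuming the field names above and that execution starts at the Lead Agent (the default values here are assumptions, not the actual implementation):

```python
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

class ExecutionPhase(Enum):  # as defined in state.py above
    LEAD_EXECUTING = "lead_executing"
    SUBAGENT_EXECUTING = "subagent_executing"
    TOOL_EXECUTING = "tool_executing"
    WAITING_PERMISSION = "waiting_permission"
    COMPLETED = "completed"

def create_initial_state(
    task: str,
    session_id: str,
    thread_id: str,
    message_id: str,
    conversation_history: Optional[List[Dict]] = None,
) -> Dict:
    # Defaults are assumptions; execution is assumed to begin at the Lead Agent
    return {
        "current_task": task,
        "session_id": session_id,
        "thread_id": thread_id,
        "conversation_history": conversation_history,
        "phase": ExecutionPhase.LEAD_EXECUTING,
        "current_agent": "lead_agent",
        "subagent_pending": None,
        "pending_tool_call": None,
        "agent_memories": {},
        "compression_level": "normal",
        "user_message_id": message_id,
        "graph_response": None,
        "execution_metrics": {
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "total_duration_ms": None,
            "agent_executions": [],
            "tool_calls": [],
        },
    }
```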

State Transition Function

merge_agent_response_to_state() is the single entry point for state updates (it mutates state in place):

def merge_agent_response_to_state(
    state: AgentState,
    agent_name: str,
    response: AgentResponse,
    is_resuming: bool = False
) -> None:
    """
    Process an agent response and update state (in place):
    - Update current_agent
    - Clear resume state (if is_resuming)
    - Update agent_memories (tool_interactions, last_response, tool_round_count)
    - Parse routing info, then set phase and routing data
    """

Routing logic:

flowchart TD
    A[Agent returns Response] --> B{Check routing.type}
    B -->|tool_call| C[phase = TOOL_EXECUTING]
    B -->|subagent| D[phase = SUBAGENT_EXECUTING]
    B -->|no routing| E{Is Lead Agent?}
    E -->|yes| F[phase = COMPLETED]
    E -->|no| G[phase = LEAD_EXECUTING]
    C --> H[Set pending_tool_call]
    D --> I[Set subagent_pending]
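
The routing step above can be sketched as plain Python. This is an illustration only: the exact shape of the routing dict is an assumption inferred from the subagent_pending / pending_tool_call field docs above.

```python
from enum import Enum
from typing import Optional

class ExecutionPhase(Enum):  # as defined in state.py above
    LEAD_EXECUTING = "lead_executing"
    SUBAGENT_EXECUTING = "subagent_executing"
    TOOL_EXECUTING = "tool_executing"
    WAITING_PERMISSION = "waiting_permission"
    COMPLETED = "completed"

def apply_routing(state: dict, agent_name: str, routing: Optional[dict]) -> None:
    """Sketch of the routing step inside merge_agent_response_to_state."""
    if routing and routing.get("type") == "tool_call":
        state["phase"] = ExecutionPhase.TOOL_EXECUTING
        state["pending_tool_call"] = {
            "tool_name": routing["tool_name"],
            "params": routing.get("params", {}),
            "from_agent": agent_name,
            "tool_result": None,
        }
    elif routing and routing.get("type") == "subagent":
        state["phase"] = ExecutionPhase.SUBAGENT_EXECUTING
        state["subagent_pending"] = {
            "target": routing["target"],
            "instruction": routing.get("instruction", ""),
            "subagent_result": None,
        }
    elif agent_name == "lead_agent":
        # No routing from the Lead Agent: the task is done
        state["phase"] = ExecutionPhase.COMPLETED
    else:
        # No routing from a SubAgent: hand control back to the Lead Agent
        state["phase"] = ExecutionPhase.LEAD_EXECUTING
```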

Event System (events.py)

StreamEventType

Unified event type definitions shared across the Agent, Graph, and Controller layers:

class StreamEventType(Enum):
    # Controller-layer events
    METADATA = "metadata"          # Initial metadata
    COMPLETE = "complete"          # Execution complete
    ERROR = "error"                # Execution error

    # Agent-layer events
    AGENT_START = "agent_start"    # Agent started executing
    LLM_CHUNK = "llm_chunk"        # Streamed LLM output chunk
    LLM_COMPLETE = "llm_complete"  # LLM output complete
    AGENT_COMPLETE = "agent_complete"  # Agent execution complete

    # Graph-layer events
    TOOL_START = "tool_start"        # Tool started executing
    TOOL_COMPLETE = "tool_complete"  # Tool execution complete
    PERMISSION_REQUEST = "permission_request"  # Request permission confirmation
    PERMISSION_RESULT = "permission_result"    # Permission confirmation result
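
A client consuming the stream typically dispatches on these type strings. A minimal sketch of such a dispatcher — the event dict shape ({"type", "agent", "timestamp", "data"}) mirrors what the graph's writer emits (see the agent node below), but the data keys used here (content, tool_name, message) are assumptions:

```python
def handle_event(event: dict) -> str:
    etype = event["type"]
    if etype == "llm_chunk":
        # Append the partial LLM output to the UI stream
        return event["data"].get("content", "")
    if etype == "permission_request":
        # Surface a confirmation dialog to the user
        return f"confirm: {event['data'].get('tool_name')}"
    if etype == "error":
        return f"error: {event['data'].get('message')}"
    return ""
```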

ExecutionMetrics

Observability metrics that record the execution process (defined as TypedDicts):

class ExecutionMetrics(TypedDict):
    started_at: str                               # ISO timestamp
    completed_at: Optional[str]                   # ISO timestamp
    total_duration_ms: Optional[int]
    agent_executions: List[AgentExecutionRecord]  # append-only
    tool_calls: List[ToolCallRecord]              # append-only

class AgentExecutionRecord(TypedDict):
    agent_name: str
    model: str
    token_usage: TokenUsage
    llm_duration_ms: int
    started_at: str
    completed_at: str

class ToolCallRecord(TypedDict):
    tool_name: str
    success: bool
    duration_ms: int
    called_at: str
    completed_at: str
    agent: str  # The calling agent

Usage example:

# Record an agent execution
append_agent_execution(
    metrics=state["execution_metrics"],
    agent_name="lead_agent",
    model="qwen3.5-plus",
    token_usage={"input_tokens": 1000, "output_tokens": 500, "total_tokens": 1500},
    started_at=start_time.isoformat(),
    completed_at=end_time.isoformat(),
    llm_duration_ms=1200
)

# Record a tool call
append_tool_call(
    metrics=state["execution_metrics"],
    tool_name="web_search",
    success=True,
    duration_ms=1500,
    called_at=start_time.isoformat(),
    completed_at=end_time.isoformat(),
    agent="search_agent"
)

Workflow Engine (graph.py)

ExtendableGraph

An extensible workflow built on LangGraph:

class ExtendableGraph:
    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.agents: Dict[str, BaseAgent] = {}

    def register_agent(self, agent: BaseAgent):
        """Register an agent and create its corresponding node"""

    async def compile(
        self,
        checkpointer: Optional[Any] = None,
        interrupt_before: Optional[list] = None,
        db_path: str = "data/langgraph.db"
    ) -> CompiledGraph:
        """Compile the graph and inject the checkpointer"""

Node Types

Agent node: executes a single round of LLM calling

def _create_agent_node(self, agent_name: str) -> Callable:
    async def agent_node(state: AgentState, writer: StreamWriter) -> AgentState:
        agent = self.agents[agent_name]

        # Build the messages
        messages = ContextManager.build_agent_messages(
            agent=agent,
            state=state,
            instruction=instruction,
            tool_interactions=tool_interactions,
            pending_tool_result=pending_tool_result
        )

        # Stream the agent's execution
        async for event in agent.stream(messages, is_resuming):
            writer({
                "type": event.type.value,
                "agent": event.agent,
                "timestamp": event.timestamp.isoformat(),
                "data": self._serialize_event_data(event.data)
            })

        # Update state (in place)
        merge_agent_response_to_state(state, agent_name, final_response, is_resuming)
        return state

    return agent_node

Tool execution node: handles all tool calls uniformly

async def tool_execution_node(state: AgentState, writer: StreamWriter) -> AgentState:
    pending = state.get("pending_tool_call")
    from_agent = pending["from_agent"]
    tool_name = pending["tool_name"]
    params = pending["params"]

    # Look up the tool via the agent's toolkit
    agent = self.agents.get(from_agent)
    tool = agent.toolkit.get_tool(tool_name)

    # Permission check
    if tool.permission == ToolPermission.CONFIRM:
        writer({...})  # PERMISSION_REQUEST
        is_approved = interrupt({...})
        if not is_approved:
            pending["tool_result"] = ToolResult(success=False, error="Permission denied")
            state["phase"] = self._get_return_phase(from_agent)
            return state

    # Execute the tool
    writer({...})  # TOOL_START
    tool_result = await agent.toolkit.execute_tool(tool_name, params)
    writer({...})  # TOOL_COMPLETE

    # Save the result and return to the originating agent
    pending["tool_result"] = tool_result
    state["phase"] = self._get_return_phase(from_agent)
    return state

路由规则

def _add_routing_rules(self, agent_name: str):
    def route_func(state: AgentState) -> str:
        phase = state["phase"]
        if phase == ExecutionPhase.TOOL_EXECUTING:
            return "tool_execution"
        elif phase == ExecutionPhase.SUBAGENT_EXECUTING:
            return state["subagent_pending"]["target"]
        elif phase == ExecutionPhase.LEAD_EXECUTING:
            return "lead_agent"
        elif phase == ExecutionPhase.COMPLETED:
            return END
        return END

    # Routing after an agent node
    self.workflow.add_conditional_edges(
        agent_name,
        route_func,
        {
            "tool_execution": "tool_execution",
            "lead_agent": "lead_agent",
            "search_agent": "search_agent",
            "crawl_agent": "crawl_agent",
            END: END
        }
    )

State Machine Diagram

Using a Lead → Search call as an example:

stateDiagram-v2
    [*] --> lead_agent: START

    lead_agent --> tool_execution: tool call
    lead_agent --> search_agent: invoke SubAgent
    lead_agent --> [*]: COMPLETED

    search_agent --> tool_execution: tool call
    search_agent --> lead_agent: return result

    tool_execution --> lead_agent: return to caller
    tool_execution --> search_agent: return to caller

Other SubAgents (e.g. crawl_agent) follow the same flow as search_agent.

Execution Controller (controller.py)

ExecutionController

The entry point that coordinates the entire execution flow:

class ExecutionController:
    def __init__(
        self,
        compiled_graph,
        artifact_manager: Optional[ArtifactManager] = None,
        conversation_manager: Optional[ConversationManager] = None
    ):
        self.graph = compiled_graph
        self.conversation_manager = conversation_manager or ConversationManager()
        self.artifact_manager = artifact_manager or ArtifactManager()

    async def stream_execute(
        self,
        content: Optional[str] = None,
        thread_id: Optional[str] = None,
        conversation_id: Optional[str] = None,
        parent_message_id: Any = _UNSET,  # _UNSET=auto-detect, None=root, str=specific parent
        message_id: Optional[str] = None,
        resume_data: Optional[Dict] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Streaming execution; yields event dicts"""

    async def execute(self, ...) -> Dict[str, Any]:
        """Batch execution; waits for completion"""

Execution Flow

async def _stream_new_message(self, content, conversation_id, parent_message_id):
    # 1. Ensure the conversation exists
    if not conversation_id:
        conversation_id = await self.conversation_manager.start_conversation_async()
    else:
        await self.conversation_manager.ensure_conversation_exists(conversation_id)

    # 2. Auto-resolve parent_message_id (only when not explicitly provided)
    # _UNSET = not passed, auto-detect; None = explicitly create a root message; str = a specific parent
    if parent_message_id is _UNSET:
        parent_message_id = await self.conversation_manager.get_active_branch(conversation_id)
    resolved_parent = parent_message_id if isinstance(parent_message_id, str) else None

    # 3. Format the conversation history
    conversation_history = await self.conversation_manager.format_conversation_history_async(
        conv_id=conversation_id,
        to_message_id=resolved_parent
    )

    # 4. Generate IDs
    message_id = f"msg-{uuid4().hex}"
    thread_id = f"thd-{uuid4().hex}"
    session_id = self._get_or_create_session(conversation_id)

    # 5. Create the initial state
    initial_state = create_initial_state(
        task=content,
        session_id=session_id,
        thread_id=thread_id,
        message_id=message_id,
        conversation_history=conversation_history
    )

    # 6. Add the message to the conversation
    await self.conversation_manager.add_message_async(
        conv_id=conversation_id,
        message_id=message_id,
        content=content,
        thread_id=thread_id,
        parent_id=resolved_parent
    )

    # 7. Emit the metadata event
    yield {
        "type": StreamEventType.METADATA.value,
        "timestamp": datetime.now().isoformat(),
        "data": {"conversation_id": conversation_id, "message_id": message_id, "thread_id": thread_id}
    }

    # 8. Run the graph
    async for chunk in self.graph.astream(initial_state, config, stream_mode="custom"):
        yield chunk

    # 9. Persist the response
    await self.conversation_manager.update_response_async(
        conv_id=conversation_id,
        message_id=message_id,
        response=response
    )

    # 10. Emit the completion event
    yield {"type": StreamEventType.COMPLETE.value, "data": {...}}
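
From the caller's side, this flow is consumed as a plain async iterator. A hypothetical client loop, where `controller` is any object exposing the stream_execute async generator described above; the StubController and the "content" data key are illustrative assumptions:

```python
import asyncio

async def run_task(controller, content: str) -> str:
    chunks = []
    async for event in controller.stream_execute(content=content):
        if event["type"] == "metadata":
            thread_id = event["data"]["thread_id"]  # useful for later resume calls
        elif event["type"] == "llm_chunk":
            chunks.append(event["data"].get("content", ""))
        elif event["type"] == "complete":
            break
    return "".join(chunks)

# Minimal stub standing in for a configured ExecutionController, for illustration only
class StubController:
    async def stream_execute(self, content=None, **kwargs):
        yield {"type": "metadata", "data": {"thread_id": "thd-1"}}
        yield {"type": "llm_chunk", "data": {"content": "hello"}}
        yield {"type": "complete", "data": {}}

result = asyncio.run(run_task(StubController(), "hi"))
```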

Context Management (context_manager.py)

ContextManager

Prepares the execution context for agents:

class ContextManager:
    @classmethod
    def prepare_agent_context(cls, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build the routing context (injected into the system prompt):
        - session_id, thread_id, user_message_id
        - artifacts_inventory (list of artifacts)
        """

    @classmethod
    def build_agent_messages(
        cls,
        agent: Any,  # BaseAgent instance
        state: Dict[str, Any],
        instruction: str,
        tool_interactions: Optional[List[Dict]] = None,
        pending_tool_result: Optional[Tuple[str, Any]] = None
    ) -> List[Dict]:
        """
        Build agent messages uniformly, concatenated in this order:
        system → conversation_history → instruction → tool_interactions → tool_result
        """
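
The concatenation order can be sketched directly. This assemble_messages function is a simplified stand-in, not the real build_agent_messages: how the tool result is rendered into a message is an assumption, and the real method also injects the routing context and applies compression:

```python
from typing import Any, Dict, List, Optional, Tuple

def assemble_messages(
    system_prompt: str,
    conversation_history: Optional[List[Dict]],
    instruction: str,
    tool_interactions: Optional[List[Dict]] = None,
    pending_tool_result: Optional[Tuple[str, Any]] = None,
) -> List[Dict]:
    # system → conversation_history → instruction → tool_interactions → tool_result
    messages = [{"role": "system", "content": system_prompt}]
    messages += conversation_history or []
    messages.append({"role": "user", "content": instruction})
    messages += tool_interactions or []
    if pending_tool_result is not None:
        tool_name, result = pending_tool_result
        messages.append({"role": "user", "content": f"[tool:{tool_name}] {result}"})
    return messages
```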

Message Compression

Character-count-based message compression:

COMPRESSION_LEVELS = {
    'full': 160000,
    'normal': 80000,
    'compact': 40000,
    'minimal': 20000
}

@classmethod
def should_compress(cls, messages: List[Dict], level: str = "normal") -> bool:
    """Determine whether compression is needed"""

@classmethod
def compress_messages(
    cls,
    messages: List[Dict],
    level: str = "normal",
    preserve_recent: int = 5
) -> List[Dict]:
    """
    Compress the message history (synchronous; truncates by character count)
    - Keeps the most recent N messages intact
    - Marks the truncated overflow
    """
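
A minimal sketch of this pair under those rules. The even per-message budget split and the "…[truncated]" marker are assumed policies, not the actual implementation:

```python
from typing import Dict, List

COMPRESSION_LEVELS = {"full": 160000, "normal": 80000, "compact": 40000, "minimal": 20000}

def total_chars(messages: List[Dict]) -> int:
    return sum(len(str(m.get("content", ""))) for m in messages)

def should_compress(messages: List[Dict], level: str = "normal") -> bool:
    # Compare the total character count against the level's budget
    return total_chars(messages) > COMPRESSION_LEVELS[level]

def compress_messages(messages: List[Dict], level: str = "normal",
                      preserve_recent: int = 5) -> List[Dict]:
    if not should_compress(messages, level):
        return messages
    # Keep the most recent N messages intact; truncate the older ones
    head, tail = messages[:-preserve_recent], messages[-preserve_recent:]
    # Split the remaining budget evenly across older messages (assumed policy)
    budget = COMPRESSION_LEVELS[level] // max(len(head), 1)
    compressed = []
    for m in head:
        content = str(m.get("content", ""))
        if len(content) > budget:
            content = content[:budget] + " …[truncated]"
        compressed.append({**m, "content": content})
    return compressed + tail
```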

Conversation Management (conversation_manager.py)

ConversationManager

Manages the conversation tree structure:

class ConversationManager:
    def __init__(self, repository: Optional[ConversationRepository] = None):
        self.repository = repository
        self._cache: Dict[str, ConversationCache] = {}

    async def start_conversation_async(
        self,
        conversation_id: Optional[str] = None
    ) -> str:
        """Create a new conversation and return its ID"""

    async def add_message_async(
        self,
        conv_id: str,
        message_id: str,
        content: str,
        thread_id: str,
        parent_id: Optional[str] = None
    ) -> Dict:
        """Add a message to the conversation tree"""

    async def format_conversation_history_async(
        self,
        conv_id: str,
        to_message_id: Optional[str] = None
    ) -> List[Dict]:
        """
        Format the conversation history from the root to the given message.
        Returns: [{"role": "user", "content": ...}, {"role": "assistant", ...}, ...]
        """

    async def get_active_branch(self, conv_id: str) -> Optional[str]:
        """Get the conversation's active branch (the latest message ID)"""

    async def list_conversations_async(
        self,
        limit: int = 50,
        offset: int = 0
    ) -> List[Dict]:
        """List all conversations"""

Tree-Structured Messages

Conversation
    └── Message (root, parent_id=None)
            ├── Message (branch 1)
            │       │
            │       └── Message (leaf)
            └── Message (branch 2)
                    └── Message (leaf)

Branches can be created from any message node, enabling conversation version control.
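
A conceptual sketch of how parent_id links form these branches; the actual tree lives behind ConversationManager / ConversationRepository, and the message IDs here are made up for illustration:

```python
messages = {
    "m1": {"parent_id": None, "content": "root"},
    "m2": {"parent_id": "m1", "content": "branch 1"},
    "m3": {"parent_id": "m2", "content": "leaf of branch 1"},
    "m4": {"parent_id": "m1", "content": "branch 2: a retried reply to m1"},
}

def path_to(message_id: str) -> list:
    """Walk parent_id links from a message back to the root — the traversal
    that format_conversation_history_async performs (then reverses)."""
    path = []
    current = message_id
    while current is not None:
        path.append(current)
        current = messages[current]["parent_id"]
    return list(reversed(path))
```

Two messages sharing the same parent_id (m2 and m4 above) are sibling branches; the history handed to the LLM is always a single root-to-leaf path, never the whole tree.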