Week 3 Day 15:状态机(FSM)驱动 Agent Workflow
💡 一句话核心:当 agent 的执行过程不再是"一个 loop 跑到底",而是"多步骤、可暂停、可恢复、可观测"的时候,你需要的不是更聪明的 LLM,而是一台 有限状态机。
第一部分:问题驱动
🤔 问题1:为什么 agent loop 不够用了?
引导问题:
- Week 1 我们写的 agent loop 是
while !done { think → act → observe },它能处理"订机票+订酒店+发邮件"这种流程吗?
- 如果 agent 跑到第 3 步挂了,下次重启它从哪里继续?
- 如果用户在第 2 步之后想"等一下我要审批再执行",现有的 loop 能停下来吗?
- 当流程分支多达 10+ 种路径时,prompt 里能塞下所有规则吗?
答案揭示:
- Agent loop 适合探索型任务(研究、问答),不适合流程型任务(审批、交易、多系统联动)
- Loop 是 stateless 的,崩溃即丢失;流程是 stateful 的,需要持久化
- 状态机把"下一步做什么"从 LLM 的自由发挥变成 代码约束 + LLM 决策
你应该理解:
- Agent loop ≠ Workflow
- 状态机是一种契约:状态、事件、转移规则都是显式的
- 越是高风险、多步骤的任务,越需要 FSM
🤔 问题2:FSM vs Temporal/Cadence,我要自己写还是用现成的?
引导问题:
- Temporal 的 workflow 是用代码写的,它底层是什么?(event sourcing + 状态重放)
- 一个 10 人团队的 agent 平台,选 Temporal 合适吗?运维成本是多少?
- 你真正需要 Temporal 的哪些能力?(持久化、重试、timer、signal)
- 如果只需要"存状态 + 顺序执行",自己写一个 200 行的 FSM 够不够?
对比:
| 维度 |
自研 FSM |
Temporal |
| 学习成本 |
低(一天) |
高(一周) |
| 运维 |
一张 Postgres 表 |
独立集群(Cassandra/ES) |
| Timer |
需自己实现 |
内置 |
| 重试 |
需自己实现 |
内置 |
| 适合场景 |
<50 步、流程清晰 |
长流程、跨天、高可靠 |
结论: Week 3 我们自研 FSM,理解本质;真要上生产,再评估 Temporal。面试时能讲清楚什么时候选哪个,比会用 Temporal 更值钱。
🤔 问题3:状态机的核心三要素是什么?
引导问题:
- 一个订单从"下单"到"完成"会经过哪些状态?
- 状态之间的跳转由什么触发?(事件)
- 什么情况不允许跳转?(非法转移,比如 completed → pending)
核心三要素:
- State(状态):系统某一时刻的快照,如
pending / running / completed / failed / cancelled
- Event(事件):触发状态变化的输入,如
start / step_done / approve / reject / timeout
- Transition(转移):
(CurrentState, Event) → NextState 的映射
第二部分:动手实现
✅ 版本1:最小 FSM 骨架
// internal/workflow/fsm.go
package workflow
import (
"context"
"errors"
"fmt"
"sync"
)
type State string
const (
StatePending State = "pending"
StateRunning State = "running"
StateCompleted State = "completed"
StateFailed State = "failed"
StateCancelled State = "cancelled"
)
type Event string
const (
EventStart Event = "start"
EventStepDone Event = "step_done"
EventFail Event = "fail"
EventCancel Event = "cancel"
)
// transitions: (currentState, event) -> nextState
var transitions = map[State]map[Event]State{
StatePending: {
EventStart: StateRunning,
EventCancel: StateCancelled,
},
StateRunning: {
EventStepDone: StateCompleted,
EventFail: StateFailed,
EventCancel: StateCancelled,
},
}
type FSM struct {
mu sync.Mutex
state State
history []transition
}
type transition struct {
From State
To State
Event Event
}
func New() *FSM {
return &FSM{state: StatePending}
}
func (f *FSM) Current() State {
f.mu.Lock()
defer f.mu.Unlock()
return f.state
}
func (f *FSM) Fire(ctx context.Context, e Event) error {
f.mu.Lock()
defer f.mu.Unlock()
next, ok := transitions[f.state][e]
if !ok {
return fmt.Errorf("illegal transition: %s --%s--> ?", f.state, e)
}
f.history = append(f.history, transition{From: f.state, To: next, Event: e})
f.state = next
return nil
}
反思: 这个 FSM 只有状态,没有"每个状态要做什么事"。下一步引入 Step。
✅ 版本2:Step 接口 + Workflow Engine
// internal/workflow/step.go
package workflow
import "context"
// Step 是 workflow 的最小执行单元
type Step interface {
Name() string
Run(ctx context.Context, input map[string]any) (output map[string]any, err error)
}
// Workflow 串联多个 Step
type Workflow struct {
ID string
Steps []Step
fsm *FSM
// 每步的输出汇总,给下一步用
ctx map[string]any
}
func NewWorkflow(id string, steps []Step) *Workflow {
return &Workflow{
ID: id,
Steps: steps,
fsm: New(),
ctx: map[string]any{},
}
}
func (w *Workflow) Run(ctx context.Context, input map[string]any) error {
if err := w.fsm.Fire(ctx, EventStart); err != nil {
return err
}
for k, v := range input {
w.ctx[k] = v
}
for _, step := range w.Steps {
out, err := step.Run(ctx, w.ctx)
if err != nil {
_ = w.fsm.Fire(ctx, EventFail)
return fmt.Errorf("step %s failed: %w", step.Name(), err)
}
for k, v := range out {
w.ctx[k] = v
}
}
return w.fsm.Fire(ctx, EventStepDone)
}
举例:一个"工单创建"workflow
type ValidateStep struct{}
func (ValidateStep) Name() string { return "validate" }
func (ValidateStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
title, _ := in["title"].(string)
if len(title) < 5 {
return nil, errors.New("title too short")
}
return map[string]any{"validated": true}, nil
}
type DraftStep struct{ llm LLMClient }
func (d DraftStep) Name() string { return "draft" }
func (d DraftStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
draft, err := d.llm.Generate(ctx, fmt.Sprintf("Draft ticket for: %v", in["title"]))
return map[string]any{"draft": draft}, err
}
type CreateStep struct{ jira JiraClient }
func (c CreateStep) Name() string { return "create" }
func (c CreateStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
id, err := c.jira.Create(ctx, in["draft"].(string))
return map[string]any{"ticket_id": id}, err
}
✅ 版本3:Event-driven + 可暂停
问题: 上面的 Run 是同步跑完的。如果中间需要"等人审批"怎么办?
思路: 把执行拆成"触发事件 → 处理事件 → 决定下一步"。主程序不 block,由外部事件驱动。
// internal/workflow/engine.go
package workflow
type Engine struct {
store Store // 持久化(Day 16 详细讲)
events chan EventMsg
}
type EventMsg struct {
WorkflowID string
Event Event
Payload map[string]any
}
func (e *Engine) Handle(ctx context.Context) {
for msg := range e.events {
wf, err := e.store.Load(ctx, msg.WorkflowID)
if err != nil {
continue
}
if err := wf.fsm.Fire(ctx, msg.Event); err != nil {
// 非法转移,记日志即可
continue
}
// 到达某个状态触发动作
switch wf.fsm.Current() {
case StateRunning:
go e.runNextStep(ctx, wf) // 异步执行
case StateCompleted, StateFailed, StateCancelled:
_ = e.store.Save(ctx, wf)
}
}
}
Event-driven vs Polling:
| 方式 |
优点 |
缺点 |
| Polling |
简单、容错好 |
延迟高、浪费 CPU |
| Event-driven |
实时、高效 |
需要消息队列/channel |
生产建议: 混合 — 用 event-driven 为主,加一个 每分钟扫一次超时任务 的兜底 poller。
第三部分:关键概念
1. Idempotency(幂等性)
同一个 event 触发两次,结果必须一样。实现方式:在 transition 时检查 event_id 是否已处理。
2. Illegal transition
非法转移必须显式拒绝,不能静默。比如 completed 收到 start 事件应该报错。
3. Terminal state(终态)
completed / failed / cancelled 是终态,不再接受任何事件。
4. Compensation(补偿)
如果 step 3 失败了,是否要回滚 step 1、2?简单场景用事务,跨系统用 Saga 模式。
第四部分:自测清单
第五部分:作业
任务1:画图
用纸笔或 mermaid 画出订单状态机:pending → paid → shipped → delivered,包含所有合法事件。
任务2:实现
完成 internal/workflow/{fsm.go,step.go,engine.go},跑通 3-step workflow(validate → draft → create)。
任务3:测试非法转移
写单测:在 StateCompleted 触发 EventStart,应该返回 illegal transition 错误。
任务4:扩展
给 FSM 加一个 OnEnter(state) func() 钩子,能在进入某个状态时自动发通知。
第六部分:常见问题
Q: 状态机的状态数多了(20+)会不会很难维护?
A: 会。这是 FSM 的已知问题,解决方案是 Hierarchical FSM(HSM)— 状态可以嵌套。复杂到一定程度,就该上 Temporal 了。
Q: Workflow 执行到一半机器挂了怎么办?
A: 这是 Day 16 的主题 — Checkpoint。每步完成后把 state 写入 Postgres,重启时从最后一个 checkpoint 恢复。
Q: 多个 workflow 并发,会不会抢资源?
A: Engine 应该有并发控制,比如 semaphore := make(chan struct{}, 10) 限制同时跑 10 个。
Q: LLM 在 FSM 里扮演什么角色?
A: 两种模式:
- LLM as planner:LLM 输出要跑哪些 step(动态 workflow)
- LLM as step:固定 workflow,某一步调 LLM(如 draft 生成)
Week 3 我们先用第二种,更可控。
配套算法题
题1:Minimum Window Substring (Hard)
题目: 给字符串 s 和 t,返回 s 中包含 t 所有字符的最短子串;若不存在返回空串。
思路: 滑动窗口 + 双指针 + 计数器。用 need 记录 t 中每个字符的需求量,have 记录窗口内满足需求的字符数。右指针扩窗口,满足条件后左指针收缩找最短。
func minWindow(s, t string) string {
need := map[byte]int{}
for i := 0; i < len(t); i++ {
need[t[i]]++
}
have := map[byte]int{}
left, count := 0, 0
resStart, resLen := 0, -1
for right := 0; right < len(s); right++ {
c := s[right]
have[c]++
// 该字符数量刚好满足需求时,满足字符种数+1
if need[c] > 0 && have[c] == need[c] {
count++
}
// 窗口满足所有字符需求,尝试收缩左边
for count == len(need) {
if resLen == -1 || right-left+1 < resLen {
resStart, resLen = left, right-left+1
}
lc := s[left]
have[lc]--
if need[lc] > 0 && have[lc] < need[lc] {
count--
}
left++
}
}
if resLen == -1 {
return ""
}
return s[resStart : resStart+resLen]
}
复杂度: 时间 O(|s| + |t|),空间 O(|Σ|)(字符集大小)
变体/面试追问:
- 如果要求返回所有满足条件的最短子串位置怎么办?(记录所有 resStart/resLen)
- t 中有重复字符时如何处理?(need 计数器天然支持,have[c] == need[c] 条件精确匹配)
题2:Next Permutation (Medium)
题目: 将数组重新排列为字典序的下一个排列;若已是最大排列则重排为最小排列(原地修改)。
思路: 从右往左找第一个"升序对"(nums[i] < nums[i+1]),说明 i 位置可以增大;再从右往左找第一个大于 nums[i] 的数 j,交换 nums[i] 和 nums[j];最后反转 i+1 到末尾的部分(变为升序即最小)。
func nextPermutation(nums []int) {
n := len(nums)
// 步骤1:从右往左找第一个下降点 i(nums[i] < nums[i+1])
i := n - 2
for i >= 0 && nums[i] >= nums[i+1] {
i--
}
if i >= 0 {
// 步骤2:从右往左找第一个大于 nums[i] 的位置 j
j := n - 1
for j > i && nums[j] <= nums[i] {
j--
}
// 步骤3:交换
nums[i], nums[j] = nums[j], nums[i]
}
// 步骤4:反转 i+1 到末尾(使后缀变最小)
left, right := i+1, n-1
for left < right {
nums[left], nums[right] = nums[right], nums[left]
left++
right--
}
}
复杂度: 时间 O(n),空间 O(1)
变体/面试追问:
- 如何找第 k 个排列?(LC 60,数学法:用阶乘逐位确定)
- 如果数组中有重复元素,"下一个排列"的定义是否变化?(不变,算法同样适用)
题3:Longest Repeating Character Replacement (Medium)
题目: 给字符串 s 和整数 k,可以将 s 中任意 k 个字符替换为任意字母,求替换后最长的只含同一字母的子串长度。
思路: 滑动窗口 + 维护窗口内最高频字符数 maxFreq。窗口大小 - maxFreq 就是需要替换的字符数。若该值 > k,则左指针右移收缩窗口。关键:maxFreq 只增不减(保证单调性),不需要精确维护。
func characterReplacement(s string, k int) int {
count := [26]int{}
maxFreq := 0
left, result := 0, 0
for right := 0; right < len(s); right++ {
count[s[right]-'A']++
if count[s[right]-'A'] > maxFreq {
maxFreq = count[s[right]-'A']
}
// 窗口内需要替换的字符数 = 窗口长度 - 最高频字符数
for right-left+1-maxFreq > k {
count[s[left]-'A']--
left++
}
if right-left+1 > result {
result = right - left + 1
}
}
return result
}
复杂度: 时间 O(n),空间 O(26) = O(1)
变体/面试追问:
- 如果可以替换的字符种类有限制(只能替换为特定字母)怎么处理?(分别对每个目标字母枚举)
- 为什么 maxFreq 不在窗口收缩时更新也是正确的?(因为答案只在 maxFreq 增大时才可能变大,保守估计不影响最终结果)
下一步:Day 16 预告
明天我们会:
- 把 FSM 的 state 写进 Postgres
- 用 Redis 存 session 缓存
- 实现 checkpoint + resume
- 设计短期 / 长期 memory 分层
准备问题:
- 为什么 state 用 Postgres 而 session 用 Redis?
- Checkpoint 应该在每步之后还是每隔 N 秒?