Week 3 Day 15:状态机(FSM)驱动 Agent Workflow

💡 一句话核心:当 agent 的执行过程不再是"一个 loop 跑到底",而是"多步骤、可暂停、可恢复、可观测"的时候,你需要的不是更聪明的 LLM,而是一台 有限状态机


第一部分:问题驱动

🤔 问题1:为什么 agent loop 不够用了?

引导问题:

  1. Week 1 我们写的 agent loop 是 while !done { think → act → observe },它能处理"订机票+订酒店+发邮件"这种流程吗?
  2. 如果 agent 跑到第 3 步挂了,下次重启它从哪里继续?
  3. 如果用户在第 2 步之后想"等一下我要审批再执行",现有的 loop 能停下来吗?
  4. 当流程分支多达 10+ 种路径时,prompt 里能塞下所有规则吗?

答案揭示:

  • Agent loop 适合探索型任务(研究、问答),不适合流程型任务(审批、交易、多系统联动)
  • Loop 是 stateless 的,崩溃即丢失;流程是 stateful 的,需要持久化
  • 状态机把"下一步做什么"从 LLM 的自由发挥变成 代码约束 + LLM 决策

你应该理解:

  • Agent loop ≠ Workflow
  • 状态机是一种契约:状态、事件、转移规则都是显式的
  • 越是高风险、多步骤的任务,越需要 FSM

🤔 问题2:FSM vs Temporal/Cadence,我要自己写还是用现成的?

引导问题:

  1. Temporal 的 workflow 是用代码写的,它底层是什么?(event sourcing + 状态重放)
  2. 一个 10 人团队的 agent 平台,选 Temporal 合适吗?运维成本是多少?
  3. 你真正需要 Temporal 的哪些能力?(持久化、重试、timer、signal)
  4. 如果只需要"存状态 + 顺序执行",自己写一个 200 行的 FSM 够不够?

对比:

维度 自研 FSM Temporal
学习成本 低(一天) 高(一周)
运维 一张 Postgres 表 独立集群(Cassandra/ES)
Timer 需自己实现 内置
重试 需自己实现 内置
适合场景 <50 步、流程清晰 长流程、跨天、高可靠

结论: Week 3 我们自研 FSM,理解本质;真要上生产,再评估 Temporal。面试时能讲清楚什么时候选哪个,比会用 Temporal 更值钱。


🤔 问题3:状态机的核心三要素是什么?

引导问题:

  1. 一个订单从"下单"到"完成"会经过哪些状态?
  2. 状态之间的跳转由什么触发?(事件)
  3. 什么情况不允许跳转?(非法转移,比如 completed → pending)

核心三要素:

  • State(状态):系统某一时刻的快照,如 pending / running / completed / failed / cancelled
  • Event(事件):触发状态变化的输入,如 start / step_done / approve / reject / timeout
  • Transition(转移)(CurrentState, Event) → NextState 的映射

第二部分:动手实现

✅ 版本1:最小 FSM 骨架

// internal/workflow/fsm.go
package workflow

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type State string

const (
	StatePending   State = "pending"
	StateRunning   State = "running"
	StateCompleted State = "completed"
	StateFailed    State = "failed"
	StateCancelled State = "cancelled"
)

type Event string

const (
	EventStart    Event = "start"
	EventStepDone Event = "step_done"
	EventFail     Event = "fail"
	EventCancel   Event = "cancel"
)

// transitions: (currentState, event) -> nextState
var transitions = map[State]map[Event]State{
	StatePending: {
		EventStart:  StateRunning,
		EventCancel: StateCancelled,
	},
	StateRunning: {
		EventStepDone: StateCompleted,
		EventFail:     StateFailed,
		EventCancel:   StateCancelled,
	},
}

type FSM struct {
	mu      sync.Mutex
	state   State
	history []transition
}

type transition struct {
	From  State
	To    State
	Event Event
}

func New() *FSM {
	return &FSM{state: StatePending}
}

func (f *FSM) Current() State {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.state
}

func (f *FSM) Fire(ctx context.Context, e Event) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	next, ok := transitions[f.state][e]
	if !ok {
		return fmt.Errorf("illegal transition: %s --%s--> ?", f.state, e)
	}
	f.history = append(f.history, transition{From: f.state, To: next, Event: e})
	f.state = next
	return nil
}

反思: 这个 FSM 只有状态,没有"每个状态要做什么事"。下一步引入 Step。


✅ 版本2:Step 接口 + Workflow Engine

// internal/workflow/step.go
package workflow

import "context"

// Step 是 workflow 的最小执行单元
type Step interface {
	Name() string
	Run(ctx context.Context, input map[string]any) (output map[string]any, err error)
}

// Workflow 串联多个 Step
type Workflow struct {
	ID    string
	Steps []Step
	fsm   *FSM
	// 每步的输出汇总,给下一步用
	ctx map[string]any
}

func NewWorkflow(id string, steps []Step) *Workflow {
	return &Workflow{
		ID:    id,
		Steps: steps,
		fsm:   New(),
		ctx:   map[string]any{},
	}
}

func (w *Workflow) Run(ctx context.Context, input map[string]any) error {
	if err := w.fsm.Fire(ctx, EventStart); err != nil {
		return err
	}
	for k, v := range input {
		w.ctx[k] = v
	}
	for _, step := range w.Steps {
		out, err := step.Run(ctx, w.ctx)
		if err != nil {
			_ = w.fsm.Fire(ctx, EventFail)
			return fmt.Errorf("step %s failed: %w", step.Name(), err)
		}
		for k, v := range out {
			w.ctx[k] = v
		}
	}
	return w.fsm.Fire(ctx, EventStepDone)
}

举例:一个"工单创建"workflow

type ValidateStep struct{}
func (ValidateStep) Name() string { return "validate" }
func (ValidateStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
	title, _ := in["title"].(string)
	if len(title) < 5 {
		return nil, errors.New("title too short")
	}
	return map[string]any{"validated": true}, nil
}

type DraftStep struct{ llm LLMClient }
func (d DraftStep) Name() string { return "draft" }
func (d DraftStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
	draft, err := d.llm.Generate(ctx, fmt.Sprintf("Draft ticket for: %v", in["title"]))
	return map[string]any{"draft": draft}, err
}

type CreateStep struct{ jira JiraClient }
func (c CreateStep) Name() string { return "create" }
func (c CreateStep) Run(ctx context.Context, in map[string]any) (map[string]any, error) {
	id, err := c.jira.Create(ctx, in["draft"].(string))
	return map[string]any{"ticket_id": id}, err
}

✅ 版本3:Event-driven + 可暂停

问题: 上面的 Run 是同步跑完的。如果中间需要"等人审批"怎么办?

思路: 把执行拆成"触发事件 → 处理事件 → 决定下一步"。主程序不 block,由外部事件驱动。

// internal/workflow/engine.go
package workflow

type Engine struct {
	store  Store          // 持久化(Day 16 详细讲)
	events chan EventMsg
}

type EventMsg struct {
	WorkflowID string
	Event      Event
	Payload    map[string]any
}

func (e *Engine) Handle(ctx context.Context) {
	for msg := range e.events {
		wf, err := e.store.Load(ctx, msg.WorkflowID)
		if err != nil {
			continue
		}
		if err := wf.fsm.Fire(ctx, msg.Event); err != nil {
			// 非法转移,记日志即可
			continue
		}
		// 到达某个状态触发动作
		switch wf.fsm.Current() {
		case StateRunning:
			go e.runNextStep(ctx, wf) // 异步执行
		case StateCompleted, StateFailed, StateCancelled:
			_ = e.store.Save(ctx, wf)
		}
	}
}

Event-driven vs Polling:

方式 优点 缺点
Polling 简单、容错好 延迟高、浪费 CPU
Event-driven 实时、高效 需要消息队列/channel

生产建议: 混合 — 用 event-driven 为主,加一个 每分钟扫一次超时任务 的兜底 poller。


第三部分:关键概念

1. Idempotency(幂等性)

同一个 event 触发两次,结果必须一样。实现方式:在 transition 时检查 event_id 是否已处理。

2. Illegal transition

非法转移必须显式拒绝,不能静默。比如 completed 收到 start 事件应该报错。

3. Terminal state(终态)

completed / failed / cancelled 是终态,不再接受任何事件。

4. Compensation(补偿)

如果 step 3 失败了,是否要回滚 step 1、2?简单场景用事务,跨系统用 Saga 模式


第四部分:自测清单

  • 我能画出"工单创建"流程的状态转移图吗?
  • 为什么 transitionsmap[State]map[Event]State 而不是 if/else?(可扩展、可导出为可视化)
  • FSM 的 Fire 为什么要加锁?
  • Terminal state 为什么不能再转移?
  • Agent loop 和 FSM workflow 的分界线在哪里?

第五部分:作业

任务1:画图

用纸笔或 mermaid 画出订单状态机:pending → paid → shipped → delivered,包含所有合法事件。

任务2:实现

完成 internal/workflow/{fsm.go,step.go,engine.go},跑通 3-step workflow(validate → draft → create)。

任务3:测试非法转移

写单测:在 StateCompleted 触发 EventStart,应该返回 illegal transition 错误。

任务4:扩展

给 FSM 加一个 OnEnter(state) func() 钩子,能在进入某个状态时自动发通知。


第六部分:常见问题

Q: 状态机的状态数多了(20+)会不会很难维护?

A: 会。这是 FSM 的已知问题,解决方案是 Hierarchical FSM(HSM)— 状态可以嵌套。复杂到一定程度,就该上 Temporal 了。

Q: Workflow 执行到一半机器挂了怎么办?

A: 这是 Day 16 的主题 — Checkpoint。每步完成后把 state 写入 Postgres,重启时从最后一个 checkpoint 恢复。

Q: 多个 workflow 并发,会不会抢资源?

A: Engine 应该有并发控制,比如 semaphore := make(chan struct{}, 10) 限制同时跑 10 个。

Q: LLM 在 FSM 里扮演什么角色?

A: 两种模式:

  • LLM as planner:LLM 输出要跑哪些 step(动态 workflow)
  • LLM as step:固定 workflow,某一步调 LLM(如 draft 生成) Week 3 我们先用第二种,更可控。

配套算法题

题1:Minimum Window Substring (Hard)

题目: 给字符串 s 和 t,返回 s 中包含 t 所有字符的最短子串;若不存在返回空串。

思路: 滑动窗口 + 双指针 + 计数器。用 need 记录 t 中每个字符的需求量,have 记录窗口内满足需求的字符数。右指针扩窗口,满足条件后左指针收缩找最短。

func minWindow(s, t string) string {
	need := map[byte]int{}
	for i := 0; i < len(t); i++ {
		need[t[i]]++
	}
	have := map[byte]int{}
	left, count := 0, 0
	resStart, resLen := 0, -1

	for right := 0; right < len(s); right++ {
		c := s[right]
		have[c]++
		// 该字符数量刚好满足需求时,满足字符种数+1
		if need[c] > 0 && have[c] == need[c] {
			count++
		}
		// 窗口满足所有字符需求,尝试收缩左边
		for count == len(need) {
			if resLen == -1 || right-left+1 < resLen {
				resStart, resLen = left, right-left+1
			}
			lc := s[left]
			have[lc]--
			if need[lc] > 0 && have[lc] < need[lc] {
				count--
			}
			left++
		}
	}
	if resLen == -1 {
		return ""
	}
	return s[resStart : resStart+resLen]
}

复杂度: 时间 O(|s| + |t|),空间 O(|Σ|)(字符集大小)

变体/面试追问:

  • 如果要求返回所有满足条件的最短子串位置怎么办?(记录所有 resStart/resLen)
  • t 中有重复字符时如何处理?(need 计数器天然支持,have[c] == need[c] 条件精确匹配)

题2:Next Permutation (Medium)

题目: 将数组重新排列为字典序的下一个排列;若已是最大排列则重排为最小排列(原地修改)。

思路: 从右往左找第一个"升序对"(nums[i] < nums[i+1]),说明 i 位置可以增大;再从右往左找第一个大于 nums[i] 的数 j,交换 nums[i] 和 nums[j];最后反转 i+1 到末尾的部分(变为升序即最小)。

func nextPermutation(nums []int) {
	n := len(nums)
	// 步骤1:从右往左找第一个下降点 i(nums[i] < nums[i+1])
	i := n - 2
	for i >= 0 && nums[i] >= nums[i+1] {
		i--
	}
	if i >= 0 {
		// 步骤2:从右往左找第一个大于 nums[i] 的位置 j
		j := n - 1
		for j > i && nums[j] <= nums[i] {
			j--
		}
		// 步骤3:交换
		nums[i], nums[j] = nums[j], nums[i]
	}
	// 步骤4:反转 i+1 到末尾(使后缀变最小)
	left, right := i+1, n-1
	for left < right {
		nums[left], nums[right] = nums[right], nums[left]
		left++
		right--
	}
}

复杂度: 时间 O(n),空间 O(1)

变体/面试追问:

  • 如何找第 k 个排列?(LC 60,数学法:用阶乘逐位确定)
  • 如果数组中有重复元素,"下一个排列"的定义是否变化?(不变,算法同样适用)

题3:Longest Repeating Character Replacement (Medium)

题目: 给字符串 s 和整数 k,可以将 s 中任意 k 个字符替换为任意字母,求替换后最长的只含同一字母的子串长度。

思路: 滑动窗口 + 维护窗口内最高频字符数 maxFreq。窗口大小 - maxFreq 就是需要替换的字符数。若该值 > k,则左指针右移收缩窗口。关键:maxFreq 只增不减(保证单调性),不需要精确维护。

func characterReplacement(s string, k int) int {
	count := [26]int{}
	maxFreq := 0
	left, result := 0, 0

	for right := 0; right < len(s); right++ {
		count[s[right]-'A']++
		if count[s[right]-'A'] > maxFreq {
			maxFreq = count[s[right]-'A']
		}
		// 窗口内需要替换的字符数 = 窗口长度 - 最高频字符数
		for right-left+1-maxFreq > k {
			count[s[left]-'A']--
			left++
		}
		if right-left+1 > result {
			result = right - left + 1
		}
	}
	return result
}

复杂度: 时间 O(n),空间 O(26) = O(1)

变体/面试追问:

  • 如果可以替换的字符种类有限制(只能替换为特定字母)怎么处理?(分别对每个目标字母枚举)
  • 为什么 maxFreq 不在窗口收缩时更新也是正确的?(因为答案只在 maxFreq 增大时才可能变大,保守估计不影响最终结果)

下一步:Day 16 预告

明天我们会:

  1. 把 FSM 的 state 写进 Postgres
  2. 用 Redis 存 session 缓存
  3. 实现 checkpoint + resume
  4. 设计短期 / 长期 memory 分层

准备问题:

  • 为什么 state 用 Postgres 而 session 用 Redis?
  • Checkpoint 应该在每步之后还是每隔 N 秒?