Week 3 Day 16:State / Memory / Checkpoint

💡 一句话核心:Agent 不是 request/response 的无状态函数,它是一个有记忆的进程。记忆分层、状态持久化、崩溃可恢复,这三件事决定了 agent 能不能上生产。


第一部分:问题驱动

🤔 问题1:为什么需要持久化 state?

引导问题:

  1. 用户问 "帮我订机票",agent 正在调 API 时进程挂了,用户重试会发生什么?(重复订单)
  2. 一个 workflow 跑了 20 分钟到第 8 步失败,要从头来吗?
  3. 用户昨天说 "我喜欢靠窗座位",今天订票 agent 记得吗?
  4. 生产事故排查时,你想回放 agent 的每一步吗?

答案:

  • Crash recovery:防止重复执行 / 丢失进度
  • Debugging:事后能回放
  • Multi-turn:跨对话记忆
  • Audit:合规要求每步可追溯

🤔 问题2:Postgres vs Redis,哪个存什么?

对比:

维度 Postgres Redis
持久性 强(fsync) 弱(默认异步)
延迟 ms 级 微秒级
查询 SQL / index KV / 有限数据结构
事务 ACID 简单 MULTI
适合 Workflow state、审计日志 Session、限流计数

分层设计:

  • Postgres:workflow 实例、每步输出、最终结果(长期、强一致)
  • Redis:当前对话上下文、锁、rate limit(短期、高性能)

🤔 问题3:Memory 怎么分层?

引导问题:

  1. 你和朋友聊天,刚说过的话和三年前说过的话存储方式一样吗?
  2. 如果把 user 的所有历史对话都塞进 LLM prompt,会发生什么?(爆 context)
  3. 什么信息应该被"蒸馏"成 facts,而不是原样保存?

三层 Memory:

存什么 存哪 TTL
Working 本轮对话最近 N 条 Redis 小时级
Short-term 本次 session 全部 Redis / Postgres 天级
Long-term 蒸馏后的 facts、preferences Postgres + Vector DB 永久

第二部分:动手实现

✅ 版本1:Postgres Schema

-- migrations/001_workflow.sql
CREATE TABLE workflows (
    id UUID PRIMARY KEY,
    user_id TEXT NOT NULL,
    type TEXT NOT NULL,             -- e.g. "ticket_creation"
    state TEXT NOT NULL,            -- pending/running/completed/failed
    input JSONB NOT NULL,
    output JSONB,
    error TEXT,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_workflows_user ON workflows(user_id);
CREATE INDEX idx_workflows_state ON workflows(state) WHERE state IN ('pending', 'running');

CREATE TABLE workflow_steps (
    id BIGSERIAL PRIMARY KEY,
    workflow_id UUID REFERENCES workflows(id) ON DELETE CASCADE,
    step_name TEXT NOT NULL,
    step_index INT NOT NULL,
    input JSONB,
    output JSONB,
    status TEXT NOT NULL,           -- pending/running/completed/failed
    started_at TIMESTAMPTZ,
    finished_at TIMESTAMPTZ,
    error TEXT,
    UNIQUE(workflow_id, step_index)
);

CREATE TABLE memories (
    id BIGSERIAL PRIMARY KEY,
    user_id TEXT NOT NULL,
    kind TEXT NOT NULL,             -- "fact" / "preference" / "summary"
    content TEXT NOT NULL,
    embedding VECTOR(1536),         -- pgvector 扩展
    created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_memories_user_kind ON memories(user_id, kind);

为什么 step 单独一张表?

  • 每一步都能独立查(debug 友好)
  • 可做 partition by time(旧数据归档)
  • 作为 checkpoint 的天然存储

✅ 版本2:State Store 实现

// internal/workflow/store.go
package workflow

import (
	"context"
	"database/sql"
	"encoding/json"
	"time"

	"github.com/google/uuid"
)

type Store struct {
	db *sql.DB
}

type Instance struct {
	ID        uuid.UUID
	UserID    string
	Type      string
	State     State
	Input     map[string]any
	Output    map[string]any
	CreatedAt time.Time
	UpdatedAt time.Time
}

func (s *Store) Create(ctx context.Context, inst *Instance) error {
	in, _ := json.Marshal(inst.Input)
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO workflows (id, user_id, type, state, input)
		VALUES ($1, $2, $3, $4, $5)`,
		inst.ID, inst.UserID, inst.Type, inst.State, in)
	return err
}

func (s *Store) Load(ctx context.Context, id uuid.UUID) (*Instance, error) {
	var inst Instance
	var inBytes, outBytes []byte
	err := s.db.QueryRowContext(ctx, `
		SELECT id, user_id, type, state, input, output, created_at, updated_at
		FROM workflows WHERE id = $1`, id).
		Scan(&inst.ID, &inst.UserID, &inst.Type, &inst.State,
			&inBytes, &outBytes, &inst.CreatedAt, &inst.UpdatedAt)
	if err != nil {
		return nil, err
	}
	_ = json.Unmarshal(inBytes, &inst.Input)
	if outBytes != nil {
		_ = json.Unmarshal(outBytes, &inst.Output)
	}
	return &inst, nil
}

func (s *Store) UpdateState(ctx context.Context, id uuid.UUID, state State) error {
	_, err := s.db.ExecContext(ctx, `
		UPDATE workflows SET state=$1, updated_at=now() WHERE id=$2`,
		state, id)
	return err
}

// SaveStep 写入 checkpoint
func (s *Store) SaveStep(ctx context.Context, wfID uuid.UUID, idx int,
	name string, out map[string]any, err error) error {
	outBytes, _ := json.Marshal(out)
	status := "completed"
	var errStr sql.NullString
	if err != nil {
		status = "failed"
		errStr = sql.NullString{String: err.Error(), Valid: true}
	}
	_, dbErr := s.db.ExecContext(ctx, `
		INSERT INTO workflow_steps
			(workflow_id, step_name, step_index, output, status, finished_at, error)
		VALUES ($1, $2, $3, $4, $5, now(), $6)
		ON CONFLICT (workflow_id, step_index) DO UPDATE
			SET output = EXCLUDED.output, status = EXCLUDED.status,
			    finished_at = now(), error = EXCLUDED.error`,
		wfID, name, idx, outBytes, status, errStr)
	return dbErr
}

关键点:

  • ON CONFLICT DO UPDATE 保证 idempotent — 重跑同一步不会插入两条
  • SELECT ... FOR UPDATE 避免两个 worker 同时处理同一 workflow(看任务4)

✅ 版本3:Checkpoint + Resume

// internal/workflow/engine.go (续 Day 15)
func (e *Engine) RunWithCheckpoint(ctx context.Context, wfID uuid.UUID,
	steps []Step, input map[string]any) error {

	// 1. 查已完成的步数
	completed, err := e.store.LoadCompletedSteps(ctx, wfID)
	if err != nil {
		return err
	}
	startIdx := len(completed)

	// 2. 恢复 context
	ctxData := input
	for _, s := range completed {
		for k, v := range s.Output {
			ctxData[k] = v
		}
	}

	// 3. 从 startIdx 开始执行
	for i := startIdx; i < len(steps); i++ {
		out, err := steps[i].Run(ctx, ctxData)
		// 无论成功失败都写 checkpoint
		_ = e.store.SaveStep(ctx, wfID, i, steps[i].Name(), out, err)
		if err != nil {
			_ = e.store.UpdateState(ctx, wfID, StateFailed)
			return err
		}
		for k, v := range out {
			ctxData[k] = v
		}
	}
	return e.store.UpdateState(ctx, wfID, StateCompleted)
}

测试 resume:

func TestResume(t *testing.T) {
	// 第一次跑 — 在 step 2 故意 panic
	err := engine.RunWithCheckpoint(ctx, wfID, stepsWithPanic, input)
	assert.Error(t, err)

	// 第二次跑 — 从 step 2 继续
	err = engine.RunWithCheckpoint(ctx, wfID, stepsFixed, input)
	assert.NoError(t, err)

	// 验证 step 1 没有重跑
	calls := stepsFixed[0].(*MockStep).CallCount
	assert.Equal(t, 1, calls)
}

✅ 版本4:Redis Session + Memory 蒸馏

// internal/memory/redis.go
package memory

import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
)

type Session struct {
	rdb *redis.Client
}

type Turn struct {
	Role    string    `json:"role"`
	Content string    `json:"content"`
	Time    time.Time `json:"time"`
}

func (s *Session) Append(ctx context.Context, userID string, turn Turn) error {
	b, _ := json.Marshal(turn)
	key := "sess:" + userID
	pipe := s.rdb.Pipeline()
	pipe.RPush(ctx, key, b)
	pipe.LTrim(ctx, key, -50, -1)  // 只保留最近 50 轮
	pipe.Expire(ctx, key, 24*time.Hour)
	_, err := pipe.Exec(ctx)
	return err
}

func (s *Session) Recent(ctx context.Context, userID string, n int) ([]Turn, error) {
	items, err := s.rdb.LRange(ctx, "sess:"+userID, int64(-n), -1).Result()
	if err != nil {
		return nil, err
	}
	out := make([]Turn, 0, len(items))
	for _, it := range items {
		var t Turn
		_ = json.Unmarshal([]byte(it), &t)
		out = append(out, t)
	}
	return out, nil
}

Long-term memory 蒸馏(伪代码):

// 每 10 轮对话触发一次蒸馏
func (m *MemoryManager) Distill(ctx context.Context, userID string) error {
	turns, _ := m.session.Recent(ctx, userID, 10)
	prompt := buildDistillPrompt(turns)
	facts, err := m.llm.ExtractFacts(ctx, prompt)
	if err != nil {
		return err
	}
	for _, f := range facts {
		emb, _ := m.embed.Embed(ctx, f)
		_ = m.pg.InsertMemory(ctx, userID, "fact", f, emb)
	}
	return nil
}

第三部分:关键概念

1. Idempotency Key

每个请求带 X-Idempotency-Key,服务端查 Redis 是否已处理过,防止重复。

2. Optimistic Locking

UPDATE ... WHERE version = $old_version,冲突时重试。避免两个 worker 抢同一 workflow。

3. Write-Ahead Log

先写 intent(我打算做什么),再执行,最后写 result。崩溃时能判断"是不是已经做过"。

4. Memory consolidation

睡眠时大脑把短期记忆转成长期 — 我们每 N 轮触发 LLM 蒸馏。


第四部分:自测清单

  • Postgres 和 Redis 分别存什么?为什么?
  • Checkpoint 写在每步前还是每步后?为什么?
  • 两个 worker 并发处理同一 workflow 怎么办?
  • Long-term memory 为什么需要 embedding?
  • session 为什么要 LTRIM?不限制会怎样?
  • 如果 Postgres 挂了、Redis 也挂了,你的 agent 还能部分工作吗?

第五部分:作业

任务1:建 schema

docker-compose up postgres redis,跑 migration。

任务2:集成到 Day 15 的 Engine

RunWithCheckpoint 能跑通,手动 kill 进程后 resume 成功。

任务3:Redis session

实现 Append / Recent,写一个 handler /chat 使用它。

任务4:幂等测试

同一个 idempotency key 重复调用 /workflow,只创建一条记录。


第六部分:常见问题

Q: 为什么不把 session 也放 Postgres,一套到底?

A: 可以,但 session 访问频率高(每条消息一次),Postgres 顶不住。Redis 微秒级延迟是刚需。

Q: Checkpoint 太频繁会不会拖慢 workflow?

A: 每步写一条 log 通常 <10ms,对整个 workflow(秒-分钟级)可忽略。如果某步本身很快(<50ms),可考虑批量写。

Q: Long-term memory 会不会越存越多?

A: 会。策略:

  • 给 memory 打 confidence 分数,定期清低分
  • 做 summarization:10 条 facts 合成 1 条
  • Archive 到冷存储(S3)

Q: 用 event sourcing(只存事件)不是更优雅?

A: 是。但实现复杂度高 — 需要 replay、版本兼容。Temporal 就是 event sourcing。自研阶段用 snapshot + step log 够用。


配套算法题

题1:Accounts Merge (Medium)

题目: 给定账户列表,每个账户第一个元素是姓名,其余是邮件地址。属于同一人的账户(有相同邮件)需要合并,返回合并后的账户列表,每个账户的邮件排序。

思路: 并查集(Union-Find)。以 email 为节点,有相同账户内的 email 互相 union。最后按根节点分组,每组加上姓名输出。

import "sort"

func accountsMerge(accounts [][]string) [][]string {
	parent := map[string]string{}
	emailToName := map[string]string{}

	var find func(x string) string
	find = func(x string) string {
		if parent[x] != x {
			parent[x] = find(parent[x]) // 路径压缩
		}
		return parent[x]
	}

	union := func(a, b string) {
		ra, rb := find(a), find(b)
		if ra != rb {
			parent[ra] = rb
		}
	}

	// 初始化并查集
	for _, acc := range accounts {
		name := acc[0]
		for _, email := range acc[1:] {
			if _, ok := parent[email]; !ok {
				parent[email] = email
			}
			emailToName[email] = name
			// 将该账户所有 email 与第一个 email union
			union(email, acc[1])
		}
	}

	// 按根节点分组
	groups := map[string][]string{}
	for email := range parent {
		root := find(email)
		groups[root] = append(groups[root], email)
	}

	var res [][]string
	for root, emails := range groups {
		sort.Strings(emails)
		res = append(res, append([]string{emailToName[root]}, emails...))
	}
	return res
}

复杂度: 时间 O(N·α(N))(α 为反阿克曼函数,近似 O(N)),空间 O(N)

变体/面试追问:

  • 如果需要知道每个合并组共包含几个原始账户,怎么记录?(在 union 时额外维护 size 数组)
  • 并查集 vs DFS 哪个更快?(时间复杂度相近,并查集常数更小;DFS 代码更直观)

题2:Number of Connected Components in Undirected Graph (Medium)

题目: 给 n 个节点(0 到 n-1)和边列表 edges,返回图中连通分量的数量。

思路: 并查集。初始每个节点自为一组,count = n。每次 union 两个不同组时 count--。最终 count 即为答案。也可用 DFS/BFS 逐个节点遍历未访问的节点。

func countComponents(n int, edges [][]int) int {
	parent := make([]int, n)
	rank := make([]int, n)
	for i := range parent {
		parent[i] = i
	}
	count := n

	var find func(x int) int
	find = func(x int) int {
		if parent[x] != x {
			parent[x] = find(parent[x]) // 路径压缩
		}
		return parent[x]
	}

	union := func(a, b int) {
		ra, rb := find(a), find(b)
		if ra == rb {
			return
		}
		// 按秩合并
		if rank[ra] < rank[rb] {
			parent[ra] = rb
		} else if rank[ra] > rank[rb] {
			parent[rb] = ra
		} else {
			parent[rb] = ra
			rank[ra]++
		}
		count--
	}

	for _, e := range edges {
		union(e[0], e[1])
	}
	return count
}

复杂度: 时间 O((N+E)·α(N)),空间 O(N)

变体/面试追问:

  • 如果边是动态增加的(在线查询),并查集是否比 DFS 更适合?(是,并查集支持增量更新,DFS 需要重新遍历)
  • 如何判断加入某条边后是否形成环?(union 前检查两端节点的 root 是否相同)

题3:Binary Tree Right Side View (Medium)

题目: 给二叉树根节点,返回从右侧观察时每层能看到的节点值列表(即每层最右边的节点)。

思路: BFS 层序遍历,每层处理完后取队列中最后一个元素加入结果。也可用 DFS,优先遍历右子树,首次到达某层时记录该层值。

func rightSideView(root *TreeNode) []int {
	if root == nil {
		return nil
	}
	var result []int
	queue := []*TreeNode{root}

	for len(queue) > 0 {
		levelSize := len(queue)
		for i := 0; i < levelSize; i++ {
			node := queue[0]
			queue = queue[1:]
			// 每层最后一个节点
			if i == levelSize-1 {
				result = append(result, node.Val)
			}
			if node.Left != nil {
				queue = append(queue, node.Left)
			}
			if node.Right != nil {
				queue = append(queue, node.Right)
			}
		}
	}
	return result
}

// DFS 写法(优先遍历右子树)
func rightSideViewDFS(root *TreeNode) []int {
	var result []int
	var dfs func(node *TreeNode, depth int)
	dfs = func(node *TreeNode, depth int) {
		if node == nil {
			return
		}
		// 首次到达该层(从右侧优先),记录该层值
		if depth == len(result) {
			result = append(result, node.Val)
		}
		dfs(node.Right, depth+1) // 优先右子树
		dfs(node.Left, depth+1)
	}
	dfs(root, 0)
	return result
}

复杂度: 时间 O(N),空间 O(N)(队列/递归栈)

变体/面试追问:

  • 如果要求左侧视图怎么改?(BFS 取每层第一个;DFS 优先遍历左子树)
  • 如果树极度不平衡(退化为链表),BFS 和 DFS 的空间复杂度各是多少?(BFS O(1),DFS O(N) 递归栈)

下一步:Day 17 预告

明天我们做 Human Approval Flow — 让 agent 生成 draft,人批准后才执行。

  • Draft-Execute 模式
  • Approval API 和超时
  • 异步通知(email/slack)

准备问题:

  • 为什么高风险操作必须人工批准?
  • Draft 存哪?等多久算超时?