Week 1 Day 6:Retry / Backoff / Idempotency - 苏格拉底教学

💡 一句话核心:分布式系统没有"成功",只有"你以为的成功"。Retry是韧性,Idempotency是安全带——少了任何一个,Agent都会在生产环境制造灾难。

学习目标

  • 分辨哪些错误该重试、哪些必须立刻放弃
  • 实现Exponential backoff并正确处理ctx.Done()
  • 理解"至少一次"语义,用idempotency key解决重复副作用
  • 用Sync.Map / Redis实现去重存储
  • 掌握分布式重试的常见陷阱(retry storm、雪崩、时钟偏移)

第一部分:问题驱动

🤔 问题1:什么错误该重试?

场景: 你的Agent调用OpenAI,返回了以下错误,哪些该重试?

# 错误 HTTP状态 该重试吗?
1 context deadline exceeded - ✅ 可能是网络抖动
2 rate_limit_exceeded (429) 429 ✅ 但要backoff
3 invalid_api_key (401) 401 ❌ 重试100次也是401
4 invalid_request_error: max_tokens exceeded (400) 400 ❌ 参数错了
5 server_error (500) 500 ✅ 服务端临时故障
6 bad_gateway (502/503/504) 5xx ✅ 通常是upstream短暂不可用
7 permission_denied (403) 403 ❌ 权限问题重试无用
8 TCP connection refused - ✅ 目标还没起来

分类原则:

  • Transient(瞬时错误)→ 可重试:timeout、rate_limit、5xx、网络抖动
  • Non-retryable(永久错误)→ 立刻返回:4xx(除429)、invalid input、permission denied、not found

为什么不能全部重试?

  1. 浪费成本:每次LLM调用都是真金白银
  2. 延迟放大:3次重试每次3秒 = 用户等9秒
  3. 故障放大:下游挂了,你重试 = retry storm雪崩

🤔 问题2:为什么要Exponential Backoff?

反问: 如果每次失败立即重试,会发生什么?

假设下游因为过载返回了429,1000个客户端同时失败,如果都立即重试→下游立刻收到1000个重复请求→更崩。

Exponential Backoff = 指数延迟

  • 第1次重试:等1s
  • 第2次重试:等1.5s(1 × 1.5)
  • 第3次重试:等2.25s(1.5 × 1.5)
  • ...

为什么是1.5而不是2?

  • factor=2更激进,适合下游完全宕机的场景
  • factor=1.5温和,适合rate limit类故障
  • 业界常用值:1.5-2.0

Jitter(抖动)不能少: 如果所有客户端都等相同时间,它们会在同一刻再次集体涌入下游(Thundering Herd)。解决方案:加随机jitter。

backoff := baseDelay * factor^attempt
backoff = backoff * (0.5 + rand.Float64()*0.5) // 加50%-100%随机

🤔 问题3:为什么要检查ctx.Done()?

场景: 用户请求超时3秒。Agent第1次调用OpenAI失败,等1s后重试,又失败,等1.5s再重试...

如果不看ctx,你会在用户已经放弃的情况下继续重试,浪费资源,还可能污染后续的状态(创建了工单但用户看不到)。

正确做法:

select {
case <-time.After(backoff):
    // 继续重试
case <-ctx.Done():
    return ctx.Err() // 立即退出
}

🤔 问题4:什么是幂等性?为什么Agent尤其需要?

场景: Agent调用create_ticket(title="login issue"),请求发出但响应因网络抖动丢失。重试后,ticket系统收到2个相同请求,创建了2个工单

幂等性:同一操作执行N次,效果与执行1次相同。

天然幂等 vs 非幂等:

  • GET /user/123 → 幂等
  • DELETE /ticket/456 → 幂等(第2次返回404也算幂等)
  • POST /tickets {...} → 不幂等!每次都创建新工单

解决方案:Idempotency Key

客户端为每个操作生成唯一ID,服务端用该ID去重:

Request 1: POST /tickets Idempotency-Key: abc-123 Server: 创建ticket_id=789, 记录 abc-123 → 789 Request 2 (重试): POST /tickets Idempotency-Key: abc-123 Server: 查到abc-123已处理,返回上次的结果 ticket_id=789

关键: Idempotency Key由客户端生成(基于业务语义),不是服务端。


第二部分:动手实现

✅ 版本1:错误分类

// internal/retry/errors.go
package retry

import (
	"context"
	"errors"
	"net"
	"net/http"
	"strings"
)

// IsRetryable 判断错误是否可重试
func IsRetryable(err error) bool {
	if err == nil {
		return false
	}

	// Context取消/超时:区分处理
	if errors.Is(err, context.Canceled) {
		return false // 用户主动取消
	}
	if errors.Is(err, context.DeadlineExceeded) {
		// 注意:这里判断的是"传入的ctx超时"还是"单次rpc超时"
		// 如果是外层ctx超时,不该重试;如果是内层单次超时,可重试
		// 实战中用专用error包装
		return false
	}

	// 网络错误
	var netErr net.Error
	if errors.As(err, &netErr) {
		if netErr.Timeout() {
			return true
		}
	}

	// OpenAI SDK的HTTPStatusCode
	var apiErr *HTTPError
	if errors.As(err, &apiErr) {
		return isRetryableStatus(apiErr.StatusCode)
	}

	// 字符串fallback(不推荐,但真实世界很常见)
	msg := err.Error()
	if strings.Contains(msg, "connection reset") ||
		strings.Contains(msg, "broken pipe") {
		return true
	}

	return false
}

type HTTPError struct {
	StatusCode int
	Body       string
}

func (e *HTTPError) Error() string {
	return e.Body
}

func isRetryableStatus(code int) bool {
	switch code {
	case http.StatusTooManyRequests: // 429
		return true
	case http.StatusInternalServerError, // 500
		http.StatusBadGateway,          // 502
		http.StatusServiceUnavailable,  // 503
		http.StatusGatewayTimeout:      // 504
		return true
	}
	return false
}

✅ 版本2:Exponential Backoff + Jitter

// internal/retry/backoff.go
package retry

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

type Config struct {
	MaxAttempts int           // 最大尝试次数(含首次)
	BaseDelay   time.Duration // 初始延迟
	MaxDelay    time.Duration // 单次最大延迟
	Factor      float64       // 递增因子(推荐1.5)
	Jitter      bool          // 是否加随机
}

func Default() Config {
	return Config{
		MaxAttempts: 3,
		BaseDelay:   500 * time.Millisecond,
		MaxDelay:    30 * time.Second,
		Factor:      1.5,
		Jitter:      true,
	}
}

// Do 执行op,按config重试
func Do(ctx context.Context, cfg Config, op func(ctx context.Context) error) error {
	var lastErr error
	delay := cfg.BaseDelay

	for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
		// 检查ctx是否已取消
		if err := ctx.Err(); err != nil {
			return err
		}

		err := op(ctx)
		if err == nil {
			return nil
		}
		lastErr = err

		// 不可重试,立即返回
		if !IsRetryable(err) {
			return fmt.Errorf("non-retryable: %w", err)
		}

		// 最后一次尝试失败,不再sleep
		if attempt == cfg.MaxAttempts-1 {
			break
		}

		// 计算等待时间
		sleep := delay
		if cfg.Jitter {
			// jitter: 0.5x - 1.5x of delay
			sleep = time.Duration(float64(delay) * (0.5 + rand.Float64()))
		}

		// 等待,同时监听ctx
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}

		// 下次延迟
		delay = time.Duration(float64(delay) * cfg.Factor)
		if delay > cfg.MaxDelay {
			delay = cfg.MaxDelay
		}
	}

	return fmt.Errorf("exhausted %d attempts, last error: %w", cfg.MaxAttempts, lastErr)
}

使用示例:

err := retry.Do(ctx, retry.Default(), func(ctx context.Context) error {
	return llmClient.Call(ctx, req)
})

反思:

  • 为什么ctx传给op而不是内部替换?→ 让op也能respect cancellation
  • 为什么最后一次不sleep?→ 没意义,立即返回错误
  • MaxDelay的作用?→ 防止指数爆炸(第10次就等分钟级了)

✅ 版本3:Idempotency Key - 内存版

// internal/idempotency/store.go
package idempotency

import (
	"context"
	"encoding/json"
	"errors"
	"sync"
	"time"
)

var ErrInProgress = errors.New("request in progress")

type Entry struct {
	Result    json.RawMessage
	Err       string // 空字符串表示成功
	CreatedAt time.Time
	Status    string // "pending" | "done"
}

type MemStore struct {
	mu   sync.Mutex
	data map[string]*Entry
	ttl  time.Duration
}

func NewMemStore(ttl time.Duration) *MemStore {
	s := &MemStore{
		data: make(map[string]*Entry),
		ttl:  ttl,
	}
	go s.gc()
	return s
}

// AcquireOrGet 占坑或返回已有结果
// 返回 (entry, isNew)。isNew=true表示本goroutine应该执行实际操作
func (s *MemStore) AcquireOrGet(key string) (*Entry, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if e, ok := s.data[key]; ok {
		return e, false
	}
	e := &Entry{Status: "pending", CreatedAt: time.Now()}
	s.data[key] = e
	return e, true
}

// Complete 写入最终结果
func (s *MemStore) Complete(key string, result json.RawMessage, opErr error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	e, ok := s.data[key]
	if !ok {
		return
	}
	e.Result = result
	e.Status = "done"
	if opErr != nil {
		e.Err = opErr.Error()
	}
}

func (s *MemStore) gc() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		s.mu.Lock()
		for k, e := range s.data {
			if time.Since(e.CreatedAt) > s.ttl {
				delete(s.data, k)
			}
		}
		s.mu.Unlock()
	}
}

使用:

func CreateTicket(ctx context.Context, store *idempotency.MemStore, key string, req TicketReq) (*TicketResp, error) {
	entry, isNew := store.AcquireOrGet(key)

	if !isNew {
		// 已有结果
		if entry.Status == "done" {
			if entry.Err != "" {
				return nil, errors.New(entry.Err)
			}
			var resp TicketResp
			json.Unmarshal(entry.Result, &resp)
			return &resp, nil
		}
		// pending,说明另一goroutine在处理——取决于策略:等、或者拒绝
		return nil, idempotency.ErrInProgress
	}

	// 新请求,实际创建
	resp, err := actuallyCreateTicket(ctx, req)
	raw, _ := json.Marshal(resp)
	store.Complete(key, raw, err)
	return resp, err
}

关键陷阱:

  • AcquireOrGet必须原子。锁不能漏。
  • Complete必须覆盖pending状态,否则其他请求永远卡在pending。
  • Panic安全:如果操作中途panic,status永远pending。用defer兜底。

✅ 版本4:Redis版Idempotency(生产推荐)

为什么要Redis?

  • 多实例部署时,内存Store在每个实例独立 → 去重失败
  • Redis有原子操作(SETNX)、过期、跨进程共享
// internal/idempotency/redis_store.go
package idempotency

import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
)

type RedisStore struct {
	rdb *redis.Client
	ttl time.Duration
}

func NewRedisStore(rdb *redis.Client, ttl time.Duration) *RedisStore {
	return &RedisStore{rdb: rdb, ttl: ttl}
}

// TryAcquire 原子占坑;返回true表示获得锁
func (s *RedisStore) TryAcquire(ctx context.Context, key string) (bool, error) {
	// SETNX: set if not exists
	ok, err := s.rdb.SetNX(ctx, "idem:"+key, "pending", s.ttl).Result()
	return ok, err
}

// Get 查询已存结果
func (s *RedisStore) Get(ctx context.Context, key string) (*Entry, error) {
	val, err := s.rdb.Get(ctx, "idem:"+key).Bytes()
	if err == redis.Nil {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	if string(val) == "pending" {
		return &Entry{Status: "pending"}, nil
	}
	var e Entry
	if err := json.Unmarshal(val, &e); err != nil {
		return nil, err
	}
	return &e, nil
}

// Complete 覆盖为最终结果
func (s *RedisStore) Complete(ctx context.Context, key string, result json.RawMessage, opErr error) error {
	e := Entry{Result: result, Status: "done", CreatedAt: time.Now()}
	if opErr != nil {
		e.Err = opErr.Error()
	}
	raw, _ := json.Marshal(e)
	return s.rdb.Set(ctx, "idem:"+key, raw, s.ttl).Err()
}

完整调用链:

func CreateTicketIdempotent(ctx context.Context, s *idempotency.RedisStore, key string, req TicketReq) (*TicketResp, error) {
	// 1. 查是否已有结果
	if e, err := s.Get(ctx, key); err == nil && e != nil && e.Status == "done" {
		if e.Err != "" {
			return nil, errors.New(e.Err)
		}
		var resp TicketResp
		json.Unmarshal(e.Result, &resp)
		return &resp, nil
	}

	// 2. 原子占坑
	acquired, err := s.TryAcquire(ctx, key)
	if err != nil {
		return nil, err
	}
	if !acquired {
		// 别的goroutine在处理或刚完成;轮询等待
		return waitForResult(ctx, s, key, 10*time.Second)
	}

	// 3. 执行并写回
	resp, opErr := actuallyCreateTicket(ctx, req)
	raw, _ := json.Marshal(resp)
	s.Complete(ctx, key, raw, opErr)

	return resp, opErr
}

✅ 版本5:Idempotency Key怎么生成?

不推荐: uuid.New() - 每次都不一样,达不到去重效果

推荐: 基于业务语义的hash

// internal/idempotency/key.go
package idempotency

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// KeyFor 为 create_ticket 生成key
// 业务语义:同一user的同一request_id在一段窗口内视为同一操作
func KeyFor(userID, requestID, operation string) string {
	raw := fmt.Sprintf("%s|%s|%s", userID, requestID, operation)
	h := sha256.Sum256([]byte(raw))
	return hex.EncodeToString(h[:])
}

关键决策: 哪些字段进key?

  • ✅ user_id + request_id + operation
  • ❌ timestamp(每次都不同)
  • ❌ 整个request body(某个无关字段变了就漏了)

✅ 版本6:组合使用——Retry + Idempotency

func (a *Agent) CreateTicketSafe(ctx context.Context, userID, reqID string, req TicketReq) (*TicketResp, error) {
	key := idempotency.KeyFor(userID, reqID, "create_ticket")

	var resp *TicketResp
	err := retry.Do(ctx, retry.Default(), func(ctx context.Context) error {
		r, err := CreateTicketIdempotent(ctx, a.store, key, req)
		if err != nil {
			return err
		}
		resp = r
		return nil
	})
	return resp, err
}

为什么这样组合安全?

  • Retry可能导致N次实际请求到下游
  • Idempotency保证下游只真正执行1次
  • 第2-N次重试会看到"done"状态,返回相同结果

第三部分:关键概念

1. At-least-once vs Exactly-once

语义 含义 实现代价
At-most-once 最多1次,可能丢 简单,不可靠
At-least-once 至少1次,可能重复 Retry天然提供
Exactly-once 恰好1次 不存在于分布式,只能在应用层伪装(idempotency)

Agent系统的正确模型:at-least-once delivery + idempotent handlers = "effective exactly-once"


2. Retry Storm与Circuit Breaker

Retry Storm:下游挂了,上游所有客户端都在重试 → 下游永远起不来。

防御:

  1. 限制总延迟:所有重试加起来不超过上游SLA的50%
  2. Circuit Breaker:连续N次失败后,直接拒绝请求一段时间(让下游喘气)
  3. Load Shedding:overload时主动放弃低优先级请求

(Day 17会专门讲Circuit Breaker)


3. Context的三层含义

ctx, cancel := context.WithTimeout(parentCtx, 3*time.Second)
defer cancel()
  • ctx.Done() chan:关闭即表示取消
  • ctx.Err():取消原因(Canceled or DeadlineExceeded
  • ctx.Value(key):传递request-scoped数据(如request_id)

规则:

  • 永远把ctx作为函数第一个参数
  • 永远defer cancel()
  • 不要在struct里存ctx(除非你知道自己在做什么)

4. 时钟偏移与TTL

Redis的TTL依赖时间;如果多个服务器时钟偏差大,会出现奇怪行为。生产环境确保NTP同步,TTL不要设置过短(<1分钟时要小心)。


第四部分:自测清单

  • 我能分类5种常见错误的retryable性
  • 写出exponential backoff的公式,解释为什么要jitter
  • 解释context.Canceledcontext.DeadlineExceeded的区别
  • 描述idempotency key在Redis里的生命周期
  • 如果Complete之前进程崩溃,会发生什么?怎么恢复?
  • 两个goroutine同时TryAcquire同一key,谁赢?靠什么保证?

第五部分:作业

任务1:实现retry package

  • 完成internal/retry/errors.gobackoff.go
  • Table-driven test覆盖:成功、立即失败、N次失败后成功、ctx超时退出

任务2:实现idempotency(内存+Redis二选一或都做)

  • 原子占坑
  • Status转换:pending → done
  • TTL过期
  • 并发测试:100 goroutines 同key → 只有1个实际执行

任务3:集成到create_ticket tool

  • Agent调用create_ticket时自动走idempotency
  • 手动测试:同一query重复发送2次 → 只创建1个ticket

任务4:思考题

  • 如果下游create_ticket超时了但实际已创建成功(常见!),idempotency key此时记录的是什么?下次重试会怎样?
  • 如果Redis挂了,你的服务应该怎么降级?

第六部分:常见问题解答

Q1: IsRetryable里为什么DeadlineExceeded不重试?

A: 这里有微妙区分:

  • 外层ctx(用户总超时) DeadlineExceeded → 绝不重试,用户已经等不及
  • 内层单次rpc超时 → 可重试

实战中用包装error区分:

var ErrRPCTimeout = errors.New("single rpc timeout")
if errors.Is(err, ErrRPCTimeout) { return true }

或者在Do循环中首先checkctx.Err(),只要外层ctx挂了就立刻退出。本文版本2的Do已经这样做。


Q2: Idempotency key的TTL设多长?

A: 权衡:

  • 太短(<5分钟):重试窗口可能错过已存记录 → 去重失效
  • 太长(>24小时):Redis空间浪费,且业务语义可能已变

推荐:24小时,对齐客户端重试窗口上限。


Q3: Pending状态的请求,其他请求应该等还是拒绝?

A: 两种策略:

  • Wait:poll直到done或超时。用户体验好,但可能超时
  • Reject with 409 Conflict:立刻返回,客户端自己决定是否重试

生产推荐:短poll(最多等5秒),超时后返回ErrInProgress,客户端用相同key重试。


Q4: 如果处理过程中进程crash了,pending永远不会变成done,后续请求永远卡住?

A: 这就是为什么要有TTL。即使进程崩溃,key会在TTL后自动过期,下次请求重新占坑。代价:这段时间内可能被重复执行1次。

更严格的方案:用Redlock或leased lock,handler定期续期,崩溃后快速释放。但90%场景TTL足够。


Q5: 为什么不用数据库的UNIQUE约束做去重?

A: 可以!如果你已经把idempotency_key写到DB的unique column上:

CREATE UNIQUE INDEX idx_idem ON tickets(idempotency_key);

第二次INSERT会失败 → 捕获unique error → 读取已有记录返回。这是更强的保证(和业务数据ACID),但仅限业务数据落DB的场景。Redis方案更通用。


配套算法题

今天的主题:拓扑排序 / 多源BFS / 反向思维。对应重试依赖调度的思想。

题1:Course Schedule (LeetCode 207) - Medium

// 判断课程依赖图是否有环
func canFinish(numCourses int, prerequisites [][]int) bool {
	graph := make([][]int, numCourses)
	indegree := make([]int, numCourses)
	for _, p := range prerequisites {
		graph[p[1]] = append(graph[p[1]], p[0])
		indegree[p[0]]++
	}

	queue := []int{}
	for i := 0; i < numCourses; i++ {
		if indegree[i] == 0 {
			queue = append(queue, i)
		}
	}

	visited := 0
	for len(queue) > 0 {
		c := queue[0]
		queue = queue[1:]
		visited++
		for _, next := range graph[c] {
			indegree[next]--
			if indegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	return visited == numCourses
}

讲解: Kahn拓扑排序。核心思想:不断移除入度为0的节点。如果所有节点都能移除 → 无环。

联系Agent: Tool调用依赖图(tool A的输出是tool B的输入)判断能否完成执行。


题2:Rotting Oranges (LeetCode 994) - Medium

// 多源BFS:每分钟腐烂的橘子扩散到相邻
func orangesRotting(grid [][]int) int {
	m, n := len(grid), len(grid[0])
	queue := [][2]int{}
	fresh := 0

	for i := 0; i < m; i++ {
		for j := 0; j < n; j++ {
			if grid[i][j] == 2 {
				queue = append(queue, [2]int{i, j})
			} else if grid[i][j] == 1 {
				fresh++
			}
		}
	}

	dirs := [][2]int{{1, 0}, {-1, 0}, {0, 1}, {0, -1}}
	minutes := 0
	for len(queue) > 0 && fresh > 0 {
		size := len(queue)
		for i := 0; i < size; i++ {
			c := queue[0]
			queue = queue[1:]
			for _, d := range dirs {
				ni, nj := c[0]+d[0], c[1]+d[1]
				if ni < 0 || ni >= m || nj < 0 || nj >= n || grid[ni][nj] != 1 {
					continue
				}
				grid[ni][nj] = 2
				fresh--
				queue = append(queue, [2]int{ni, nj})
			}
		}
		minutes++
	}

	if fresh > 0 {
		return -1
	}
	return minutes
}

讲解: 多源BFS的经典模板。关键:所有初始腐烂橘子同时作为起点入队,然后按"分钟"一层层扩散。

联系Agent: 多个重试任务同时在backoff,每轮调度对应BFS的"一层"。


题3:Pacific Atlantic Water Flow (LeetCode 417) - Medium

// 找出所有既能流向太平洋也能流向大西洋的点
func pacificAtlantic(heights [][]int) [][]int {
	m, n := len(heights), len(heights[0])
	pacific := make([][]bool, m)
	atlantic := make([][]bool, m)
	for i := range pacific {
		pacific[i] = make([]bool, n)
		atlantic[i] = make([]bool, n)
	}

	var dfs func(i, j int, visited [][]bool)
	dfs = func(i, j int, visited [][]bool) {
		visited[i][j] = true
		dirs := [][2]int{{1, 0}, {-1, 0}, {0, 1}, {0, -1}}
		for _, d := range dirs {
			ni, nj := i+d[0], j+d[1]
			if ni < 0 || ni >= m || nj < 0 || nj >= n || visited[ni][nj] {
				continue
			}
			if heights[ni][nj] >= heights[i][j] { // 反向:高度非递减
				dfs(ni, nj, visited)
			}
		}
	}

	for i := 0; i < m; i++ {
		dfs(i, 0, pacific)
		dfs(i, n-1, atlantic)
	}
	for j := 0; j < n; j++ {
		dfs(0, j, pacific)
		dfs(m-1, j, atlantic)
	}

	var res [][]int
	for i := 0; i < m; i++ {
		for j := 0; j < n; j++ {
			if pacific[i][j] && atlantic[i][j] {
				res = append(res, []int{i, j})
			}
		}
	}
	return res
}

讲解: 反向思维!不从每个内部点搜索能不能到达海洋(O(m²n²)),而是从海洋反向DFS到内陆(O(mn))。两个海洋各一次,取交集。

联系Agent: 错误恢复时,不从每个失败请求正向猜原因,而是从"已知可重试错误"反向匹配——更高效。


题4(选做):Reconstruct Itinerary (LeetCode 332) - Hard

欧拉路径 + DFS。机票列表拼成行程。用priority queue保证字典序。


下一步:Day 7 预告

明天是第1周Mock,你将:

  1. 验收v0.1交付物清单
  2. 自查代码:race condition / nil panic / goroutine泄漏 / deadlock
  3. 完整回答"设计一个客服Agent系统"
  4. 练习30秒和2分钟的pitch
  5. Week 1知识自测 + Week 2预告

准备:

  • 把Day 1-6的代码跑起来,go test ./...全绿
  • 想想:如果面试官问你"v0.1解决了什么问题?",你30秒能讲完吗?