Week 1 Day 6:Retry / Backoff / Idempotency - 苏格拉底教学
💡 一句话核心:分布式系统没有"成功",只有"你以为的成功"。Retry是韧性,Idempotency是安全带——少了任何一个,Agent都会在生产环境制造灾难。
学习目标
- 分辨哪些错误该重试、哪些必须立刻放弃
- 实现Exponential backoff并正确处理
ctx.Done()
- 理解"至少一次"语义,用idempotency key解决重复副作用
- 用Sync.Map / Redis实现去重存储
- 掌握分布式重试的常见陷阱(retry storm、雪崩、时钟偏移)
第一部分:问题驱动
🤔 问题1:什么错误该重试?
场景: 你的Agent调用OpenAI,返回了以下错误,哪些该重试?
| # |
错误 |
HTTP状态 |
该重试吗? |
| 1 |
context deadline exceeded |
- |
✅ 可能是网络抖动 |
| 2 |
rate_limit_exceeded (429) |
429 |
✅ 但要backoff |
| 3 |
invalid_api_key (401) |
401 |
❌ 重试100次也是401 |
| 4 |
invalid_request_error: max_tokens exceeded (400) |
400 |
❌ 参数错了 |
| 5 |
server_error (500) |
500 |
✅ 服务端临时故障 |
| 6 |
bad_gateway (502/503/504) |
5xx |
✅ 通常是upstream短暂不可用 |
| 7 |
permission_denied (403) |
403 |
❌ 权限问题重试无用 |
| 8 |
TCP connection refused |
- |
✅ 目标还没起来 |
分类原则:
- Transient(瞬时错误)→ 可重试:timeout、rate_limit、5xx、网络抖动
- Non-retryable(永久错误)→ 立刻返回:4xx(除429)、invalid input、permission denied、not found
为什么不能全部重试?
- 浪费成本:每次LLM调用都是真金白银
- 延迟放大:3次重试每次3秒 = 用户等9秒
- 故障放大:下游挂了,你重试 = retry storm雪崩
🤔 问题2:为什么要Exponential Backoff?
反问: 如果每次失败立即重试,会发生什么?
假设下游因为过载返回了429,1000个客户端同时失败,如果都立即重试→下游立刻收到1000个重复请求→更崩。
Exponential Backoff = 指数延迟:
- 第1次重试:等1s
- 第2次重试:等1.5s(1 × 1.5)
- 第3次重试:等2.25s(1.5 × 1.5)
- ...
为什么是1.5而不是2?
- factor=2更激进,适合下游完全宕机的场景
- factor=1.5温和,适合rate limit类故障
- 业界常用值:1.5-2.0
Jitter(抖动)不能少:
如果所有客户端都等相同时间,它们会在同一刻再次集体涌入下游(Thundering Herd)。解决方案:加随机jitter。
backoff := baseDelay * factor^attempt
backoff = backoff * (0.5 + rand.Float64()*0.5) // 加50%-100%随机
🤔 问题3:为什么要检查ctx.Done()?
场景: 用户请求超时3秒。Agent第1次调用OpenAI失败,等1s后重试,又失败,等1.5s再重试...
如果不看ctx,你会在用户已经放弃的情况下继续重试,浪费资源,还可能污染后续的状态(创建了工单但用户看不到)。
正确做法:
select {
case <-time.After(backoff):
// 继续重试
case <-ctx.Done():
return ctx.Err() // 立即退出
}
🤔 问题4:什么是幂等性?为什么Agent尤其需要?
场景: Agent调用create_ticket(title="login issue"),请求发出但响应因网络抖动丢失。重试后,ticket系统收到2个相同请求,创建了2个工单。
幂等性:同一操作执行N次,效果与执行1次相同。
天然幂等 vs 非幂等:
- GET /user/123 → 幂等
- DELETE /ticket/456 → 幂等(第2次返回404也算幂等)
- POST /tickets {...} → 不幂等!每次都创建新工单
解决方案:Idempotency Key
客户端为每个操作生成唯一ID,服务端用该ID去重:
Request 1: POST /tickets Idempotency-Key: abc-123
Server: 创建ticket_id=789, 记录 abc-123 → 789
Request 2 (重试): POST /tickets Idempotency-Key: abc-123
Server: 查到abc-123已处理,返回上次的结果 ticket_id=789
关键: Idempotency Key由客户端生成(基于业务语义),不是服务端。
第二部分:动手实现
✅ 版本1:错误分类
// internal/retry/errors.go
package retry
import (
"context"
"errors"
"net"
"net/http"
"strings"
)
// IsRetryable 判断错误是否可重试
func IsRetryable(err error) bool {
if err == nil {
return false
}
// Context取消/超时:区分处理
if errors.Is(err, context.Canceled) {
return false // 用户主动取消
}
if errors.Is(err, context.DeadlineExceeded) {
// 注意:这里判断的是"传入的ctx超时"还是"单次rpc超时"
// 如果是外层ctx超时,不该重试;如果是内层单次超时,可重试
// 实战中用专用error包装
return false
}
// 网络错误
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
return true
}
}
// OpenAI SDK的HTTPStatusCode
var apiErr *HTTPError
if errors.As(err, &apiErr) {
return isRetryableStatus(apiErr.StatusCode)
}
// 字符串fallback(不推荐,但真实世界很常见)
msg := err.Error()
if strings.Contains(msg, "connection reset") ||
strings.Contains(msg, "broken pipe") {
return true
}
return false
}
type HTTPError struct {
StatusCode int
Body string
}
func (e *HTTPError) Error() string {
return e.Body
}
func isRetryableStatus(code int) bool {
switch code {
case http.StatusTooManyRequests: // 429
return true
case http.StatusInternalServerError, // 500
http.StatusBadGateway, // 502
http.StatusServiceUnavailable, // 503
http.StatusGatewayTimeout: // 504
return true
}
return false
}
✅ 版本2:Exponential Backoff + Jitter
// internal/retry/backoff.go
package retry
import (
"context"
"fmt"
"math/rand"
"time"
)
type Config struct {
MaxAttempts int // 最大尝试次数(含首次)
BaseDelay time.Duration // 初始延迟
MaxDelay time.Duration // 单次最大延迟
Factor float64 // 递增因子(推荐1.5)
Jitter bool // 是否加随机
}
func Default() Config {
return Config{
MaxAttempts: 3,
BaseDelay: 500 * time.Millisecond,
MaxDelay: 30 * time.Second,
Factor: 1.5,
Jitter: true,
}
}
// Do 执行op,按config重试
func Do(ctx context.Context, cfg Config, op func(ctx context.Context) error) error {
var lastErr error
delay := cfg.BaseDelay
for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
// 检查ctx是否已取消
if err := ctx.Err(); err != nil {
return err
}
err := op(ctx)
if err == nil {
return nil
}
lastErr = err
// 不可重试,立即返回
if !IsRetryable(err) {
return fmt.Errorf("non-retryable: %w", err)
}
// 最后一次尝试失败,不再sleep
if attempt == cfg.MaxAttempts-1 {
break
}
// 计算等待时间
sleep := delay
if cfg.Jitter {
// jitter: 0.5x - 1.5x of delay
sleep = time.Duration(float64(delay) * (0.5 + rand.Float64()))
}
// 等待,同时监听ctx
select {
case <-time.After(sleep):
case <-ctx.Done():
return ctx.Err()
}
// 下次延迟
delay = time.Duration(float64(delay) * cfg.Factor)
if delay > cfg.MaxDelay {
delay = cfg.MaxDelay
}
}
return fmt.Errorf("exhausted %d attempts, last error: %w", cfg.MaxAttempts, lastErr)
}
使用示例:
err := retry.Do(ctx, retry.Default(), func(ctx context.Context) error {
return llmClient.Call(ctx, req)
})
反思:
- 为什么
ctx传给op而不是内部替换?→ 让op也能respect cancellation
- 为什么最后一次不sleep?→ 没意义,立即返回错误
- MaxDelay的作用?→ 防止指数爆炸(第10次就等分钟级了)
✅ 版本3:Idempotency Key - 内存版
// internal/idempotency/store.go
package idempotency
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
)
var ErrInProgress = errors.New("request in progress")
type Entry struct {
Result json.RawMessage
Err string // 空字符串表示成功
CreatedAt time.Time
Status string // "pending" | "done"
}
type MemStore struct {
mu sync.Mutex
data map[string]*Entry
ttl time.Duration
}
func NewMemStore(ttl time.Duration) *MemStore {
s := &MemStore{
data: make(map[string]*Entry),
ttl: ttl,
}
go s.gc()
return s
}
// AcquireOrGet 占坑或返回已有结果
// 返回 (entry, isNew)。isNew=true表示本goroutine应该执行实际操作
func (s *MemStore) AcquireOrGet(key string) (*Entry, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if e, ok := s.data[key]; ok {
return e, false
}
e := &Entry{Status: "pending", CreatedAt: time.Now()}
s.data[key] = e
return e, true
}
// Complete 写入最终结果
func (s *MemStore) Complete(key string, result json.RawMessage, opErr error) {
s.mu.Lock()
defer s.mu.Unlock()
e, ok := s.data[key]
if !ok {
return
}
e.Result = result
e.Status = "done"
if opErr != nil {
e.Err = opErr.Error()
}
}
func (s *MemStore) gc() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
s.mu.Lock()
for k, e := range s.data {
if time.Since(e.CreatedAt) > s.ttl {
delete(s.data, k)
}
}
s.mu.Unlock()
}
}
使用:
func CreateTicket(ctx context.Context, store *idempotency.MemStore, key string, req TicketReq) (*TicketResp, error) {
entry, isNew := store.AcquireOrGet(key)
if !isNew {
// 已有结果
if entry.Status == "done" {
if entry.Err != "" {
return nil, errors.New(entry.Err)
}
var resp TicketResp
json.Unmarshal(entry.Result, &resp)
return &resp, nil
}
// pending,说明另一goroutine在处理——取决于策略:等、或者拒绝
return nil, idempotency.ErrInProgress
}
// 新请求,实际创建
resp, err := actuallyCreateTicket(ctx, req)
raw, _ := json.Marshal(resp)
store.Complete(key, raw, err)
return resp, err
}
关键陷阱:
AcquireOrGet必须原子。锁不能漏。
Complete必须覆盖pending状态,否则其他请求永远卡在pending。
- Panic安全:如果操作中途panic,status永远pending。用
defer兜底。
✅ 版本4:Redis版Idempotency(生产推荐)
为什么要Redis?
- 多实例部署时,内存Store在每个实例独立 → 去重失败
- Redis有原子操作(SETNX)、过期、跨进程共享
// internal/idempotency/redis_store.go
package idempotency
import (
"context"
"encoding/json"
"time"
"github.com/redis/go-redis/v9"
)
type RedisStore struct {
rdb *redis.Client
ttl time.Duration
}
func NewRedisStore(rdb *redis.Client, ttl time.Duration) *RedisStore {
return &RedisStore{rdb: rdb, ttl: ttl}
}
// TryAcquire 原子占坑;返回true表示获得锁
func (s *RedisStore) TryAcquire(ctx context.Context, key string) (bool, error) {
// SETNX: set if not exists
ok, err := s.rdb.SetNX(ctx, "idem:"+key, "pending", s.ttl).Result()
return ok, err
}
// Get 查询已存结果
func (s *RedisStore) Get(ctx context.Context, key string) (*Entry, error) {
val, err := s.rdb.Get(ctx, "idem:"+key).Bytes()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}
if string(val) == "pending" {
return &Entry{Status: "pending"}, nil
}
var e Entry
if err := json.Unmarshal(val, &e); err != nil {
return nil, err
}
return &e, nil
}
// Complete 覆盖为最终结果
func (s *RedisStore) Complete(ctx context.Context, key string, result json.RawMessage, opErr error) error {
e := Entry{Result: result, Status: "done", CreatedAt: time.Now()}
if opErr != nil {
e.Err = opErr.Error()
}
raw, _ := json.Marshal(e)
return s.rdb.Set(ctx, "idem:"+key, raw, s.ttl).Err()
}
完整调用链:
func CreateTicketIdempotent(ctx context.Context, s *idempotency.RedisStore, key string, req TicketReq) (*TicketResp, error) {
// 1. 查是否已有结果
if e, err := s.Get(ctx, key); err == nil && e != nil && e.Status == "done" {
if e.Err != "" {
return nil, errors.New(e.Err)
}
var resp TicketResp
json.Unmarshal(e.Result, &resp)
return &resp, nil
}
// 2. 原子占坑
acquired, err := s.TryAcquire(ctx, key)
if err != nil {
return nil, err
}
if !acquired {
// 别的goroutine在处理或刚完成;轮询等待
return waitForResult(ctx, s, key, 10*time.Second)
}
// 3. 执行并写回
resp, opErr := actuallyCreateTicket(ctx, req)
raw, _ := json.Marshal(resp)
s.Complete(ctx, key, raw, opErr)
return resp, opErr
}
✅ 版本5:Idempotency Key怎么生成?
不推荐: uuid.New() - 每次都不一样,达不到去重效果
推荐: 基于业务语义的hash
// internal/idempotency/key.go
package idempotency
import (
"crypto/sha256"
"encoding/hex"
"fmt"
)
// KeyFor 为 create_ticket 生成key
// 业务语义:同一user的同一request_id在一段窗口内视为同一操作
func KeyFor(userID, requestID, operation string) string {
raw := fmt.Sprintf("%s|%s|%s", userID, requestID, operation)
h := sha256.Sum256([]byte(raw))
return hex.EncodeToString(h[:])
}
关键决策: 哪些字段进key?
- ✅ user_id + request_id + operation
- ❌ timestamp(每次都不同)
- ❌ 整个request body(某个无关字段变了就漏了)
✅ 版本6:组合使用——Retry + Idempotency
func (a *Agent) CreateTicketSafe(ctx context.Context, userID, reqID string, req TicketReq) (*TicketResp, error) {
key := idempotency.KeyFor(userID, reqID, "create_ticket")
var resp *TicketResp
err := retry.Do(ctx, retry.Default(), func(ctx context.Context) error {
r, err := CreateTicketIdempotent(ctx, a.store, key, req)
if err != nil {
return err
}
resp = r
return nil
})
return resp, err
}
为什么这样组合安全?
- Retry可能导致N次实际请求到下游
- Idempotency保证下游只真正执行1次
- 第2-N次重试会看到"done"状态,返回相同结果
第三部分:关键概念
1. At-least-once vs Exactly-once
| 语义 |
含义 |
实现代价 |
| At-most-once |
最多1次,可能丢 |
简单,不可靠 |
| At-least-once |
至少1次,可能重复 |
Retry天然提供 |
| Exactly-once |
恰好1次 |
不存在于分布式,只能在应用层伪装(idempotency) |
Agent系统的正确模型:at-least-once delivery + idempotent handlers = "effective exactly-once"
2. Retry Storm与Circuit Breaker
Retry Storm:下游挂了,上游所有客户端都在重试 → 下游永远起不来。
防御:
- 限制总延迟:所有重试加起来不超过上游SLA的50%
- Circuit Breaker:连续N次失败后,直接拒绝请求一段时间(让下游喘气)
- Load Shedding:overload时主动放弃低优先级请求
(Day 17会专门讲Circuit Breaker)
3. Context的三层含义
ctx, cancel := context.WithTimeout(parentCtx, 3*time.Second)
defer cancel()
ctx.Done() chan:关闭即表示取消
ctx.Err():取消原因(Canceled or DeadlineExceeded)
ctx.Value(key):传递request-scoped数据(如request_id)
规则:
- 永远把ctx作为函数第一个参数
- 永远
defer cancel()
- 不要在struct里存ctx(除非你知道自己在做什么)
4. 时钟偏移与TTL
Redis的TTL依赖时间;如果多个服务器时钟偏差大,会出现奇怪行为。生产环境确保NTP同步,TTL不要设置过短(<1分钟时要小心)。
第四部分:自测清单
第五部分:作业
任务1:实现retry package
任务2:实现idempotency(内存+Redis二选一或都做)
任务3:集成到create_ticket tool
任务4:思考题
- 如果下游create_ticket超时了但实际已创建成功(常见!),idempotency key此时记录的是什么?下次重试会怎样?
- 如果Redis挂了,你的服务应该怎么降级?
第六部分:常见问题解答
Q1: IsRetryable里为什么DeadlineExceeded不重试?
A: 这里有微妙区分:
- 外层ctx(用户总超时) DeadlineExceeded → 绝不重试,用户已经等不及
- 内层单次rpc超时 → 可重试
实战中用包装error区分:
var ErrRPCTimeout = errors.New("single rpc timeout")
if errors.Is(err, ErrRPCTimeout) { return true }
或者在Do循环中首先checkctx.Err(),只要外层ctx挂了就立刻退出。本文版本2的Do已经这样做。
Q2: Idempotency key的TTL设多长?
A: 权衡:
- 太短(<5分钟):重试窗口可能错过已存记录 → 去重失效
- 太长(>24小时):Redis空间浪费,且业务语义可能已变
推荐:24小时,对齐客户端重试窗口上限。
Q3: Pending状态的请求,其他请求应该等还是拒绝?
A: 两种策略:
- Wait:poll直到done或超时。用户体验好,但可能超时
- Reject with 409 Conflict:立刻返回,客户端自己决定是否重试
生产推荐:短poll(最多等5秒),超时后返回ErrInProgress,客户端用相同key重试。
Q4: 如果处理过程中进程crash了,pending永远不会变成done,后续请求永远卡住?
A: 这就是为什么要有TTL。即使进程崩溃,key会在TTL后自动过期,下次请求重新占坑。代价:这段时间内可能被重复执行1次。
更严格的方案:用Redlock或leased lock,handler定期续期,崩溃后快速释放。但90%场景TTL足够。
Q5: 为什么不用数据库的UNIQUE约束做去重?
A: 可以!如果你已经把idempotency_key写到DB的unique column上:
CREATE UNIQUE INDEX idx_idem ON tickets(idempotency_key);
第二次INSERT会失败 → 捕获unique error → 读取已有记录返回。这是更强的保证(和业务数据ACID),但仅限业务数据落DB的场景。Redis方案更通用。
配套算法题
今天的主题:拓扑排序 / 多源BFS / 反向思维。对应重试依赖调度的思想。
题1:Course Schedule (LeetCode 207) - Medium
// 判断课程依赖图是否有环
func canFinish(numCourses int, prerequisites [][]int) bool {
graph := make([][]int, numCourses)
indegree := make([]int, numCourses)
for _, p := range prerequisites {
graph[p[1]] = append(graph[p[1]], p[0])
indegree[p[0]]++
}
queue := []int{}
for i := 0; i < numCourses; i++ {
if indegree[i] == 0 {
queue = append(queue, i)
}
}
visited := 0
for len(queue) > 0 {
c := queue[0]
queue = queue[1:]
visited++
for _, next := range graph[c] {
indegree[next]--
if indegree[next] == 0 {
queue = append(queue, next)
}
}
}
return visited == numCourses
}
讲解: Kahn拓扑排序。核心思想:不断移除入度为0的节点。如果所有节点都能移除 → 无环。
联系Agent: Tool调用依赖图(tool A的输出是tool B的输入)判断能否完成执行。
题2:Rotting Oranges (LeetCode 994) - Medium
// 多源BFS:每分钟腐烂的橘子扩散到相邻
func orangesRotting(grid [][]int) int {
m, n := len(grid), len(grid[0])
queue := [][2]int{}
fresh := 0
for i := 0; i < m; i++ {
for j := 0; j < n; j++ {
if grid[i][j] == 2 {
queue = append(queue, [2]int{i, j})
} else if grid[i][j] == 1 {
fresh++
}
}
}
dirs := [][2]int{{1, 0}, {-1, 0}, {0, 1}, {0, -1}}
minutes := 0
for len(queue) > 0 && fresh > 0 {
size := len(queue)
for i := 0; i < size; i++ {
c := queue[0]
queue = queue[1:]
for _, d := range dirs {
ni, nj := c[0]+d[0], c[1]+d[1]
if ni < 0 || ni >= m || nj < 0 || nj >= n || grid[ni][nj] != 1 {
continue
}
grid[ni][nj] = 2
fresh--
queue = append(queue, [2]int{ni, nj})
}
}
minutes++
}
if fresh > 0 {
return -1
}
return minutes
}
讲解: 多源BFS的经典模板。关键:所有初始腐烂橘子同时作为起点入队,然后按"分钟"一层层扩散。
联系Agent: 多个重试任务同时在backoff,每轮调度对应BFS的"一层"。
题3:Pacific Atlantic Water Flow (LeetCode 417) - Medium
// 找出所有既能流向太平洋也能流向大西洋的点
func pacificAtlantic(heights [][]int) [][]int {
m, n := len(heights), len(heights[0])
pacific := make([][]bool, m)
atlantic := make([][]bool, m)
for i := range pacific {
pacific[i] = make([]bool, n)
atlantic[i] = make([]bool, n)
}
var dfs func(i, j int, visited [][]bool)
dfs = func(i, j int, visited [][]bool) {
visited[i][j] = true
dirs := [][2]int{{1, 0}, {-1, 0}, {0, 1}, {0, -1}}
for _, d := range dirs {
ni, nj := i+d[0], j+d[1]
if ni < 0 || ni >= m || nj < 0 || nj >= n || visited[ni][nj] {
continue
}
if heights[ni][nj] >= heights[i][j] { // 反向:高度非递减
dfs(ni, nj, visited)
}
}
}
for i := 0; i < m; i++ {
dfs(i, 0, pacific)
dfs(i, n-1, atlantic)
}
for j := 0; j < n; j++ {
dfs(0, j, pacific)
dfs(m-1, j, atlantic)
}
var res [][]int
for i := 0; i < m; i++ {
for j := 0; j < n; j++ {
if pacific[i][j] && atlantic[i][j] {
res = append(res, []int{i, j})
}
}
}
return res
}
讲解: 反向思维!不从每个内部点搜索能不能到达海洋(O(m²n²)),而是从海洋反向DFS到内陆(O(mn))。两个海洋各一次,取交集。
联系Agent: 错误恢复时,不从每个失败请求正向猜原因,而是从"已知可重试错误"反向匹配——更高效。
题4(选做):Reconstruct Itinerary (LeetCode 332) - Hard
欧拉路径 + DFS。机票列表拼成行程。用priority queue保证字典序。
下一步:Day 7 预告
明天是第1周Mock,你将:
- 验收v0.1交付物清单
- 自查代码:race condition / nil panic / goroutine泄漏 / deadlock
- 完整回答"设计一个客服Agent系统"
- 练习30秒和2分钟的pitch
- Week 1知识自测 + Week 2预告
准备:
- 把Day 1-6的代码跑起来,
go test ./...全绿
- 想想:如果面试官问你"v0.1解决了什么问题?",你30秒能讲完吗?