Week 4 Day 25:成本与延迟优化 - 苏格拉底教学

从这一天开始,你的系统已经能跑通核心链路。但跑通和跑好是两回事。一个被用户嫌慢、被老板嫌贵的 Agent,离被关掉不远了。今天你要学会让它又快又省。


第一部分:问题驱动

🤔 问题1:你的 Agent 一个月花多少钱?

引导问题:

  1. 你调用了哪些模型?每次调用的 input/output tokens 分别多少?
  2. 如果一天有 1000 个用户,每人平均发 5 条消息,会产生多少次 LLM 调用?
  3. 成本的大头在哪里——input tokens 还是 output tokens?

答案揭示:

以 GPT-4o 为例(截至 2025 年的参考价格):

  • Input: $2.50 / 1M tokens
  • Output: $10.00 / 1M tokens
  • Embedding (text-embedding-3-small): $0.02 / 1M tokens

估算一次 Customer Support 对话:

System prompt: ~500 tokens RAG retrieved docs: ~1500 tokens User message: ~100 tokens LLM response: ~300 tokens Input tokens: 2100 → $0.00525 Output tokens: 300 → $0.003 单次对话成本: ~$0.008

月成本推算:

1000 用户/天 × 5 条/用户 × 30 天 = 150,000 次对话 150,000 × $0.008 = $1,200/月 加上 embedding (每文档 ~500 tokens,10K 文档): 10,000 × 500 / 1,000,000 × $0.02 = $0.10(一次性)

反思: 如果把 system prompt 从 500 tokens 压缩到 200 tokens,一个月省多少?


🤔 问题2:用户感受到的延迟从哪里来?

让用户感到慢的三个地方:

  1. 第一个 token 出现之前(LLM 在"思考")
  2. 整个响应流传输的时间
  3. Tool 调用阻塞(搜索、查数据库)

引导问题:

  • 如果有 3 个 tool 调用,分别耗时 200ms / 300ms / 150ms,串行 vs 并行差多少?
  • 用户是更在乎"开始有字了"还是"全部输出完"?

第二部分:动手实现

✅ 实现1:成本追踪中间件

// internal/metrics/cost_tracker.go
package metrics

import (
	"log/slog"
	"sync/atomic"
)

// 以 GPT-4o 为基准(单位:微美元,避免浮点累计误差)
const (
	GPT4oInputCostPerToken  = 2500  // $2.50 / 1M tokens → 2500 micro-USD / 1M → 2.5 micro-USD / 1K
	GPT4oOutputCostPerToken = 10000 // $10.00 / 1M tokens
	GPT4oMiniInputCost      = 150   // $0.15 / 1M tokens
	GPT4oMiniOutputCost     = 600   // $0.60 / 1M tokens
)

type CostTracker struct {
	totalInputTokens  atomic.Int64
	totalOutputTokens atomic.Int64
	totalCostMicroUSD atomic.Int64
	callCount         atomic.Int64
}

func NewCostTracker() *CostTracker {
	return &CostTracker{}
}

func (ct *CostTracker) RecordCall(model string, inputTokens, outputTokens int) {
	ct.totalInputTokens.Add(int64(inputTokens))
	ct.totalOutputTokens.Add(int64(outputTokens))
	ct.callCount.Add(1)

	var cost int64
	switch model {
	case "gpt-4o":
		cost = int64(inputTokens)*GPT4oInputCostPerToken/1_000_000 +
			int64(outputTokens)*GPT4oOutputCostPerToken/1_000_000
	case "gpt-4o-mini":
		cost = int64(inputTokens)*GPT4oMiniInputCost/1_000_000 +
			int64(outputTokens)*GPT4oMiniOutputCost/1_000_000
	}

	ct.totalCostMicroUSD.Add(cost)

	slog.Info("llm call recorded",
		"model", model,
		"input_tokens", inputTokens,
		"output_tokens", outputTokens,
		"cost_usd", float64(cost)/1_000_000,
	)
}

func (ct *CostTracker) Summary() map[string]any {
	calls := ct.callCount.Load()
	totalCost := ct.totalCostMicroUSD.Load()

	avgCost := float64(0)
	if calls > 0 {
		avgCost = float64(totalCost) / float64(calls) / 1_000_000
	}

	return map[string]any{
		"total_calls":         calls,
		"total_input_tokens":  ct.totalInputTokens.Load(),
		"total_output_tokens": ct.totalOutputTokens.Load(),
		"total_cost_usd":      float64(totalCost) / 1_000_000,
		"avg_cost_per_call":   avgCost,
		"estimated_monthly":   avgCost * float64(calls) * 30, // 简单外推
	}
}

问题: 为什么用 atomic.Int64 而不是 int64 + mutex


✅ 实现2:Prompt 缓存(相同 query 命中)

思路: 相同的用户 query,如果 LLM 已经回答过,直接返回缓存结果。

// internal/cache/prompt_cache.go
package cache

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

type CachedResponse struct {
	Answer     string            `json:"answer"`
	Sources    []string          `json:"sources"`
	Model      string            `json:"model"`
	CachedAt   time.Time         `json:"cached_at"`
	TokensUsed int               `json:"tokens_used"`
	Metadata   map[string]string `json:"metadata"`
}

type PromptCache struct {
	store TTLStore
	ttl   time.Duration
}

// TTLStore 抽象,可以是 Redis 或内存 map
type TTLStore interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
	Delete(ctx context.Context, key string) error
}

func NewPromptCache(store TTLStore, ttl time.Duration) *PromptCache {
	return &PromptCache{store: store, ttl: ttl}
}

// CacheKey 根据 query 和关键参数生成稳定的 key
func (pc *PromptCache) CacheKey(query, systemPrompt, model string) string {
	h := sha256.New()
	h.Write([]byte(query))
	h.Write([]byte("|"))
	h.Write([]byte(systemPrompt))
	h.Write([]byte("|"))
	h.Write([]byte(model))
	return "prompt:" + hex.EncodeToString(h.Sum(nil))[:16]
}

func (pc *PromptCache) Get(ctx context.Context, key string) (*CachedResponse, bool) {
	data, err := pc.store.Get(ctx, key)
	if err != nil || data == nil {
		return nil, false
	}

	var resp CachedResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return nil, false
	}
	return &resp, true
}

func (pc *PromptCache) Set(ctx context.Context, key string, resp *CachedResponse) error {
	data, err := json.Marshal(resp)
	if err != nil {
		return fmt.Errorf("marshal cache response: %w", err)
	}
	return pc.store.Set(ctx, key, data, pc.ttl)
}

在 Agent 主链路中使用:

// internal/agent/agent.go(片段)

func (a *Agent) Answer(ctx context.Context, query string) (*Response, error) {
	cacheKey := a.promptCache.CacheKey(query, a.systemPrompt, a.model)

	// 1. 先查缓存
	if cached, hit := a.promptCache.Get(ctx, cacheKey); hit {
		a.metrics.RecordCacheHit()
		return &Response{
			Answer:   cached.Answer,
			Sources:  cached.Sources,
			FromCache: true,
		}, nil
	}

	// 2. 没有缓存,正常调用 LLM
	resp, err := a.callLLM(ctx, query)
	if err != nil {
		return nil, err
	}

	// 3. 写入缓存
	_ = a.promptCache.Set(ctx, cacheKey, &CachedResponse{
		Answer:   resp.Answer,
		Sources:  resp.Sources,
		Model:    a.model,
		CachedAt: time.Now(),
	})

	return resp, nil
}

问题: TTL 应该设多久?对于 FAQ 问答 vs 实时数据查询,TTL 策略有何不同?


✅ 实现3:模型路由(复杂度路由)

核心思想: 简单问题不需要强模型。节省 90% 的成本,只损失极少精度。

// internal/router/model_router.go
package router

import (
	"strings"
	"unicode/utf8"
)

type ModelRouter struct {
	simpleModel  string // 低成本
	complexModel string // 高精度
}

func NewModelRouter() *ModelRouter {
	return &ModelRouter{
		simpleModel:  "gpt-4o-mini",
		complexModel: "gpt-4o",
	}
}

type RouteDecision struct {
	Model  string
	Reason string
}

// Route 根据 query 特征选择模型
func (r *ModelRouter) Route(query string, conversationTurns int) RouteDecision {
	// 规则1:对话轮数多 → 上下文复杂 → 用强模型
	if conversationTurns > 5 {
		return RouteDecision{
			Model:  r.complexModel,
			Reason: "long conversation requires stronger model",
		}
	}

	// 规则2:query 很长(>200字符)→ 可能有复杂需求
	if utf8.RuneCountInString(query) > 200 {
		return RouteDecision{
			Model:  r.complexModel,
			Reason: "long query indicates complexity",
		}
	}

	// 规则3:包含技术关键词 → 需要精准回答
	technicalKeywords := []string{
		"配置", "错误", "报错", "exception", "error",
		"debug", "deploy", "部署", "权限", "permission",
		"API", "代码", "code", "integrate", "集成",
	}
	queryLower := strings.ToLower(query)
	for _, kw := range technicalKeywords {
		if strings.Contains(queryLower, kw) {
			return RouteDecision{
				Model:  r.complexModel,
				Reason: "technical keyword: " + kw,
			}
		}
	}

	// 默认用小模型
	return RouteDecision{
		Model:  r.simpleModel,
		Reason: "simple query",
	}
}

更进一步: 用 embedding 相似度判断 query 属于哪类(FAQ 类 vs 技术诊断类)。


✅ 实现4:Fallback 机制

// internal/llm/fallback_client.go
package llm

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

type FallbackClient struct {
	primary   LLMClient
	secondary LLMClient
	timeout   time.Duration
}

func NewFallbackClient(primary, secondary LLMClient, timeout time.Duration) *FallbackClient {
	return &FallbackClient{
		primary:   primary,
		secondary: secondary,
		timeout:   timeout,
	}
}

func (fc *FallbackClient) Generate(ctx context.Context, req *GenerateRequest) (*GenerateResponse, error) {
	// 给 primary 一个带超时的 context
	primaryCtx, cancel := context.WithTimeout(ctx, fc.timeout)
	defer cancel()

	resp, err := fc.primary.Generate(primaryCtx, req)
	if err == nil {
		return resp, nil
	}

	// primary 失败,记录并切换
	slog.Warn("primary model failed, falling back",
		"error", err,
		"primary_model", req.Model,
	)

	// 切换到备用模型
	fallbackReq := *req
	fallbackReq.Model = "gpt-4o-mini" // 降级到小模型

	resp, err = fc.secondary.Generate(ctx, &fallbackReq)
	if err != nil {
		return nil, fmt.Errorf("both primary and secondary models failed: %w", err)
	}

	resp.Metadata = map[string]string{
		"used_fallback": "true",
		"reason":        "primary model unavailable",
	}
	return resp, nil
}

✅ 实现5:批量 Embedding(Batch API)

问题: 单条 embedding 每次一个网络往返,100 条文档 = 100 次 RTT。

// internal/rag/batch_embedder.go
package rag

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

type BatchEmbedder struct {
	client    EmbeddingClient
	batchSize int
	delay     time.Duration // 批次间延迟,避免 rate limit
}

func NewBatchEmbedder(client EmbeddingClient) *BatchEmbedder {
	return &BatchEmbedder{
		client:    client,
		batchSize: 100, // OpenAI 允许最大 2048,建议 100 稳健
		delay:     50 * time.Millisecond,
	}
}

// EmbedBatch 批量生成 embedding,返回与输入顺序一致的向量列表
func (be *BatchEmbedder) EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) {
	if len(texts) == 0 {
		return nil, nil
	}

	result := make([][]float32, len(texts))
	totalBatches := (len(texts) + be.batchSize - 1) / be.batchSize

	slog.Info("starting batch embedding",
		"total_texts", len(texts),
		"batch_size", be.batchSize,
		"total_batches", totalBatches,
	)

	for i := 0; i < len(texts); i += be.batchSize {
		end := i + be.batchSize
		if end > len(texts) {
			end = len(texts)
		}

		batch := texts[i:end]
		batchNum := i/be.batchSize + 1

		slog.Debug("embedding batch",
			"batch", batchNum,
			"size", len(batch),
		)

		embeddings, err := be.client.EmbedBatch(ctx, batch)
		if err != nil {
			return nil, fmt.Errorf("batch %d/%d failed: %w", batchNum, totalBatches, err)
		}

		copy(result[i:end], embeddings)

		// 避免 rate limit
		if end < len(texts) {
			time.Sleep(be.delay)
		}
	}

	slog.Info("batch embedding complete",
		"total_embeddings", len(result),
	)
	return result, nil
}

✅ 实现6:并发 Tool 调用

问题: 一个 query 需要同时调用 搜索知识库 + 查工单状态 + 检索用户信息,串行 vs 并发?

// internal/agent/parallel_tools.go
package agent

import (
	"context"
	"sync"
)

type ToolResult struct {
	ToolName string
	Result   any
	Err      error
}

// RunToolsConcurrently 并发执行多个 tool 调用,等待所有完成
func RunToolsConcurrently(ctx context.Context, calls []ToolCall) []ToolResult {
	results := make([]ToolResult, len(calls))
	var wg sync.WaitGroup

	for i, call := range calls {
		wg.Add(1)
		go func(idx int, tc ToolCall) {
			defer wg.Done()
			result, err := tc.Execute(ctx)
			results[idx] = ToolResult{
				ToolName: tc.Name(),
				Result:   result,
				Err:      err,
			}
		}(i, call)
	}

	wg.Wait()
	return results
}

理论加速:

  • 串行:200ms + 300ms + 150ms = 650ms
  • 并行:max(200, 300, 150) = 300ms(快 2.2x)

✅ 实现7:流式响应(SSE Streaming)

原理: 不等 LLM 全部生成完,有字就输出,用户感知延迟大幅下降。

// internal/server/stream_handler.go
package server

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

type StreamEvent struct {
	Type    string `json:"type"`
	Content string `json:"content"`
	Done    bool   `json:"done"`
}

func (s *Server) StreamChatHandler(w http.ResponseWriter, r *http.Request) {
	// 设置 SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	var req ChatRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}

	// 发送 SSE 事件的辅助函数
	sendEvent := func(event StreamEvent) {
		data, _ := json.Marshal(event)
		fmt.Fprintf(w, "data: %s\n\n", data)
		flusher.Flush()
	}

	// 开始流式 LLM 调用
	tokenCh, errCh := s.llmClient.StreamGenerate(r.Context(), req.Message)

	for {
		select {
		case token, ok := <-tokenCh:
			if !ok {
				// 流结束
				sendEvent(StreamEvent{Type: "done", Done: true})
				return
			}
			sendEvent(StreamEvent{Type: "token", Content: token})

		case err := <-errCh:
			if err != nil {
				sendEvent(StreamEvent{Type: "error", Content: err.Error()})
				return
			}

		case <-r.Context().Done():
			// 客户端断开
			return
		}
	}
}

第三部分:关键概念

成本优化四板斧

策略 典型节省 风险
Prompt 压缩(去冗余) 10-30% 可能丢失关键信息
Prompt 缓存(相同 query) 20-60% 陈旧信息返回
模型路由(简单用小模型) 40-80% 精度轻微下降
Batch embedding 减少 RTT,省时间 批量失败影响大

延迟优化三板斧

策略 典型改善 适用场景
并发 tool 调用 2-3x 多 tool 场景
流式响应 SSE 首字延迟↓80% 所有对话场景
响应缓存(同 query) 完全消除 LLM 延迟 高重复率场景

第四部分:配套算法复盘

动态规划(DP)

与 Agent 的关联: 规划工具调用顺序(依赖关系下的最优路径)、编辑距离(fuzzy query 匹配)。

经典题: 最长公共子序列(LCS)

// LCS 标准 DP 模板
func longestCommonSubsequence(text1 string, text2 string) int {
	m, n := len(text1), len(text2)
	// dp[i][j] = text1[:i] 和 text2[:j] 的 LCS 长度
	dp := make([][]int, m+1)
	for i := range dp {
		dp[i] = make([]int, n+1)
	}

	for i := 1; i <= m; i++ {
		for j := 1; j <= n; j++ {
			if text1[i-1] == text2[j-1] {
				dp[i][j] = dp[i-1][j-1] + 1
			} else {
				dp[i][j] = max(dp[i-1][j], dp[i][j-1])
			}
		}
	}
	return dp[m][n]
}

func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

面试必知: 状态定义 → 转移方程 → 初始化 → 遍历顺序。能口头描述空间优化(滚动数组)。


图算法

与 Agent 的关联: 工具调用的依赖图(DAG 拓扑排序),知识图谱查询。

// 拓扑排序(Kahn's Algorithm,BFS版)
func topologicalSort(n int, edges [][]int) []int {
	inDegree := make([]int, n)
	adj := make([][]int, n)

	for _, e := range edges {
		adj[e[0]] = append(adj[e[0]], e[1])
		inDegree[e[1]]++
	}

	queue := []int{}
	for i := 0; i < n; i++ {
		if inDegree[i] == 0 {
			queue = append(queue, i)
		}
	}

	result := []int{}
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		result = append(result, node)

		for _, neighbor := range adj[node] {
			inDegree[neighbor]--
			if inDegree[neighbor] == 0 {
				queue = append(queue, neighbor)
			}
		}
	}

	if len(result) != n {
		return nil // 有环,无法排序
	}
	return result
}

滑动窗口

与 Agent 的关联: Rate limiting(滑动窗口限流),Token 计数窗口。

// 滑动窗口限流器
type SlidingWindowLimiter struct {
	maxRequests int
	windowSize  time.Duration
	requests    []time.Time
	mu          sync.Mutex
}

func (s *SlidingWindowLimiter) Allow() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	cutoff := now.Add(-s.windowSize)

	// 移除窗口外的请求
	valid := s.requests[:0]
	for _, t := range s.requests {
		if t.After(cutoff) {
			valid = append(valid, t)
		}
	}
	s.requests = valid

	if len(s.requests) >= s.maxRequests {
		return false
	}

	s.requests = append(s.requests, now)
	return true
}

第五部分:自测清单

  • 我能估算出 Agent 每次对话的 token 用量和成本
  • 我能解释 Prompt Cache 的 key 生成策略(为什么用 hash?)
  • 我能说出模型路由的至少 3 条规则
  • 我能画出 Fallback 的调用链(primary 失败→ secondary)
  • 我能解释 SSE 和 WebSocket 的区别(以及各自适用场景)
  • DP 问题:能写出 LCS 的状态转移方程
  • 图问题:能用 BFS 实现拓扑排序
  • 滑动窗口:能实现基于时间的 Rate Limiter

第六部分:实战作业

任务1:成本仪表盘

  • /admin/metrics 路由返回 CostTracker 的 Summary
  • 包含本小时、今天、本月的推算成本

任务2:接入模型路由

  • 修改 Agent.Answer,先调用 ModelRouter.Route
  • 在日志里记录每次用了哪个模型和原因

任务3:实现并发 tool 调用

  • 找出 ReAct 循环中哪些 tool 调用可以并行
  • RunToolsConcurrently 替换串行调用
  • 对比前后的 P99 延迟

任务4:流式响应

  • 实现 /chat/stream 端点,支持 SSE
  • 在浏览器控制台用 EventSource 测试

第七部分:常见面试问题

Q: 缓存的 invalidation 策略是什么? A: 对于 FAQ 类内容用较长 TTL(24h),对于实时数据(工单状态)不缓存或用极短 TTL(30s)。文档更新后可以主动 delete 对应 cache key。

Q: 模型路由会不会导致用户体验不一致? A: 会,这是权衡。工程实践上可以做 A/B test,对比小模型和大模型的用户满意度评分,数据说话。

Q: Streaming 和普通请求的错误处理有什么不同? A: 普通请求可以在最后返回 HTTP 错误码;Streaming 一旦发出了 200 OK 和第一个 token,就无法再改 HTTP 状态码。需要在 SSE 事件中携带 type: error 字段。

Q: Batch API 的失败怎么处理? A: 实现部分重试:记录哪些 index 失败,只对失败的 batch 重试,而不是全部重新跑。


下一步:Day 26 预告

明天我们会:

  1. 系统梳理 Agent 系统的常见 Failure Mode
  2. 学写 Post-mortem 文档
  3. 设计防御措施(defense in depth)

准备问题:

  • 你的 Agent 在哪些场景下最容易出错?
  • 如果 LLM 给出了错误答案,用户会怎么发现?系统会怎么发现?