Week 4 Day 25:成本与延迟优化 - 苏格拉底教学
从这一天开始,你的系统已经能跑通核心链路。但跑通和跑好是两回事。一个被用户嫌慢、被老板嫌贵的 Agent,离被关掉不远了。今天你要学会让它又快又省。
第一部分:问题驱动
🤔 问题1:你的 Agent 一个月花多少钱?
引导问题:
- 你调用了哪些模型?每次调用的 input/output tokens 分别多少?
- 如果一天有 1000 个用户,每人平均发 5 条消息,会产生多少次 LLM 调用?
- 成本的大头在哪里——input tokens 还是 output tokens?
答案揭示:
以 GPT-4o 为例(截至 2025 年的参考价格):
- Input: $2.50 / 1M tokens
- Output: $10.00 / 1M tokens
- Embedding (text-embedding-3-small): $0.02 / 1M tokens
估算一次 Customer Support 对话:
System prompt: ~500 tokens
RAG retrieved docs: ~1500 tokens
User message: ~100 tokens
LLM response: ~300 tokens
Input tokens: 2100 → $0.00525
Output tokens: 300 → $0.003
单次对话成本: ~$0.008
月成本推算:
1000 用户/天 × 5 条/用户 × 30 天 = 150,000 次对话
150,000 × $0.008 = $1,200/月
加上 embedding (每文档 ~500 tokens,10K 文档):
10,000 × 500 / 1,000,000 × $0.02 = $0.10(一次性)
反思: 如果把 system prompt 从 500 tokens 压缩到 200 tokens,一个月省多少?
🤔 问题2:用户感受到的延迟从哪里来?
让用户感到慢的三个地方:
- 第一个 token 出现之前(LLM 在"思考")
- 整个响应流传输的时间
- Tool 调用阻塞(搜索、查数据库)
引导问题:
- 如果有 3 个 tool 调用,分别耗时 200ms / 300ms / 150ms,串行 vs 并行差多少?
- 用户是更在乎"开始有字了"还是"全部输出完"?
第二部分:动手实现
✅ 实现1:成本追踪中间件
// internal/metrics/cost_tracker.go
package metrics
import (
"log/slog"
"sync/atomic"
)
// 以 GPT-4o 为基准(单位:微美元,避免浮点累计误差)
const (
GPT4oInputCostPerToken = 2500 // $2.50 / 1M tokens → 2500 micro-USD / 1M → 2.5 micro-USD / 1K
GPT4oOutputCostPerToken = 10000 // $10.00 / 1M tokens
GPT4oMiniInputCost = 150 // $0.15 / 1M tokens
GPT4oMiniOutputCost = 600 // $0.60 / 1M tokens
)
type CostTracker struct {
totalInputTokens atomic.Int64
totalOutputTokens atomic.Int64
totalCostMicroUSD atomic.Int64
callCount atomic.Int64
}
func NewCostTracker() *CostTracker {
return &CostTracker{}
}
func (ct *CostTracker) RecordCall(model string, inputTokens, outputTokens int) {
ct.totalInputTokens.Add(int64(inputTokens))
ct.totalOutputTokens.Add(int64(outputTokens))
ct.callCount.Add(1)
var cost int64
switch model {
case "gpt-4o":
cost = int64(inputTokens)*GPT4oInputCostPerToken/1_000_000 +
int64(outputTokens)*GPT4oOutputCostPerToken/1_000_000
case "gpt-4o-mini":
cost = int64(inputTokens)*GPT4oMiniInputCost/1_000_000 +
int64(outputTokens)*GPT4oMiniOutputCost/1_000_000
}
ct.totalCostMicroUSD.Add(cost)
slog.Info("llm call recorded",
"model", model,
"input_tokens", inputTokens,
"output_tokens", outputTokens,
"cost_usd", float64(cost)/1_000_000,
)
}
func (ct *CostTracker) Summary() map[string]any {
calls := ct.callCount.Load()
totalCost := ct.totalCostMicroUSD.Load()
avgCost := float64(0)
if calls > 0 {
avgCost = float64(totalCost) / float64(calls) / 1_000_000
}
return map[string]any{
"total_calls": calls,
"total_input_tokens": ct.totalInputTokens.Load(),
"total_output_tokens": ct.totalOutputTokens.Load(),
"total_cost_usd": float64(totalCost) / 1_000_000,
"avg_cost_per_call": avgCost,
"estimated_monthly": avgCost * float64(calls) * 30, // 简单外推
}
}
问题: 为什么用 atomic.Int64 而不是 int64 + mutex?
✅ 实现2:Prompt 缓存(相同 query 命中)
思路: 相同的用户 query,如果 LLM 已经回答过,直接返回缓存结果。
// internal/cache/prompt_cache.go
package cache
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"time"
)
type CachedResponse struct {
Answer string `json:"answer"`
Sources []string `json:"sources"`
Model string `json:"model"`
CachedAt time.Time `json:"cached_at"`
TokensUsed int `json:"tokens_used"`
Metadata map[string]string `json:"metadata"`
}
type PromptCache struct {
store TTLStore
ttl time.Duration
}
// TTLStore 抽象,可以是 Redis 或内存 map
type TTLStore interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
Delete(ctx context.Context, key string) error
}
func NewPromptCache(store TTLStore, ttl time.Duration) *PromptCache {
return &PromptCache{store: store, ttl: ttl}
}
// CacheKey 根据 query 和关键参数生成稳定的 key
func (pc *PromptCache) CacheKey(query, systemPrompt, model string) string {
h := sha256.New()
h.Write([]byte(query))
h.Write([]byte("|"))
h.Write([]byte(systemPrompt))
h.Write([]byte("|"))
h.Write([]byte(model))
return "prompt:" + hex.EncodeToString(h.Sum(nil))[:16]
}
func (pc *PromptCache) Get(ctx context.Context, key string) (*CachedResponse, bool) {
data, err := pc.store.Get(ctx, key)
if err != nil || data == nil {
return nil, false
}
var resp CachedResponse
if err := json.Unmarshal(data, &resp); err != nil {
return nil, false
}
return &resp, true
}
func (pc *PromptCache) Set(ctx context.Context, key string, resp *CachedResponse) error {
data, err := json.Marshal(resp)
if err != nil {
return fmt.Errorf("marshal cache response: %w", err)
}
return pc.store.Set(ctx, key, data, pc.ttl)
}
在 Agent 主链路中使用:
// internal/agent/agent.go(片段)
func (a *Agent) Answer(ctx context.Context, query string) (*Response, error) {
cacheKey := a.promptCache.CacheKey(query, a.systemPrompt, a.model)
// 1. 先查缓存
if cached, hit := a.promptCache.Get(ctx, cacheKey); hit {
a.metrics.RecordCacheHit()
return &Response{
Answer: cached.Answer,
Sources: cached.Sources,
FromCache: true,
}, nil
}
// 2. 没有缓存,正常调用 LLM
resp, err := a.callLLM(ctx, query)
if err != nil {
return nil, err
}
// 3. 写入缓存
_ = a.promptCache.Set(ctx, cacheKey, &CachedResponse{
Answer: resp.Answer,
Sources: resp.Sources,
Model: a.model,
CachedAt: time.Now(),
})
return resp, nil
}
问题: TTL 应该设多久?对于 FAQ 问答 vs 实时数据查询,TTL 策略有何不同?
✅ 实现3:模型路由(复杂度路由)
核心思想: 简单问题不需要强模型。节省 90% 的成本,只损失极少精度。
// internal/router/model_router.go
package router
import (
"strings"
"unicode/utf8"
)
type ModelRouter struct {
simpleModel string // 低成本
complexModel string // 高精度
}
func NewModelRouter() *ModelRouter {
return &ModelRouter{
simpleModel: "gpt-4o-mini",
complexModel: "gpt-4o",
}
}
type RouteDecision struct {
Model string
Reason string
}
// Route 根据 query 特征选择模型
func (r *ModelRouter) Route(query string, conversationTurns int) RouteDecision {
// 规则1:对话轮数多 → 上下文复杂 → 用强模型
if conversationTurns > 5 {
return RouteDecision{
Model: r.complexModel,
Reason: "long conversation requires stronger model",
}
}
// 规则2:query 很长(>200字符)→ 可能有复杂需求
if utf8.RuneCountInString(query) > 200 {
return RouteDecision{
Model: r.complexModel,
Reason: "long query indicates complexity",
}
}
// 规则3:包含技术关键词 → 需要精准回答
technicalKeywords := []string{
"配置", "错误", "报错", "exception", "error",
"debug", "deploy", "部署", "权限", "permission",
"API", "代码", "code", "integrate", "集成",
}
queryLower := strings.ToLower(query)
for _, kw := range technicalKeywords {
if strings.Contains(queryLower, kw) {
return RouteDecision{
Model: r.complexModel,
Reason: "technical keyword: " + kw,
}
}
}
// 默认用小模型
return RouteDecision{
Model: r.simpleModel,
Reason: "simple query",
}
}
更进一步: 用 embedding 相似度判断 query 属于哪类(FAQ 类 vs 技术诊断类)。
✅ 实现4:Fallback 机制
// internal/llm/fallback_client.go
package llm
import (
"context"
"fmt"
"log/slog"
"time"
)
type FallbackClient struct {
primary LLMClient
secondary LLMClient
timeout time.Duration
}
func NewFallbackClient(primary, secondary LLMClient, timeout time.Duration) *FallbackClient {
return &FallbackClient{
primary: primary,
secondary: secondary,
timeout: timeout,
}
}
func (fc *FallbackClient) Generate(ctx context.Context, req *GenerateRequest) (*GenerateResponse, error) {
// 给 primary 一个带超时的 context
primaryCtx, cancel := context.WithTimeout(ctx, fc.timeout)
defer cancel()
resp, err := fc.primary.Generate(primaryCtx, req)
if err == nil {
return resp, nil
}
// primary 失败,记录并切换
slog.Warn("primary model failed, falling back",
"error", err,
"primary_model", req.Model,
)
// 切换到备用模型
fallbackReq := *req
fallbackReq.Model = "gpt-4o-mini" // 降级到小模型
resp, err = fc.secondary.Generate(ctx, &fallbackReq)
if err != nil {
return nil, fmt.Errorf("both primary and secondary models failed: %w", err)
}
resp.Metadata = map[string]string{
"used_fallback": "true",
"reason": "primary model unavailable",
}
return resp, nil
}
✅ 实现5:批量 Embedding(Batch API)
问题: 单条 embedding 每次一个网络往返,100 条文档 = 100 次 RTT。
// internal/rag/batch_embedder.go
package rag
import (
"context"
"fmt"
"log/slog"
"time"
)
type BatchEmbedder struct {
client EmbeddingClient
batchSize int
delay time.Duration // 批次间延迟,避免 rate limit
}
func NewBatchEmbedder(client EmbeddingClient) *BatchEmbedder {
return &BatchEmbedder{
client: client,
batchSize: 100, // OpenAI 允许最大 2048,建议 100 稳健
delay: 50 * time.Millisecond,
}
}
// EmbedBatch 批量生成 embedding,返回与输入顺序一致的向量列表
func (be *BatchEmbedder) EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) {
if len(texts) == 0 {
return nil, nil
}
result := make([][]float32, len(texts))
totalBatches := (len(texts) + be.batchSize - 1) / be.batchSize
slog.Info("starting batch embedding",
"total_texts", len(texts),
"batch_size", be.batchSize,
"total_batches", totalBatches,
)
for i := 0; i < len(texts); i += be.batchSize {
end := i + be.batchSize
if end > len(texts) {
end = len(texts)
}
batch := texts[i:end]
batchNum := i/be.batchSize + 1
slog.Debug("embedding batch",
"batch", batchNum,
"size", len(batch),
)
embeddings, err := be.client.EmbedBatch(ctx, batch)
if err != nil {
return nil, fmt.Errorf("batch %d/%d failed: %w", batchNum, totalBatches, err)
}
copy(result[i:end], embeddings)
// 避免 rate limit
if end < len(texts) {
time.Sleep(be.delay)
}
}
slog.Info("batch embedding complete",
"total_embeddings", len(result),
)
return result, nil
}
✅ 实现6:并发 Tool 调用
问题: 一个 query 需要同时调用 搜索知识库 + 查工单状态 + 检索用户信息,串行 vs 并发?
// internal/agent/parallel_tools.go
package agent
import (
"context"
"sync"
)
type ToolResult struct {
ToolName string
Result any
Err error
}
// RunToolsConcurrently 并发执行多个 tool 调用,等待所有完成
func RunToolsConcurrently(ctx context.Context, calls []ToolCall) []ToolResult {
results := make([]ToolResult, len(calls))
var wg sync.WaitGroup
for i, call := range calls {
wg.Add(1)
go func(idx int, tc ToolCall) {
defer wg.Done()
result, err := tc.Execute(ctx)
results[idx] = ToolResult{
ToolName: tc.Name(),
Result: result,
Err: err,
}
}(i, call)
}
wg.Wait()
return results
}
理论加速:
- 串行:200ms + 300ms + 150ms = 650ms
- 并行:max(200, 300, 150) = 300ms(快 2.2x)
✅ 实现7:流式响应(SSE Streaming)
原理: 不等 LLM 全部生成完,有字就输出,用户感知延迟大幅下降。
// internal/server/stream_handler.go
package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
)
type StreamEvent struct {
Type string `json:"type"`
Content string `json:"content"`
Done bool `json:"done"`
}
func (s *Server) StreamChatHandler(w http.ResponseWriter, r *http.Request) {
// 设置 SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
var req ChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
// 发送 SSE 事件的辅助函数
sendEvent := func(event StreamEvent) {
data, _ := json.Marshal(event)
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
// 开始流式 LLM 调用
tokenCh, errCh := s.llmClient.StreamGenerate(r.Context(), req.Message)
for {
select {
case token, ok := <-tokenCh:
if !ok {
// 流结束
sendEvent(StreamEvent{Type: "done", Done: true})
return
}
sendEvent(StreamEvent{Type: "token", Content: token})
case err := <-errCh:
if err != nil {
sendEvent(StreamEvent{Type: "error", Content: err.Error()})
return
}
case <-r.Context().Done():
// 客户端断开
return
}
}
}
第三部分:关键概念
成本优化四板斧
| 策略 |
典型节省 |
风险 |
| Prompt 压缩(去冗余) |
10-30% |
可能丢失关键信息 |
| Prompt 缓存(相同 query) |
20-60% |
陈旧信息返回 |
| 模型路由(简单用小模型) |
40-80% |
精度轻微下降 |
| Batch embedding |
减少 RTT,省时间 |
批量失败影响大 |
延迟优化三板斧
| 策略 |
典型改善 |
适用场景 |
| 并发 tool 调用 |
2-3x |
多 tool 场景 |
| 流式响应 SSE |
首字延迟↓80% |
所有对话场景 |
| 响应缓存(同 query) |
完全消除 LLM 延迟 |
高重复率场景 |
第四部分:配套算法复盘
动态规划(DP)
与 Agent 的关联: 规划工具调用顺序(依赖关系下的最优路径)、编辑距离(fuzzy query 匹配)。
经典题: 最长公共子序列(LCS)
// LCS 标准 DP 模板
func longestCommonSubsequence(text1 string, text2 string) int {
m, n := len(text1), len(text2)
// dp[i][j] = text1[:i] 和 text2[:j] 的 LCS 长度
dp := make([][]int, m+1)
for i := range dp {
dp[i] = make([]int, n+1)
}
for i := 1; i <= m; i++ {
for j := 1; j <= n; j++ {
if text1[i-1] == text2[j-1] {
dp[i][j] = dp[i-1][j-1] + 1
} else {
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
}
}
}
return dp[m][n]
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
面试必知: 状态定义 → 转移方程 → 初始化 → 遍历顺序。能口头描述空间优化(滚动数组)。
图算法
与 Agent 的关联: 工具调用的依赖图(DAG 拓扑排序),知识图谱查询。
// 拓扑排序(Kahn's Algorithm,BFS版)
func topologicalSort(n int, edges [][]int) []int {
inDegree := make([]int, n)
adj := make([][]int, n)
for _, e := range edges {
adj[e[0]] = append(adj[e[0]], e[1])
inDegree[e[1]]++
}
queue := []int{}
for i := 0; i < n; i++ {
if inDegree[i] == 0 {
queue = append(queue, i)
}
}
result := []int{}
for len(queue) > 0 {
node := queue[0]
queue = queue[1:]
result = append(result, node)
for _, neighbor := range adj[node] {
inDegree[neighbor]--
if inDegree[neighbor] == 0 {
queue = append(queue, neighbor)
}
}
}
if len(result) != n {
return nil // 有环,无法排序
}
return result
}
滑动窗口
与 Agent 的关联: Rate limiting(滑动窗口限流),Token 计数窗口。
// 滑动窗口限流器
type SlidingWindowLimiter struct {
maxRequests int
windowSize time.Duration
requests []time.Time
mu sync.Mutex
}
func (s *SlidingWindowLimiter) Allow() bool {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
cutoff := now.Add(-s.windowSize)
// 移除窗口外的请求
valid := s.requests[:0]
for _, t := range s.requests {
if t.After(cutoff) {
valid = append(valid, t)
}
}
s.requests = valid
if len(s.requests) >= s.maxRequests {
return false
}
s.requests = append(s.requests, now)
return true
}
第五部分:自测清单
第六部分:实战作业
任务1:成本仪表盘
任务2:接入模型路由
任务3:实现并发 tool 调用
任务4:流式响应
第七部分:常见面试问题
Q: 缓存的 invalidation 策略是什么?
A: 对于 FAQ 类内容用较长 TTL(24h),对于实时数据(工单状态)不缓存或用极短 TTL(30s)。文档更新后可以主动 delete 对应 cache key。
Q: 模型路由会不会导致用户体验不一致?
A: 会,这是权衡。工程实践上可以做 A/B test,对比小模型和大模型的用户满意度评分,数据说话。
Q: Streaming 和普通请求的错误处理有什么不同?
A: 普通请求可以在最后返回 HTTP 错误码;Streaming 一旦发出了 200 OK 和第一个 token,就无法再改 HTTP 状态码。需要在 SSE 事件中携带 type: error 字段。
Q: Batch API 的失败怎么处理?
A: 实现部分重试:记录哪些 index 失败,只对失败的 batch 重试,而不是全部重新跑。
下一步:Day 26 预告
明天我们会:
- 系统梳理 Agent 系统的常见 Failure Mode
- 学写 Post-mortem 文档
- 设计防御措施(defense in depth)
准备问题:
- 你的 Agent 在哪些场景下最容易出错?
- 如果 LLM 给出了错误答案,用户会怎么发现?系统会怎么发现?