Week 1 Day 3:Tool Registry - 苏格拉底教学

💡 一句话核心:Agent的能力边界 = 它注册了哪些tool。Registry是Agent的"能力清单",也是风险控制的第一道闸门。

学习目标

  • 理解tool system在Agent架构中的位置
  • 设计生产级 ToolDefinition(带风险等级、超时、schema)
  • 实现Registry(Register/Get/Execute/List)
  • 把内部Tool自动转换为OpenAI function calling格式
  • 处理执行的 timeout / cancellation / panic recovery

第一部分:问题驱动

🤔 问题1:为什么Agent需要"工具系统"?Prompt里直接描述不行吗?

引导问题:

  1. 你让GPT给用户"创建一个工单",它能真的访问你的Jira吗?
  2. GPT输出一段文字描述"我应该查询用户信息",谁来真正执行这个查询?
  3. 如果LLM想调用10个不同的系统,你是在prompt里写10段"如果要X,请返回这样的JSON"吗?
  4. 人类是怎么"操作世界"的?(通过工具:键盘、鼠标、API)

答案揭示:

  • LLM是纯函数:文本进、文本出,不能碰外部世界
  • Tool System = LLM的"手",把模型的"决策"转成真实世界的"动作"
  • OpenAI的 function calling 就是标准化的tool协议
  • 没有tool系统,Agent=聊天机器人;有tool系统,Agent=能做事的员工

🤔 问题2:Tool描述应该包含什么?

引导问题:

  1. LLM怎么知道一个tool是做什么的?(Description)
  2. LLM怎么知道要传什么参数?(InputSchema)
  3. 工具出错怎么办?卡死怎么办?(Timeout)
  4. get_user_infodelete_all_data 是同一个危险等级吗?(RiskLevel)
  5. 工具执行逻辑写在哪?(Execute函数)

答案揭示:

生产级ToolDefinition至少包含:

字段 用途
Name 唯一标识,LLM用它call
Description 给LLM看的自然语言描述
InputSchema JSON Schema,定义参数形状
RiskLevel low/medium/high,决定是否要审批
Timeout 单次执行的最大时长
Execute 真正的业务逻辑

🤔 问题3:LLM幻觉了一个不存在的tool,怎么办?

引导问题:

  1. LLM返回 {"name":"delete_customer","arguments":"..."},但你没注册这个tool,该怎么响应?
  2. 直接panic?返回空字符串?给LLM一个错误消息?
  3. 如果你返回错误,LLM下一轮会怎么做?(可能道歉并重试别的工具)

答案揭示:

  • Registry.Execute 必须 永不panic
  • Tool不存在 → 返回明确的错误字符串,喂回给LLM
  • 这样LLM下一轮能self-correct

第二部分:动手实现

✅ 版本1:ToolDefinition基础

// internal/tools/types.go
package tools

import (
	"context"
	"encoding/json"
	"time"
)

// RiskLevel:工具的危险等级
type RiskLevel string

const (
	RiskLow    RiskLevel = "low"    // 只读(get_user_info)
	RiskMedium RiskLevel = "medium" // 可撤销的写(create_draft)
	RiskHigh   RiskLevel = "high"   // 不可撤销(send_email、charge_card)
)

// ToolResult:统一的执行结果
type ToolResult struct {
	Output   string            `json:"output"`            // 喂回给LLM的文本
	Data     json.RawMessage   `json:"data,omitempty"`    // 结构化数据(便于审计)
	Metadata map[string]string `json:"metadata,omitempty"`
	Error    string            `json:"error,omitempty"`   // 非空则表示失败
}

// ExecuteFn:工具的真实逻辑
type ExecuteFn func(ctx context.Context, args json.RawMessage) (*ToolResult, error)

// ToolDefinition:工具定义
type ToolDefinition struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	InputSchema map[string]any `json:"input_schema"` // JSON Schema
	RiskLevel   RiskLevel      `json:"risk_level"`
	Timeout     time.Duration  `json:"timeout"`
	Execute     ExecuteFn      `json:"-"` // 不序列化
}

反思题:

  • 为什么 ToolResult.Output 是string而不是any?(LLM只吃文本。结构化数据另存 Data 字段用于审计)
  • 为什么有 error 返回值还要有 ToolResult.Error 字段?(区分"系统错误"vs"业务错误"——系统错误要重试,业务错误要喂回LLM)

✅ 版本2:JSON Schema与OpenAI格式转换

// internal/tools/schema.go
package tools

// ToOpenAIFormat:把内部Tool转换成OpenAI function calling需要的格式
// OpenAI要求:
// {
//   "type": "function",
//   "function": {
//     "name": "...",
//     "description": "...",
//     "parameters": {...JSON Schema...}
//   }
// }
func (t *ToolDefinition) ToOpenAIFormat() map[string]any {
	return map[string]any{
		"type": "function",
		"function": map[string]any{
			"name":        t.Name,
			"description": t.Description,
			"parameters":  t.InputSchema,
		},
	}
}

// ExampleSchema:辅助构造JSON Schema
// schema := ObjectSchema(map[string]any{
//     "query": StringSchema("用户搜索词"),
//     "limit": IntSchema("返回条数", 1, 10),
// }, []string{"query"})
func ObjectSchema(properties map[string]any, required []string) map[string]any {
	return map[string]any{
		"type":                 "object",
		"properties":           properties,
		"required":             required,
		"additionalProperties": false,
	}
}

func StringSchema(description string) map[string]any {
	return map[string]any{"type": "string", "description": description}
}

func IntSchema(description string, min, max int) map[string]any {
	return map[string]any{
		"type":        "integer",
		"description": description,
		"minimum":     min,
		"maximum":     max,
	}
}

func EnumSchema(description string, values ...string) map[string]any {
	return map[string]any{
		"type":        "string",
		"description": description,
		"enum":        values,
	}
}

✅ 版本3:Registry核心

// internal/tools/registry.go
package tools

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime/debug"
	"sort"
	"sync"
	"time"

	"log/slog"
)

// Registry:工具注册中心
type Registry struct {
	mu    sync.RWMutex
	tools map[string]*ToolDefinition
}

func NewRegistry() *Registry {
	return &Registry{tools: make(map[string]*ToolDefinition)}
}

// Register:注册工具(同名会报错,避免静默覆盖)
func (r *Registry) Register(t *ToolDefinition) error {
	if t.Name == "" {
		return fmt.Errorf("tool name required")
	}
	if t.Execute == nil {
		return fmt.Errorf("tool %q missing Execute", t.Name)
	}
	if t.Timeout == 0 {
		t.Timeout = 10 * time.Second // 默认10秒
	}
	if t.RiskLevel == "" {
		t.RiskLevel = RiskLow
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tools[t.Name]; exists {
		return fmt.Errorf("tool %q already registered", t.Name)
	}
	r.tools[t.Name] = t
	return nil
}

// Get:按名称获取
func (r *Registry) Get(name string) (*ToolDefinition, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	t, ok := r.tools[name]
	return t, ok
}

// List:返回所有tools(按name排序,稳定)
func (r *Registry) List() []*ToolDefinition {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*ToolDefinition, 0, len(r.tools))
	for _, t := range r.tools {
		out = append(out, t)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

// ToOpenAIFormat:把所有tool导出成OpenAI function calling数组
func (r *Registry) ToOpenAIFormat() []map[string]any {
	tools := r.List()
	out := make([]map[string]any, 0, len(tools))
	for _, t := range tools {
		out = append(out, t.ToOpenAIFormat())
	}
	return out
}

// Execute:执行tool,带timeout、panic recovery、统一error处理
func (r *Registry) Execute(ctx context.Context, name string, args json.RawMessage) (result *ToolResult) {
	t, ok := r.Get(name)
	if !ok {
		return &ToolResult{Error: fmt.Sprintf("tool %q not found", name)}
	}

	// 1. 带timeout的context
	execCtx, cancel := context.WithTimeout(ctx, t.Timeout)
	defer cancel()

	// 2. panic recovery
	defer func() {
		if r := recover(); r != nil {
			slog.Error("tool panic",
				"tool", name,
				"panic", r,
				"stack", string(debug.Stack()),
			)
			result = &ToolResult{Error: fmt.Sprintf("tool %q panicked: %v", name, r)}
		}
	}()

	start := time.Now()
	slog.Info("tool.exec.start", "tool", name, "args", string(args))

	res, err := t.Execute(execCtx, args)

	duration := time.Since(start)

	// 3. 错误归因
	if err != nil {
		if execCtx.Err() == context.DeadlineExceeded {
			slog.Warn("tool.timeout", "tool", name, "timeout", t.Timeout)
			return &ToolResult{Error: fmt.Sprintf("tool %q timed out after %s", name, t.Timeout)}
		}
		slog.Error("tool.exec.error", "tool", name, "error", err, "duration_ms", duration.Milliseconds())
		return &ToolResult{Error: err.Error()}
	}

	slog.Info("tool.exec.ok", "tool", name, "duration_ms", duration.Milliseconds())
	return res
}

关键设计点:

  • Execute 永不panic(defer recover 兜底)
  • Timeout通过 context.WithTimeout 强制实施
  • 所有异常路径统一返回 *ToolResult{Error: ...}LLM能读懂错误

✅ 版本4:三个Mock Tool

// internal/tools/builtin/search_kb.go
package builtin

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"yourname/agent-runtime/internal/tools"
)

// 假的知识库
var kb = map[string]string{
	"sso":     "重置SSO密码:访问 sso.company.com,点击Forgot Password。",
	"ticket":  "创建工单需要L2以上支持人员审批。",
	"refund":  "退款流程:客户发起 → 财务审批 → 3个工作日到账。",
	"vacation": "年假政策:每年15天,可累计至下一年度。",
}

type SearchKBArgs struct {
	Query string `json:"query"`
	Limit int    `json:"limit,omitempty"`
}

func NewSearchKB() *tools.ToolDefinition {
	return &tools.ToolDefinition{
		Name:        "search_kb",
		Description: "在企业知识库中搜索信息。用于回答常见问题、公司政策、流程说明。",
		InputSchema: tools.ObjectSchema(
			map[string]any{
				"query": tools.StringSchema("搜索关键词,支持中英文"),
				"limit": tools.IntSchema("返回条数", 1, 10),
			},
			[]string{"query"},
		),
		RiskLevel: tools.RiskLow,
		Timeout:   3 * time.Second,
		Execute: func(ctx context.Context, raw json.RawMessage) (*tools.ToolResult, error) {
			var args SearchKBArgs
			if err := json.Unmarshal(raw, &args); err != nil {
				return &tools.ToolResult{Error: "invalid arguments: " + err.Error()}, nil
			}
			if args.Limit == 0 {
				args.Limit = 3
			}

			// 模拟一次慢查询
			select {
			case <-time.After(100 * time.Millisecond):
			case <-ctx.Done():
				return nil, ctx.Err()
			}

			var hits []string
			q := strings.ToLower(args.Query)
			for key, val := range kb {
				if strings.Contains(key, q) || strings.Contains(strings.ToLower(val), q) {
					hits = append(hits, fmt.Sprintf("[%s] %s", key, val))
					if len(hits) >= args.Limit {
						break
					}
				}
			}
			if len(hits) == 0 {
				return &tools.ToolResult{Output: "未找到相关文档"}, nil
			}
			return &tools.ToolResult{Output: strings.Join(hits, "\n")}, nil
		},
	}
}
// internal/tools/builtin/create_ticket.go
package builtin

import (
	"context"
	"encoding/json"
	"fmt"
	"sync/atomic"
	"time"

	"yourname/agent-runtime/internal/tools"
)

var ticketCounter int64

type CreateTicketArgs struct {
	Title    string `json:"title"`
	Body     string `json:"body"`
	Priority string `json:"priority"`
}

func NewCreateTicket() *tools.ToolDefinition {
	return &tools.ToolDefinition{
		Name:        "create_ticket",
		Description: "创建一个客户支持工单。会触发人工审批流程。",
		InputSchema: tools.ObjectSchema(
			map[string]any{
				"title":    tools.StringSchema("工单标题,简洁明确"),
				"body":     tools.StringSchema("详细描述"),
				"priority": tools.EnumSchema("优先级", "low", "normal", "high", "urgent"),
			},
			[]string{"title", "body", "priority"},
		),
		RiskLevel: tools.RiskMedium, // 写操作,但可撤销
		Timeout:   5 * time.Second,
		Execute: func(ctx context.Context, raw json.RawMessage) (*tools.ToolResult, error) {
			var args CreateTicketArgs
			if err := json.Unmarshal(raw, &args); err != nil {
				return &tools.ToolResult{Error: "invalid args: " + err.Error()}, nil
			}
			if args.Title == "" {
				return &tools.ToolResult{Error: "title is required"}, nil
			}

			id := atomic.AddInt64(&ticketCounter, 1)
			ticketID := fmt.Sprintf("TICK-%05d", id)

			return &tools.ToolResult{
				Output: fmt.Sprintf("工单已创建:%s(优先级=%s)。等待L2审批。", ticketID, args.Priority),
				Metadata: map[string]string{
					"ticket_id": ticketID,
					"status":    "pending_approval",
				},
			}, nil
		},
	}
}
// internal/tools/builtin/get_user_info.go
package builtin

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"yourname/agent-runtime/internal/tools"
)

var fakeUsers = map[string]map[string]any{
	"u_001": {"name": "Alice", "dept": "Engineering", "email": "alice@company.com"},
	"u_002": {"name": "Bob", "dept": "Sales", "email": "bob@company.com"},
}

type GetUserInfoArgs struct {
	UserID string `json:"user_id"`
}

func NewGetUserInfo() *tools.ToolDefinition {
	return &tools.ToolDefinition{
		Name:        "get_user_info",
		Description: "根据用户ID查询基础信息(姓名、部门、邮箱)。",
		InputSchema: tools.ObjectSchema(
			map[string]any{
				"user_id": tools.StringSchema("用户ID,形如 u_001"),
			},
			[]string{"user_id"},
		),
		RiskLevel: tools.RiskLow,
		Timeout:   2 * time.Second,
		Execute: func(ctx context.Context, raw json.RawMessage) (*tools.ToolResult, error) {
			var args GetUserInfoArgs
			if err := json.Unmarshal(raw, &args); err != nil {
				return &tools.ToolResult{Error: "invalid args: " + err.Error()}, nil
			}
			u, ok := fakeUsers[args.UserID]
			if !ok {
				return &tools.ToolResult{Error: fmt.Sprintf("user %q not found", args.UserID)}, nil
			}
			b, _ := json.Marshal(u)
			return &tools.ToolResult{
				Output: string(b),
				Data:   b,
			}, nil
		},
	}
}

var _ = time.Second

✅ 版本5:组装和使用

// cmd/api/main.go(片段)
import (
	"yourname/agent-runtime/internal/tools"
	"yourname/agent-runtime/internal/tools/builtin"
)

func setupTools() *tools.Registry {
	r := tools.NewRegistry()
	_ = r.Register(builtin.NewSearchKB())
	_ = r.Register(builtin.NewCreateTicket())
	_ = r.Register(builtin.NewGetUserInfo())
	return r
}

// 喂给OpenAI
registry := setupTools()
openaiTools := registry.ToOpenAIFormat()
// openaiTools可以直接放进GenerateRequest.Tools(稍作类型转换)

第三部分:关键概念

1. JSON Schema是"LLM和代码的合同"

  • 描述清楚 → LLM少幻觉
  • enum / required 等约束
  • additionalProperties: false 禁止未知字段

2. RiskLevel驱动审批流

if tool.RiskLevel == RiskHigh {
    // 暂停agent,写入pending_approvals表,等待人工
    return requireApproval(ctx, call)
}

(Day 5会实现审批流)

3. Timeout是最后一道防线

  • 工具内部可能调外部HTTP
  • 外部HTTP可能无响应
  • 不设timeout=整个agent被卡死
  • context.WithTimeout 必须传给底层调用

4. 永不panic原则

  • LLM输入不可控 → tool可能收到奇怪的args
  • 外部服务可能返回nil → 下游代码可能deref空指针
  • Registry的Execute必须用 defer recover() 兜住

5. Error vs ToolResult.Error

  • Go error:系统层问题(网络、数据库崩了),调用者可能想重试
  • ToolResult.Error:业务层问题(参数错误、user_not_found),应喂回LLM
  • 实践:业务错误优先放在 ToolResult.Error,系统错误返回 error

第四部分:自测清单

  • 我能说清 ToolDefinition 的6个字段和各自作用
  • 我能手写JSON Schema来描述一个带enum和required的参数
  • 我知道 Registry.Execute 有哪些"不会崩"的保护(timeout/panic/not-found)
  • 我能解释 errorToolResult.Error 的区别
  • 我能把 Registry 里所有 tool 转成 OpenAI function calling 格式
  • 我能新加一个tool,整个系统不需要动其他代码

第五部分:作业

任务1:完整实现

  • internal/tools/types.go + schema.go + registry.go
  • internal/tools/builtin/ 下三个tool

任务2:单元测试

// internal/tools/registry_test.go
func TestRegistry_Execute_NotFound(t *testing.T) {
    r := tools.NewRegistry()
    res := r.Execute(ctx, "no_such_tool", nil)
    require.Contains(t, res.Error, "not found")
}

func TestRegistry_Execute_Timeout(t *testing.T) {
    r := tools.NewRegistry()
    _ = r.Register(&tools.ToolDefinition{
        Name: "slow", Execute: func(ctx context.Context, _ json.RawMessage) (*tools.ToolResult, error) {
            <-ctx.Done()
            return nil, ctx.Err()
        },
        Timeout: 50 * time.Millisecond,
    })
    res := r.Execute(context.Background(), "slow", nil)
    require.Contains(t, res.Error, "timed out")
}

func TestRegistry_Execute_Panic(t *testing.T) {
    r := tools.NewRegistry()
    _ = r.Register(&tools.ToolDefinition{
        Name: "boom", Execute: func(ctx context.Context, _ json.RawMessage) (*tools.ToolResult, error) {
            panic("oops")
        },
    })
    res := r.Execute(context.Background(), "boom", nil)
    require.Contains(t, res.Error, "panicked")
}

任务3:自己再加一个Tool

  • 实现 send_email tool(mock即可)
  • 使用 RiskHigh
  • InputSchema里用enum约束 priority

任务4:思考题

  • 如果要支持流式tool(例如 stream_logs),接口要改成什么样?
  • 两个tool想共享同一个DB连接池,应该怎么注入?(提示:闭包 vs constructor参数)

第六部分:常见问题解答

Q1:为什么 Execute 返回 (*ToolResult, error) 两个值?直接一个不行吗? A:区分"系统错误(需重试/报警)"和"业务错误(喂回给LLM)"。

// 网络断了 → 返回error,Registry层可能做重试
return nil, errors.New("db connection lost")

// 用户不存在 → 返回ToolResult.Error,喂回LLM让它道歉
return &ToolResult{Error: "user not found"}, nil

Q2:同名tool再注册会怎样? A:返回error。静默覆盖是最难调的bug。

Q3:LLM传了不符合schema的arguments怎么办? A:两层防护:

  1. Prompt里强化schema描述(prompt engineering)
  2. 代码里 json.Unmarshal 失败时返回 ToolResult.Error——LLM下一轮会self-correct

更严格的做法:用 gojsonschema 库做schema validation。

Q4:Tool执行时想访问请求的 user_id / trace_id,怎么拿? A:通过context传:

ctx := context.WithValue(parent, ctxKeyUserID{}, "u_001")
// tool内部:
uid, _ := ctx.Value(ctxKeyUserID{}).(string)

不要用全局变量,不要塞到args里(那是给LLM看的)。

Q5:需要对tool做限流吗? A:是的。尤其是 create_ticket 这种写操作。实现思路:

  • Registry外面再包一层 RateLimitedRegistry(装饰器)
  • 或在Execute里按tool name维护 rate.Limiter

配套算法题

1. Merge Intervals(LeetCode 56)

题目: 给一组区间 [[1,3],[2,6],[8,10]],合并重叠区间。

思路: 按起点排序,遍历合并。

// algo/merge_intervals.go
package algo

import "sort"

func merge(intervals [][]int) [][]int {
	if len(intervals) == 0 {
		return nil
	}
	sort.Slice(intervals, func(i, j int) bool {
		return intervals[i][0] < intervals[j][0]
	})
	out := [][]int{intervals[0]}
	for _, iv := range intervals[1:] {
		last := out[len(out)-1]
		if iv[0] <= last[1] { // 重叠
			if iv[1] > last[1] {
				last[1] = iv[1]
			}
		} else {
			out = append(out, iv)
		}
	}
	return out
}

复杂度: O(n log n) / O(1)。

Agent上下文联想: 多个tool调用的time window合并、audit log聚合都能用到。


2. Search Insert Position(LeetCode 35)

题目: 有序数组中找target的插入位置(二分变体)。

// algo/search_insert.go
package algo

func searchInsert(nums []int, target int) int {
	lo, hi := 0, len(nums)
	for lo < hi {
		mid := lo + (hi-lo)/2
		if nums[mid] < target {
			lo = mid + 1
		} else {
			hi = mid
		}
	}
	return lo
}

复杂度: O(log n) / O(1)。

Agent上下文联想: 工具调用的优先级队列插入、timestamp定位。


3. Top K Frequent Elements(LeetCode 347)

题目: 返回出现次数前k高的元素。

思路: 哈希计数 + 桶排序(O(n))或最小堆(O(n log k))。

// algo/top_k.go
package algo

import "container/heap"

type freq struct {
	val, cnt int
}
type minHeap []freq

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].cnt < h[j].cnt }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)         { *h = append(*h, x.(freq)) }
func (h *minHeap) Pop() any           { old := *h; n := len(old); x := old[n-1]; *h = old[:n-1]; return x }

func topKFrequent(nums []int, k int) []int {
	cnt := make(map[int]int)
	for _, n := range nums {
		cnt[n]++
	}
	h := &minHeap{}
	heap.Init(h)
	for v, c := range cnt {
		heap.Push(h, freq{v, c})
		if h.Len() > k {
			heap.Pop(h)
		}
	}
	out := make([]int, k)
	for i := k - 1; i >= 0; i-- {
		out[i] = heap.Pop(h).(freq).val
	}
	return out
}

复杂度: O(n log k) / O(n + k)。

Agent上下文联想: 最常用top-k工具统计、热门query分析、cache预热候选。


下一步:Day 4 预告

明天是整个Week 1最关键的一天——Agent Loop

  1. 把Day 2的 LLMClient 和 Day 3的 Registry 组合起来
  2. 实现完整的决策循环:用户输入 → LLM → ToolCall? → 执行 → 反馈 → 循环
  3. 设计Loop state:iteration / messages / tool调用历史
  4. 终止条件:final answer / max iterations / error
  5. 画出状态转换图

准备问题:

  • 一个Agent一次对话,LLM最多会被调用几次?
  • 如果LLM每次都调同一个tool,陷入死循环怎么办?
  • Agent的状态(messages)应该存在内存还是数据库?