💡 混合检索返回了 top-20 候选,但粗筛的质量还不够高。今天加入精排(Rerank)和引用生成(Citations),让 RAG 系统又准又可信。
引导问题:
答案揭示:
为什么分两阶段?
阶段1:粗筛(embedding + BM25)
- 在百万文档中找 top-100(毫秒级)
- 不可能对百万文档逐一做精排
阶段2:精排(Rerank)
- 对 top-100 中找 top-5(百毫秒级)
- 精排模型可以同时看 query 和 document,做深度交叉计算
Cross-encoder(如 Cohere Rerank、BGE-reranker):
原理:把 (query, document) 拼接后,输出一个相关性分数
优点:速度快(专用模型,推理一次几毫秒)、成本低
缺点:需要额外部署/调用专用 rerank API
适用:生产环境,需要低延迟
LLM Rerank(用 GPT-4o):
原理:把 query 和多个候选文档发给 LLM,让它直接给出排名
优点:理解能力强,可以推理复杂相关性;无需额外部署
缺点:慢(LLM 推理延迟大)、贵(按 token 计费)
适用:对质量要求极高的场景,或候选集很小(<20个)
本项目策略:
引导问题:
答案揭示:
[1][2] 让用户能追溯到原始文档你是一个文档相关性评估专家。
用户问题:{query}
以下是候选文档(按原始排名展示):
[1] {doc1}
[2] {doc2}
...
[N] {docN}
请按照与用户问题的相关性,从高到低重新排列这些文档的编号。
只输出重新排列后的编号列表,格式为 JSON 数组,例如:[3, 1, 5, 2, 4]
不要输出解释。
关键设计原则:
你是一个专业的问答助手。根据以下上下文回答用户问题。
规则:
1. 只能使用提供的上下文信息,不能添加任何上下文中没有的内容
2. 每个关键声明必须用 [数字] 标注来源,数字对应上下文编号
3. 如果上下文中没有足够信息,直接说"根据现有文档,无法确定..."
4. 不要编造、推断、或使用训练数据中的知识
上下文:
[1] 来源:{source1}
{text1}
[2] 来源:{source2}
{text2}
...
用户问题:{query}
回答:
// 检索结果中的 chunk_id 如何映射回原始文档?
// 设计:chunk_id = doc_id + "_chunk_" + index
// 例:faq_sso_001_chunk_3 → doc_id=faq_sso_001, chunk=3
type CitationMapper struct {
chunkToSource map[string]SourceInfo
}
type SourceInfo struct {
DocID string
Title string
URL string // 原始文档链接(如有)
PageNum int // PDF 页码(如有)
ChunkIdx int
}
// 从 Qdrant payload 构建映射
func BuildCitationMapper(results []HybridSearchResult) *CitationMapper {
m := &CitationMapper{
chunkToSource: make(map[string]SourceInfo),
}
for _, r := range results {
m.chunkToSource[r.ChunkID] = SourceInfo{
DocID: r.DocID,
Title: r.Title,
URL: r.Source,
}
}
return m
}// internal/rag/reranker.go
package rag
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
openai "github.com/sashabaranov/go-openai"
)
type LLMReranker struct {
client *openai.Client
model string
}
func NewLLMReranker(apiKey string) *LLMReranker {
return &LLMReranker{
client: openai.NewClient(apiKey),
model: openai.GPT4oMini, // 用 mini 降低 rerank 成本
}
}
// Rerank 对候选文档重新排名
// 输入:query + 候选列表
// 输出:按相关性排序的候选列表(top-k)
func (r *LLMReranker) Rerank(
ctx context.Context,
query string,
candidates []HybridSearchResult,
topK int,
) ([]HybridSearchResult, error) {
if len(candidates) == 0 {
return nil, nil
}
// 构建候选文档列表文本
var docsBuilder strings.Builder
for i, c := range candidates {
docsBuilder.WriteString(fmt.Sprintf("[%d] %s\n%s\n\n",
i+1, c.Title, truncateText(c.Text, 300)))
}
prompt := fmt.Sprintf(`你是文档相关性评估专家。
用户问题:%s
候选文档(共 %d 个):
%s
请按照与用户问题的相关性从高到低重新排列文档编号。
只输出 JSON 数字数组,不要任何解释。
例如:[3, 1, 5, 2, 4]`,
query, len(candidates), docsBuilder.String())
resp, err := r.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: r.model,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: prompt,
},
},
Temperature: 0, // Rerank 需要确定性输出
})
if err != nil {
return nil, fmt.Errorf("rerank llm call: %w", err)
}
// 解析排名
content := strings.TrimSpace(resp.Choices[0].Message.Content)
var ranking []int
if err := json.Unmarshal([]byte(content), &ranking); err != nil {
slog.Warn("rerank parse failed, using original order", "content", content, "error", err)
// 降级:解析失败时使用原始排名
if topK < len(candidates) {
return candidates[:topK], nil
}
return candidates, nil
}
// 按新排名重新排列
var reranked []HybridSearchResult
used := make(map[int]bool)
for _, rank := range ranking {
idx := rank - 1 // 转为 0-indexed
if idx < 0 || idx >= len(candidates) || used[idx] {
continue
}
used[idx] = true
reranked = append(reranked, candidates[idx])
if len(reranked) >= topK {
break
}
}
slog.Info("rerank complete",
"input", len(candidates),
"output", len(reranked),
"model", r.model,
)
return reranked, nil
}
func truncateText(text string, maxChars int) string {
runes := []rune(text)
if len(runes) <= maxChars {
return text
}
return string(runes[:maxChars]) + "..."
}// internal/rag/citation.go
package rag
import (
"context"
"fmt"
"strings"
openai "github.com/sashabaranov/go-openai"
)
type CitationGenerator struct {
client *openai.Client
model string
}
func NewCitationGenerator(apiKey string) *CitationGenerator {
return &CitationGenerator{
client: openai.NewClient(apiKey),
model: openai.GPT4o,
}
}
// CitationResult 带引用的回答
type CitationResult struct {
Answer string // 带 [1][2] 标注的回答
Sources []SourceInfo // 引用的来源列表
}
// Generate 生成带引用的回答
func (cg *CitationGenerator) Generate(
ctx context.Context,
query string,
docs []HybridSearchResult,
) (*CitationResult, error) {
if len(docs) == 0 {
return &CitationResult{
Answer: "根据现有文档,无法找到相关信息。",
}, nil
}
// 构建上下文文本(带编号)
var ctxBuilder strings.Builder
for i, doc := range docs {
ctxBuilder.WriteString(fmt.Sprintf("[%d] 来源:%s(%s)\n%s\n\n",
i+1, doc.Title, doc.Source, doc.Text))
}
systemPrompt := `你是一个专业的企业知识库问答助手。
回答规则:
1. 只能使用提供的上下文信息,严禁添加任何上下文中没有的内容
2. 每个关键声明必须用 [数字] 标注来源编号,可以同时引用多个来源如 [1][3]
3. 如果上下文信息不足以回答问题,必须明确说明:"根据现有文档,无法确定..."
4. 保持回答简洁、准确,使用中文
5. 不要推断、假设或使用训练数据中的知识`
userMessage := fmt.Sprintf("上下文:\n%s\n用户问题:%s", ctxBuilder.String(), query)
resp, err := cg.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: cg.model,
Messages: []openai.ChatCompletionMessage{
{Role: openai.ChatMessageRoleSystem, Content: systemPrompt},
{Role: openai.ChatMessageRoleUser, Content: userMessage},
},
Temperature: 0.1, // 低温度,减少随机性
})
if err != nil {
return nil, fmt.Errorf("citation generation: %w", err)
}
answer := resp.Choices[0].Message.Content
// 构建来源列表
sources := make([]SourceInfo, len(docs))
for i, doc := range docs {
sources[i] = SourceInfo{
DocID: doc.DocID,
Title: doc.Title,
URL: doc.Source,
}
}
return &CitationResult{
Answer: answer,
Sources: sources,
}, nil
}// internal/rag/citation.go(续)
// ExtractCitations 从带标注的答案中提取引用编号
// 输入:"密码重置流程见 SSO 门户 [1][3]。"
// 输出:[]int{1, 3}
func ExtractCitations(answer string) []int {
var citations []int
seen := make(map[int]bool)
// 匹配 [数字] 格式
i := 0
for i < len(answer) {
if answer[i] == '[' {
j := i + 1
for j < len(answer) && answer[j] >= '0' && answer[j] <= '9' {
j++
}
if j < len(answer) && answer[j] == ']' && j > i+1 {
numStr := answer[i+1 : j]
var num int
fmt.Sscanf(numStr, "%d", &num)
if num > 0 && !seen[num] {
citations = append(citations, num)
seen[num] = true
}
}
i = j + 1
} else {
i++
}
}
return citations
}
// ValidateCitations 验证引用编号是否在合法范围内
func ValidateCitations(answer string, docCount int) (bool, []int) {
citations := ExtractCitations(answer)
var invalid []int
for _, c := range citations {
if c < 1 || c > docCount {
invalid = append(invalid, c)
}
}
return len(invalid) == 0, invalid
}// internal/rag/pipeline.go
package rag
import (
"context"
"fmt"
"log/slog"
"time"
)
type RAGPipeline struct {
hybridSearcher *HybridSearcher
reranker *LLMReranker
citationGenerator *CitationGenerator
}
func NewRAGPipeline(hs *HybridSearcher, rr *LLMReranker, cg *CitationGenerator) *RAGPipeline {
return &RAGPipeline{
hybridSearcher: hs,
reranker: rr,
citationGenerator: cg,
}
}
// RAGRequest 请求结构
type RAGRequest struct {
Query string
Filter FilterOptions
RetrieveTopK int // 粗筛候选数(默认 20)
RerankTopK int // 精排后保留数(默认 5)
}
// RAGResponse 响应结构
type RAGResponse struct {
Answer string
Sources []SourceInfo
RetrievedDocs []HybridSearchResult // 调试用:粗筛结果
RerankedDocs []HybridSearchResult // 调试用:精排结果
Latencies map[string]time.Duration
}
// Run 执行完整 RAG 流水线
func (p *RAGPipeline) Run(ctx context.Context, req RAGRequest) (*RAGResponse, error) {
if req.RetrieveTopK == 0 {
req.RetrieveTopK = 20
}
if req.RerankTopK == 0 {
req.RerankTopK = 5
}
latencies := make(map[string]time.Duration)
resp := &RAGResponse{Latencies: latencies}
// 阶段1:混合检索(粗筛)
t0 := time.Now()
retrieved, err := p.hybridSearcher.Search(ctx, HybridSearchRequest{
Query: req.Query,
TopK: req.RetrieveTopK,
Filter: req.Filter,
})
latencies["retrieve"] = time.Since(t0)
if err != nil {
return nil, fmt.Errorf("retrieve: %w", err)
}
resp.RetrievedDocs = retrieved
slog.Info("retrieved", "count", len(retrieved), "latency", latencies["retrieve"])
// 阶段2:LLM Rerank(精排)
t1 := time.Now()
reranked, err := p.reranker.Rerank(ctx, req.Query, retrieved, req.RerankTopK)
latencies["rerank"] = time.Since(t1)
if err != nil {
slog.Warn("rerank failed, using retrieved results", "error", err)
// 降级:rerank 失败时直接用粗筛结果
reranked = retrieved
if len(reranked) > req.RerankTopK {
reranked = reranked[:req.RerankTopK]
}
}
resp.RerankedDocs = reranked
slog.Info("reranked", "count", len(reranked), "latency", latencies["rerank"])
// 阶段3:生成带引用的回答
t2 := time.Now()
citResult, err := p.citationGenerator.Generate(ctx, req.Query, reranked)
latencies["generate"] = time.Since(t2)
if err != nil {
return nil, fmt.Errorf("generate: %w", err)
}
// 验证引用
valid, invalidCitations := ValidateCitations(citResult.Answer, len(reranked))
if !valid {
slog.Warn("invalid citations detected", "invalid", invalidCitations)
}
resp.Answer = citResult.Answer
resp.Sources = citResult.Sources
slog.Info("rag pipeline complete",
"query", req.Query,
"total_latency", latencies["retrieve"]+latencies["rerank"]+latencies["generate"],
)
return resp, nil
}// internal/server/rag_handler.go
package server
import (
"encoding/json"
"net/http"
"github.com/yourname/agent-runtime/internal/rag"
)
type RAGHandler struct {
pipeline *rag.RAGPipeline
}
type RAGRequest struct {
Query string `json:"query"`
Department string `json:"department,omitempty"`
Classification string `json:"classification,omitempty"`
}
type RAGResponse struct {
Answer string `json:"answer"`
Sources []rag.SourceInfo `json:"sources"`
}
func (h *RAGHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req RAGRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
if req.Query == "" {
http.Error(w, "query is required", http.StatusBadRequest)
return
}
result, err := h.pipeline.Run(r.Context(), rag.RAGRequest{
Query: req.Query,
Filter: rag.FilterOptions{
Department: req.Department,
Classification: req.Classification,
},
})
if err != nil {
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(RAGResponse{
Answer: result.Answer,
Sources: result.Sources,
})
}# 启动服务
go run cmd/api/main.go
# 测试 RAG 查询
curl -X POST http://localhost:8080/rag \
-H "Content-Type: application/json" \
-d '{"query": "如何重置 SSO 密码?", "department": "engineering"}'
# 期望响应格式
# {
# "answer": "重置 SSO 密码的步骤如下:首先访问 SSO 门户 [1],点击"忘记密码"按钮 [1],
# 输入你的企业邮箱,系统会发送重置链接 [2]。",
# "sources": [
# {"doc_id": "faq_sso_001", "title": "SSO 密码重置指南", "url": "..."},
# {"doc_id": "faq_sso_002", "title": "账号管理 FAQ", "url": "..."}
# ]
# }// 层次1:Prompt 约束(已实现)
// "只能使用提供的上下文,不能添加额外信息"
// 层次2:引用验证(已实现)
valid, _ := ValidateCitations(answer, len(docs))
// 层次3:语义一致性检查(进阶)
// 用另一个 LLM 调用验证答案的每个声明是否能在文档中找到依据
func VerifyAnswer(ctx context.Context, answer string, docs []HybridSearchResult) (bool, error) {
prompt := fmt.Sprintf(`判断以下回答的每个陈述是否都能在文档中找到明确依据。
文档:%v
回答:%s
输出:{"verified": true/false, "unsupported_claims": ["..."]}`,
docsToText(docs), answer)
// 调用 LLM 验证...
}
// 层次4:温度控制
// Generate 时 temperature=0.1,减少随机性,让 LLM 更倾向于直接引用原文// 问题:GPT-4o Rerank 每次查询约 2000 tokens,成本约 $0.01
// 对于高并发系统不可接受
// 优化方案1:改用 GPT-4o-mini(成本降低 15 倍,质量略降)
model: openai.GPT4oMini
// 优化方案2:改用专用 Rerank API(Cohere Rerank)
// 成本:约 $0.001/次(比 LLM Rerank 便宜 10 倍)
// 速度:< 100ms(比 LLM 快 5-10 倍)
// 优化方案3:只在必要时 Rerank
// - 低置信度查询(RRF 分数差异小)才触发 Rerank
// - 高置信度查询(top-1 分数远高于其他)直接用粗筛结果
func needsRerank(candidates []HybridSearchResult) bool {
if len(candidates) < 2 {
return false
}
// 如果第1名分数比第2名高出 30% 以上,不需要精排
return candidates[0].RRFScore < candidates[1].RRFScore*1.3
}// 格式1:行内引用(推荐,信息最密集)
// "重置密码步骤如下 [1]:访问 SSO 门户 [1],点击忘记密码 [2]。"
// 格式2:脚注式(更正式,适合长文档)
// "重置密码需要三步(见参考[1][2]):..."
// [1] SSO 密码重置指南
// [2] 账号管理 FAQ
// 格式3:Hover 式(前端实现)
// 前端把 [1] 渲染成可点击的角标,hover 显示原文片段
type CitationAnnotation struct {
Number int `json:"number"`
Title string `json:"title"`
Excerpt string `json:"excerpt"` // 相关片段
URL string `json:"url"`
}运行前,问自己:
# 确保所有组件就绪
docker ps | grep qdrant # Qdrant 运行中
ls data/bleve.index/ # bleve 索引已建立
# 运行完整查询
curl -X POST http://localhost:8080/rag \
-H "Content-Type: application/json" \
-d '{"query": "如何申请年假?"}'evals/rerank_eval.json// 用 GPT-4o-mini 替换 GPT-4o 做 Rerank
// 测量:成本差异(token 用量)、质量差异(人工评分)、延迟差异Q: LLM 输出的 ranking JSON 格式不对,解析失败怎么办?
A: 多层降级策略:
// 尝试1:直接解析 JSON
var ranking []int
if err := json.Unmarshal([]byte(content), &ranking); err == nil {
return applyRanking(candidates, ranking)
}
// 尝试2:提取 JSON 子串(LLM 可能在 JSON 前后加了解释)
start := strings.Index(content, "[")
end := strings.LastIndex(content, "]")
if start >= 0 && end > start {
substr := content[start : end+1]
if err := json.Unmarshal([]byte(substr), &ranking); err == nil {
return applyRanking(candidates, ranking)
}
}
// 降级:解析失败,使用原始排名
return candidates[:topK], nilQ: 如果检索结果里没有问题的答案,LLM 还是生成了一个,怎么检测?
A: Faithfulness Check(忠实度检测):
// 调用 LLM 判断答案是否有文档依据
prompt := `以下答案的每个陈述是否都能在文档中找到直接依据?
文档:[...]
答案:[...]
输出:{"faithful": true/false, "hallucinated_parts": ["..."]}
`
// faithful=false 时拒绝返回该答案,要求重新生成Q: Citation 解析用了自己实现的字符串扫描,为什么不用正则?
A: 本项目用手写扫描主要是为了清晰展示逻辑。生产中直接用正则更简洁:
import "regexp"
var citationRe = regexp.MustCompile(`\[(\d+)\]`)
func ExtractCitations(answer string) []int {
matches := citationRe.FindAllStringSubmatch(answer, -1)
seen := make(map[int]bool)
var result []int
for _, m := range matches {
var n int
fmt.Sscanf(m[1], "%d", &n)
if !seen[n] {
seen[n] = true
result = append(result, n)
}
}
return result
}Q: 为什么 Rerank 要用 temperature=0,Generate 用 temperature=0.1?
A:
为什么? Rerank 中选 top-K 文档时,有时要避免选相邻重复的内容(相邻 chunk 语义高度重叠),这是区间选择问题的变体。
// LeetCode 198: House Robber
// 不能选相邻房屋,求最大收益
func rob(nums []int) int {
if len(nums) == 0 {
return 0
}
if len(nums) == 1 {
return nums[0]
}
// dp[i] = 到第 i 个房屋为止,能获得的最大金额
dp := make([]int, len(nums))
dp[0] = nums[0]
dp[1] = max(nums[0], nums[1])
for i := 2; i < len(nums); i++ {
// 选 i:dp[i-2] + nums[i]
// 不选 i:dp[i-1]
dp[i] = max(dp[i-1], dp[i-2]+nums[i])
}
return dp[len(nums)-1]
}
// 空间优化到 O(1):只需要前两个状态
func robOptimized(nums []int) int {
prev2, prev1 := 0, 0
for _, n := range nums {
curr := max(prev1, prev2+n)
prev2 = prev1
prev1 = curr
}
return prev1
}
// 环形版本(LeetCode 213):第一个和最后一个也算相邻
// 拆成两个子问题:[0..n-2] 和 [1..n-1],取 max
func robCircular(nums []int) int {
if len(nums) == 1 {
return nums[0]
}
return max(robOptimized(nums[:len(nums)-1]),
robOptimized(nums[1:]))
}
func max(a, b int) int {
if a > b {
return a
}
return b
}DP 思路模板:
定义 dp[i] = 子问题答案
dp[i] = f(dp[i-1], dp[i-2], ..., nums[i])
从小问题逐步推导大问题
为什么? RAG Pipeline 的各阶段(Retrieve/Rerank/Generate)有延迟预算,如何用有限的"延迟配额"组合出最优方案,是个背包/组合问题。
// LeetCode 322: Coin Change
// 给定硬币面额,凑成总金额所需的最少硬币数
// 无限背包问题
func coinChange(coins []int, amount int) int {
// dp[i] = 凑成金额 i 所需的最少硬币数
dp := make([]int, amount+1)
for i := range dp {
dp[i] = amount + 1 // 初始化为"不可能"
}
dp[0] = 0 // 凑成 0 不需要任何硬币
for i := 1; i <= amount; i++ {
for _, coin := range coins {
if coin <= i {
// 选这枚硬币:dp[i-coin] + 1
if dp[i-coin]+1 < dp[i] {
dp[i] = dp[i-coin] + 1
}
}
}
}
if dp[amount] > amount {
return -1 // 无法凑成
}
return dp[amount]
}
// 进阶:打印凑法(回溯路径)
func coinChangePath(coins []int, amount int) []int {
dp := make([]int, amount+1)
choice := make([]int, amount+1) // 记录每步用了哪个硬币
for i := range dp {
dp[i] = amount + 1
}
dp[0] = 0
for i := 1; i <= amount; i++ {
for _, coin := range coins {
if coin <= i && dp[i-coin]+1 < dp[i] {
dp[i] = dp[i-coin] + 1
choice[i] = coin // 记录选择
}
}
}
if dp[amount] > amount {
return nil
}
// 回溯路径
var path []int
for amount > 0 {
path = append(path, choice[amount])
amount -= choice[amount]
}
return path
}复杂度: Time O(amount × len(coins)),Space O(amount)
为什么? Citation 验证中需要判断答案文本是否可以被分解为来自文档的片段,是 Word Break 问题的应用场景。
// LeetCode 139: Word Break
// 判断字符串 s 是否能被字典中的单词拼成
func wordBreak(s string, wordDict []string) bool {
wordSet := make(map[string]bool)
for _, w := range wordDict {
wordSet[w] = true
}
n := len(s)
// dp[i] = s[0..i-1] 是否能被字典单词拼成
dp := make([]bool, n+1)
dp[0] = true // 空串可以被拼成
for i := 1; i <= n; i++ {
for j := 0; j < i; j++ {
// s[j..i-1] 是否是字典单词,且 s[0..j-1] 可以被拼成
if dp[j] && wordSet[s[j:i]] {
dp[i] = true
break
}
}
}
return dp[n]
}
// 进阶:返回所有拆分方案(LeetCode 140)
func wordBreakII(s string, wordDict []string) []string {
wordSet := make(map[string]bool)
for _, w := range wordDict {
wordSet[w] = true
}
memo := make(map[int][]string)
var dfs func(start int) []string
dfs = func(start int) []string {
if v, ok := memo[start]; ok {
return v
}
if start == len(s) {
return []string{""}
}
var result []string
for end := start + 1; end <= len(s); end++ {
word := s[start:end]
if wordSet[word] {
rest := dfs(end)
for _, r := range rest {
if r == "" {
result = append(result, word)
} else {
result = append(result, word+" "+r)
}
}
}
}
memo[start] = result
return result
}
return dfs(0)
}DP 与记忆化递归的关系:
DP(自底向上):
- 从 dp[0] 开始,逐步推导 dp[n]
- 迭代实现,无递归开销
记忆化递归(自顶向下):
- 从 dfs(0) 开始,遇到子问题查 memo
- 更直观,适合需要返回路径的场景
Week 2 你已经完成了:
这就是完整的 RAG 系统! 下周 Week 3 我们会:
本周自测:你能独立回答这些问题吗?