Week 2 Day 8:文档解析与Chunking - 苏格拉底教学

💡 从这一天开始,你会构建RAG系统的第一块基石:怎样把原始文档变成可被LLM理解的chunks。

第一部分:问题驱动

🤔 问题1:为什么不能直接把整个文档传给LLM?

引导问题:

  1. 一个30页的PDF有多少token?(~10K tokens)
  2. GPT-4的context window有多大?(128K tokens)
  3. 如果传整个文档,LLM会失焦吗?
  4. 怎样让LLM找到最相关的部分?

答案揭示:

  • LLM有context限制,传太多信息反而变成噪音
  • 需要分割成小块(chunks),便于检索和处理
  • 检索时只取最相关的chunks,构成compact的prompt

你应该理解:

  • Token限制是硬约束
  • Chunking是RAG的基础
  • Chunk大小的权衡

🤔 问题2:怎样分割文档才既不丢信息,也不太冗长?

选项A:固定大小(200 tokens)

优点:简单,reproducible 缺点:可能在句子中间切割,上下文断裂

选项B:按段落分割

优点:保留逻辑单元 缺点:某些段落太长,某些太短

选项C:固定大小 + 重叠(semantic chunking的简化)

文本:[...tokens 1-200...] [tokens 100-300] [tokens 200-400]... 优点:保留上下文连贯性 缺点:重复数据略多

建议: 混合策略

  • 按段落优先,段落太长时再分割
  • 相邻chunks保持50-100 token的重叠

第二部分:动手实现

✅ 版本1:最简单的chunker

// internal/rag/chunker.go
package rag

import (
	"strings"
)

type Chunker struct {
	chunkSize   int  // tokens(约4 chars = 1 token)
	overlapSize int
}

func NewChunker() *Chunker {
	return &Chunker{
		chunkSize:   1000,      // ~250 tokens
		overlapSize: 100,       // 25 tokens overlap
	}
}

// 最简单:按大小分割
func (c *Chunker) SimpleChunk(text string) []string {
	var chunks []string
	
	runes := []rune(text)
	for i := 0; i < len(runes); i += c.chunkSize - c.overlapSize {
		end := i + c.chunkSize
		if end > len(runes) {
			end = len(runes)
		}
		
		chunk := string(runes[i:end])
		chunks = append(chunks, chunk)
	}
	
	return chunks
}

问题: 这会在词中间切割!


✅ 版本2:按段落+固定大小

// internal/rag/chunker.go

type Chunk struct {
	ID       string            `json:"id"`
	DocID    string            `json:"doc_id"`
	Title    string            `json:"title"`
	Section  string            `json:"section"`
	Text     string            `json:"text"`
	Source   string            `json:"source"`
	Metadata map[string]string `json:"metadata"`
}

func (c *Chunker) SmartChunk(docID, title, text string) []Chunk {
	// Step 1: 按\n\n分段落
	paragraphs := strings.Split(text, "\n\n")
	
	var chunks []Chunk
	var currentChunk strings.Builder
	chunkNum := 0
	
	for _, para := range paragraphs {
		// Step 2: 检查加上这个段落会不会超过chunkSize
		testText := currentChunk.String() + "\n\n" + para
		
		if len(testText) > c.chunkSize && currentChunk.Len() > 0 {
			// 超了,先保存当前chunk
			chunk := Chunk{
				ID:      genID(docID, chunkNum),
				DocID:   docID,
				Title:   title,
				Text:    currentChunk.String(),
				Section: extractSection(text, currentChunk.String()),
				Source:  docID,
				Metadata: map[string]string{
					"chunk_index": fmt.Sprintf("%d", chunkNum),
					"length":      fmt.Sprintf("%d", len(currentChunk.String())),
				},
			}
			chunks = append(chunks, chunk)
			
			// 新chunk从这个段落开始
			currentChunk.Reset()
			currentChunk.WriteString(para)
			chunkNum++
		} else {
			// 还能放下
			if currentChunk.Len() > 0 {
				currentChunk.WriteString("\n\n")
			}
			currentChunk.WriteString(para)
		}
	}
	
	// 最后一个chunk
	if currentChunk.Len() > 0 {
		chunk := Chunk{
			ID:      genID(docID, chunkNum),
			DocID:   docID,
			Title:   title,
			Text:    currentChunk.String(),
			Section: extractSection(text, currentChunk.String()),
			Source:  docID,
		}
		chunks = append(chunks, chunk)
	}
	
	return chunks
}

func genID(docID string, index int) string {
	return fmt.Sprintf("%s_chunk_%d", docID, index)
}

func extractSection(fullText, chunkText string) string {
	// 简单实现:找这个chunk是在文档的哪个位置
	pos := strings.Index(fullText, chunkText)
	if pos < 100 {
		return "introduction"
	}
	return "body"
}

✅ 版本3:加入Metadata

// Chunk现在包含更多信息
type DocumentMetadata struct {
	Source        string            // 文件来源
	SourceType    string            // "faq", "policy", "ticket"
	Department    string            // "hr", "eng", "sales"
	CreatedAt     time.Time
	UpdatedAt     time.Time
	Classification string           // "public", "internal", "confidential"
	Tags          []string
}

func (c *Chunker) ChunkWithMetadata(doc *Document) []Chunk {
	baseChunks := c.SmartChunk(doc.ID, doc.Title, doc.Content)
	
	// 加入metadata
	for i := range baseChunks {
		baseChunks[i].Metadata = map[string]string{
			"source_type":      doc.Metadata.SourceType,
			"department":       doc.Metadata.Department,
			"classification":   doc.Metadata.Classification,
			"last_updated":     doc.Metadata.UpdatedAt.Format(time.RFC3339),
			"chunk_index":      fmt.Sprintf("%d", i),
			"total_chunks":     fmt.Sprintf("%d", len(baseChunks)),
			"chunk_size_chars": fmt.Sprintf("%d", len(baseChunks[i].Text)),
		}
	}
	
	return baseChunks
}

第三部分:准备数据和输出

准备FAQ文档

# 创建数据目录
mkdir -p data/docs

# 创建sample FAQ文件
cat > data/docs/faq.md << 'EOF'
# Frequently Asked Questions

## How to reset SSO?

To reset your SSO password:
1. Go to the SSO portal
2. Click "Forgot Password"
3. Enter your email
4. Follow the link in your email

## What is the approval process?

All ticket creation requires approval from L2 support or above.
The approval workflow is:
- Draft created by agent
- Human reviews
- Approved or rejected

## How to create a ticket?

You can create a ticket through:
- Web portal
- Email (forward to support)
- Chat interface
EOF

输出为JSONL

// internal/rag/chunker.go

import (
	"encoding/json"
	"os"
)

func (c *Chunker) WriteChunksToJSONL(chunks []Chunk, outputPath string) error {
	file, err := os.Create(outputPath)
	if err != nil {
		return err
	}
	defer file.Close()
	
	encoder := json.NewEncoder(file)
	
	for _, chunk := range chunks {
		if err := encoder.Encode(chunk); err != nil {
			return err
		}
	}
	
	return nil
}

完整的Day 8流程

func main() {
	// 1. 读取文档
	content, _ := ioutil.ReadFile("data/docs/faq.md")
	
	// 2. 创建chunker
	chunker := NewChunker()
	
	// 3. 分割
	chunks := chunker.ChunkWithMetadata(&Document{
		ID:      "faq_001",
		Title:   "FAQ",
		Content: string(content),
		Metadata: DocumentMetadata{
			SourceType:    "faq",
			Department:    "support",
			Classification: "public",
		},
	})
	
	// 4. 输出
	chunker.WriteChunksToJSONL(chunks, "evals/chunks.jsonl")
	
	fmt.Printf("Generated %d chunks\n", len(chunks))
}

第四部分:关键概念

Token vs Characters

// Token近似:1 token ≈ 4 characters(英文)
// 或 ≈ 1.5 characters(中文)

func estimateTokens(text string) int {
	// 粗略估计
	return len(text) / 4
}

// 准确方法:用tiktoken库
import "github.com/pkoukk/tiktoken-go"

func countTokensAccurate(text string) int {
	encoding := tiktoken.MustNewEncoding("cl100k_base") // GPT-4用
	return len(encoding.Encode(text, nil, nil))
}

Metadata的用途

  1. 权限过滤 - Day 19时,用department和classification过滤
  2. 溯源 - 答案时能回溯到原document和chunk
  3. 去重 - 相同的chunk只存一次
  4. 热度追踪 - 统计哪些chunks被检索最多

第五部分:自测清单

运行前,问自己:

  • 我能解释为什么要分割文档?
  • Chunk大小怎样选择?(1000 tokens)
  • 为什么要overlap?
  • Metadata包含什么信息?
  • 怎样生成稳定的chunk ID?

第六部分:作业

任务1:准备真实数据

  • 创建5-10篇FAQ/Policy文档(markdown格式)
  • 每篇500-2000字
  • 放在 data/docs/ 目录

任务2:实现chunker

  • 完成 internal/rag/chunker.go
  • 能处理不同大小的文档
  • Metadata自动填充

任务3:生成chunks.jsonl

  • 一行一个chunk(JSONL格式)
  • 包含所有metadata
  • 生成50-100个chunks

任务4:验证质量

# 检查生成的chunks
wc -l evals/chunks.jsonl      # 行数(chunk数)
head -1 evals/chunks.jsonl    # 看第一个chunk的结构
jq . evals/chunks.jsonl       # 检查JSON格式

第七部分:问题解答

Q: 如果一个段落本身就很长(>chunkSize)怎么办?

A: 需要继续分割。改进版:

if len(para) > c.chunkSize {
    // 段落太长,按句子分割
    sentences := strings.Split(para, ". ")
    for _, sent := range sentences {
        // 像段落一样处理
    }
}

Q: Chunk大小(1000 tokens)怎样确定?

A: 权衡因素:

  • 太小:上下文丢失,检索精度低
  • 太大:包含噪音,LLM容易distracted

建议:先用1000试,然后根据Day 12的eval结果调整。

Q: 怎样处理表格、代码块这样的结构化内容?

A: 特殊处理:

if isCodeBlock(para) {
    // 代码块不分割,整块保存
    chunk := createChunk(para)
    chunk.Metadata["type"] = "code"
}


配套算法题

题1:Maximum Depth of Binary Tree (Easy)

题目: 给定一棵二叉树的根节点,返回其最大深度(即从根节点到最远叶子节点的最长路径上的节点数)。

思路: 递归DFS。树的最大深度 = 1 + max(左子树深度, 右子树深度)。空节点深度为0,递归终止。

type TreeNode struct {
    Val   int
    Left  *TreeNode
    Right *TreeNode
}

func maxDepth(root *TreeNode) int {
    if root == nil {
        return 0
    }
    leftDepth := maxDepth(root.Left)
    rightDepth := maxDepth(root.Right)
    if leftDepth > rightDepth {
        return leftDepth + 1
    }
    return rightDepth + 1
}

复杂度: 时间 O(n)(每个节点访问一次),空间 O(h)(h 为树高,递归栈)

Agent 上下文联想: RAG 的文档结构常是树形(章节→段落→句子),递归遍历和 chunking 的层级拆分思路一致。


题2:Balanced Binary Tree (Easy)

题目: 给定一棵二叉树,判断它是否是高度平衡的——即每个节点的两棵子树高度差不超过1。

思路: 自底向上递归(后序遍历)。定义辅助函数返回当前子树的高度;若发现不平衡则提前返回 -1 作为"哨兵",避免重复计算。这比自顶向下的朴素解法减少了重复递归。

func isBalanced(root *TreeNode) bool {
    return height(root) != -1
}

// 返回树高,若不平衡返回 -1
func height(node *TreeNode) int {
    if node == nil {
        return 0
    }
    left := height(node.Left)
    if left == -1 {
        return -1 // 左子树已不平衡,直接剪枝
    }
    right := height(node.Right)
    if right == -1 {
        return -1 // 右子树已不平衡,直接剪枝
    }
    diff := left - right
    if diff > 1 || diff < -1 {
        return -1 // 当前节点不平衡
    }
    if left > right {
        return left + 1
    }
    return right + 1
}

复杂度: 时间 O(n),空间 O(h)

面试关键点: 区分"自顶向下 O(n²)"和"自底向上 O(n)"两种思路,面试官会追问。


题3:Lowest Common Ancestor of a Binary Tree (Medium)

题目: 给定一棵二叉树和两个节点 p、q,找到它们的最近公共祖先(LCA)。LCA 定义为:在树中同时是 p 和 q 的祖先,且离它们最近的节点。

思路: 递归分治。对每个节点:

  1. 若当前节点为 nil 或等于 p/q,直接返回当前节点
  2. 分别在左右子树中递归查找
  3. 若左右各返回一个非 nil 值,说明当前节点就是 LCA
  4. 否则返回非 nil 的那一侧
func lowestCommonAncestor(root, p, q *TreeNode) *TreeNode {
    // 基础情况:空节点或找到目标节点
    if root == nil || root == p || root == q {
        return root
    }

    // 在左右子树中分别查找
    left := lowestCommonAncestor(root.Left, p, q)
    right := lowestCommonAncestor(root.Right, p, q)

    // 左右各找到一个 → 当前节点是 LCA
    if left != nil && right != nil {
        return root
    }

    // 只有一侧找到 → 返回那一侧的结果
    if left != nil {
        return left
    }
    return right
}

复杂度: 时间 O(n),空间 O(h)

Agent 上下文联想: 分治思路类似 RAG 中对文档做层级分段:先处理子问题(子树),汇总结果到父节点(上层语义)。


下一步:Day 9 预告

明天我们会:

  1. 调用OpenAI Embedding API
  2. 把chunks转换成向量
  3. 存入Qdrant Vector DB
  4. 实现vector search

准备问题:

  • Embedding是什么?(把文本转成数字向量)
  • 为什么需要向量存储?(快速相似度搜索)