Haystack RAG 框架¶
为什么要学¶
Haystack 是由 deepset 开发的可组合 AI Pipeline 框架,专注于构建 RAG 和 Agent 系统:
- 高度模块化:每个组件(检索器、生成器、转换器)都是独立可替换的
- Pipeline 架构:像搭积木一样构建复杂的处理流程
- 生产就绪:被大量企业在生产环境使用
- 模型无关:支持 OpenAI/HuggingFace/Ollama/Cohere 等
- 组件丰富:文档处理、向量存储、Web 搜索等全覆盖
如果你需要构建可维护、可扩展的 RAG 或 Agent 系统,Haystack 的组件化设计是最工程化的选择。
核心概念¶
白话解释¶
Haystack 的核心思想是"管道(Pipeline)": - Component(组件):一个独立的处理单元(如文档拆分、向量检索、LLM生成) - Pipeline(管道):多个组件按顺序/分支连接形成的处理流程 - Document Store(文档存储):存放你的文档和向量的地方
就像工厂的流水线:原材料(文档)进入 → 经过多道工序(组件) → 产出成品(回答)。
核心概念对照表¶
| 概念 | 说明 | 类比 |
|---|---|---|
| Component | 独立处理单元 | 流水线工位 |
| Pipeline | 组件连接的处理流程 | 生产流水线 |
| Document | 数据单元(文本+元数据) | 带标签的文件 |
| DocumentStore | 文档存储后端 | 文件柜/数据库 |
| Converter | 文件格式转换 | PDF→文本提取 |
| Splitter | 文本拆分器 | 把长文切块 |
| Embedder | 向量化组件 | 文本→数字表示 |
| Retriever | 检索组件 | 搜索引擎 |
| Generator | LLM生成组件 | AI写手 |
| Router | 条件路由 | 流水线分叉口 |
安装配置¶
安装¶
# 核心包
pip install haystack-ai
# 常用集成
pip install haystack-ai[openai]
pip install sentence-transformers # 本地embedding
# 可选组件
pip install chroma-haystack # ChromaDB
pip install pgvector-haystack # PostgreSQL
pip install qdrant-haystack # Qdrant
环境配置¶
export OPENAI_API_KEY="sk-..."
# 或其他模型提供商
export ANTHROPIC_API_KEY="..."
export COHERE_API_KEY="..."
快速上手¶
最简 RAG Pipeline¶
from haystack import Pipeline, Document
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
# 定义prompt模板
template = """基于以下上下文回答问题。如果上下文中没有答案,请说"我不确定"。
上下文:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
问题: {{ question }}
回答:"""
# 创建Pipeline
pipe = Pipeline()
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
# 连接组件
pipe.connect("prompt_builder", "llm")
# 运行
result = pipe.run({
"prompt_builder": {
"documents": [
Document(content="向量数据库是专门存储高维向量的数据库,支持相似性搜索。"),
Document(content="常见的向量数据库包括ChromaDB、Pinecone、Qdrant等。")
],
"question": "什么是向量数据库?"
}
})
print(result["llm"]["replies"][0])
完整 RAG(含向量检索)¶
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.embedders import (
SentenceTransformersDocumentEmbedder,
SentenceTransformersTextEmbedder
)
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
# 1. 创建文档存储
document_store = InMemoryDocumentStore()
# 2. 索引Pipeline(文档入库)
indexing_pipe = Pipeline()
indexing_pipe.add_component(
"embedder",
SentenceTransformersDocumentEmbedder(model="BAAI/bge-small-zh-v1.5")
)
indexing_pipe.add_component("writer", DocumentWriter(document_store=document_store))
indexing_pipe.connect("embedder", "writer")
# 导入文档
docs = [
Document(content="Python是一种解释型高级编程语言", meta={"source": "wiki"}),
Document(content="Rust是一种系统编程语言,注重安全和性能", meta={"source": "wiki"}),
Document(content="Go语言由Google开发,擅长并发编程", meta={"source": "wiki"}),
]
indexing_pipe.run({"embedder": {"documents": docs}})
# 3. 查询Pipeline
template = """基于上下文回答:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}
问题: {{ question }}
答案:"""
query_pipe = Pipeline()
query_pipe.add_component("text_embedder", SentenceTransformersTextEmbedder(model="BAAI/bge-small-zh-v1.5"))
query_pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
query_pipe.add_component("prompt_builder", PromptBuilder(template=template))
query_pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
query_pipe.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipe.connect("retriever", "prompt_builder.documents")
query_pipe.connect("prompt_builder", "llm")
# 查询
result = query_pipe.run({
"text_embedder": {"text": "哪个语言最适合写并发程序?"},
"prompt_builder": {"question": "哪个语言最适合写并发程序?"}
})
print(result["llm"]["replies"][0])
进阶用法¶
1. 文档处理 Pipeline¶
from haystack.components.converters import (
PyPDFToDocument,
TextFileToDocument,
MarkdownToDocument
)
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.routers import FileTypeRouter
# 多格式文件处理
indexing = Pipeline()
# 路由不同文件类型
indexing.add_component("router", FileTypeRouter(
mime_types=["application/pdf", "text/plain", "text/markdown"]
))
indexing.add_component("pdf_converter", PyPDFToDocument())
indexing.add_component("text_converter", TextFileToDocument())
indexing.add_component("md_converter", MarkdownToDocument())
# 清理和拆分
indexing.add_component("cleaner", DocumentCleaner())
indexing.add_component("splitter", DocumentSplitter(
split_by="sentence",
split_length=3,
split_overlap=1
))
# 向量化和存储
indexing.add_component("embedder", SentenceTransformersDocumentEmbedder())
indexing.add_component("writer", DocumentWriter(document_store=document_store))
# 连接
indexing.connect("router.application/pdf", "pdf_converter")
indexing.connect("router.text/plain", "text_converter")
indexing.connect("router.text/markdown", "md_converter")
indexing.connect("pdf_converter", "cleaner")
indexing.connect("text_converter", "cleaner")
indexing.connect("md_converter", "cleaner")
indexing.connect("cleaner", "splitter")
indexing.connect("splitter", "embedder")
indexing.connect("embedder", "writer")
2. 混合检索(BM25 + 语义)¶
from haystack.components.retrievers.in_memory import (
InMemoryBM25Retriever,
InMemoryEmbeddingRetriever
)
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
pipe = Pipeline()
# 双路检索
pipe.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=store, top_k=10))
pipe.add_component("text_embedder", SentenceTransformersTextEmbedder())
pipe.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=store, top_k=10))
# 合并结果
pipe.add_component("joiner", DocumentJoiner())
# 重排序
pipe.add_component("ranker", TransformersSimilarityRanker(model="cross-encoder/ms-marco-MiniLM-L-6-v2", top_k=5))
# 连接
pipe.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
pipe.connect("bm25_retriever", "joiner")
pipe.connect("embedding_retriever", "joiner")
pipe.connect("joiner", "ranker")
3. Agent(Tool Use)¶
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool
# 定义工具
def search_web(query: str) -> str:
"""搜索网络获取信息"""
# 实际实现搜索逻辑
return f"搜索'{query}'的结果..."
def calculate(expression: str) -> str:
"""计算数学表达式"""
return str(eval(expression))
# 创建工具对象
tools = [
Tool(name="search_web", function=search_web, description="搜索网络"),
Tool(name="calculate", function=calculate, description="计算数学表达式")
]
# Agent Pipeline
from haystack.components.agents import Agent
agent = Agent(
chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
tools=tools,
system_prompt="你是一个有帮助的助手,可以搜索网络和做计算"
)
result = agent.run(messages=[ChatMessage.from_user("2024年OpenAI的估值是多少?换算成人民币是多少?")])
4. 条件路由¶
from haystack.components.routers import ConditionalRouter
# 根据问题类型路由到不同处理逻辑
routes = [
{
"condition": "{{ 'code' in query.lower() or '代码' in query }}",
"output": "{{ query }}",
"output_name": "code_query",
"output_type": str
},
{
"condition": "{{ True }}", # 默认路由
"output": "{{ query }}",
"output_name": "general_query",
"output_type": str
}
]
pipe = Pipeline()
pipe.add_component("router", ConditionalRouter(routes=routes))
pipe.add_component("code_agent", code_pipeline)
pipe.add_component("general_agent", general_pipeline)
pipe.connect("router.code_query", "code_agent")
pipe.connect("router.general_query", "general_agent")
5. 使用本地模型¶
from haystack_integrations.components.generators.ollama import OllamaGenerator
# Ollama本地模型
pipe = Pipeline()
pipe.add_component("llm", OllamaGenerator(
model="llama3.2",
url="http://localhost:11434"
))
6. 序列化与部署¶
# Pipeline可以序列化为YAML
pipe.dump("my_pipeline.yaml")
# 从YAML加载
from haystack import Pipeline
loaded_pipe = Pipeline.load("my_pipeline.yaml")
# 或转为字典
config = pipe.to_dict()
常见问题¶
Q1: Haystack 2.x vs 1.x?¶
Haystack 2.x 是完全重写版本,不向后兼容: - 2.x:Pipeline + Component 架构,更灵活 - 1.x:Node 架构,已不再维护 - 新项目请直接使用 2.x
Q2: 和 LangChain/LlamaIndex 的区别?¶
| 特性 | Haystack | LangChain | LlamaIndex |
|---|---|---|---|
| 定位 | 工程化Pipeline框架 | 通用LLM框架 | 数据索引框架 |
| 架构 | Pipeline+Component | Chain+Agent | Index+Query |
| 优势 | 清晰/可维护 | 生态最大 | RAG最专注 |
| 复杂度 | 中 | 高 | 低 |
| 生产 | 强 | 中 | 中 |
Q3: DocumentStore 如何选择?¶
| 存储 | 适用场景 | 特点 |
|---|---|---|
| InMemory | 原型/测试 | 简单但不持久 |
| ChromaDB | 小规模生产 | 易用 |
| Qdrant | 中大规模 | 高性能 |
| Pinecone | 云原生 | 托管无需运维 |
| pgvector | 已有PostgreSQL | 无需额外服务 |
Q4: Pipeline 连接报错?¶
确保组件的输入输出类型匹配:
参考资源¶
- Haystack 官方文档 - 完整文档
- Haystack GitHub - 源代码
- Tutorials - 官方教程
- Integrations - 集成列表
- deepset Blog - 技术博客