跳转至

Haystack RAG 框架

为什么要学

Haystack 是由 deepset 开发的可组合 AI Pipeline 框架,专注于构建 RAG 和 Agent 系统:

  • 高度模块化:每个组件(检索器、生成器、转换器)都是独立可替换的
  • Pipeline 架构:像搭积木一样构建复杂的处理流程
  • 生产就绪:被大量企业在生产环境使用
  • 模型无关:支持 OpenAI/HuggingFace/Ollama/Cohere 等
  • 组件丰富:文档处理、向量存储、Web 搜索等全覆盖

如果你需要构建可维护、可扩展的 RAG 或 Agent 系统,Haystack 的组件化设计是最工程化的选择。

核心概念

白话解释

Haystack 的核心思想是"管道(Pipeline)": - Component(组件):一个独立的处理单元(如文档拆分、向量检索、LLM生成) - Pipeline(管道):多个组件按顺序/分支连接形成的处理流程 - Document Store(文档存储):存放你的文档和向量的地方

就像工厂的流水线:原材料(文档)进入 → 经过多道工序(组件) → 产出成品(回答)。

核心概念对照表

概念说明类比
Component独立处理单元流水线工位
Pipeline组件连接的处理流程生产流水线
Document数据单元(文本+元数据)带标签的文件
DocumentStore文档存储后端文件柜/数据库
Converter文件格式转换PDF→文本提取
Splitter文本拆分器把长文切块
Embedder向量化组件文本→数字表示
Retriever检索组件搜索引擎
GeneratorLLM生成组件AI写手
Router条件路由流水线分叉口

安装配置

安装

# 核心包
pip install haystack-ai

# 常用集成
pip install haystack-ai[openai]
pip install sentence-transformers  # 本地embedding

# 可选组件
pip install chroma-haystack      # ChromaDB
pip install pgvector-haystack    # PostgreSQL
pip install qdrant-haystack      # Qdrant

环境配置

export OPENAI_API_KEY="sk-..."
# 或其他模型提供商
export ANTHROPIC_API_KEY="..."
export COHERE_API_KEY="..."

快速上手

最简 RAG Pipeline

from haystack import Pipeline, Document
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder

# 定义prompt模板
template = """基于以下上下文回答问题。如果上下文中没有答案,请说"我不确定"。

上下文:
{% for doc in documents %}
  {{ doc.content }}
{% endfor %}

问题: {{ question }}
回答:"""

# 创建Pipeline
pipe = Pipeline()
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))

# 连接组件
pipe.connect("prompt_builder", "llm")

# 运行
result = pipe.run({
    "prompt_builder": {
        "documents": [
            Document(content="向量数据库是专门存储高维向量的数据库,支持相似性搜索。"),
            Document(content="常见的向量数据库包括ChromaDB、Pinecone、Qdrant等。")
        ],
        "question": "什么是向量数据库?"
    }
})

print(result["llm"]["replies"][0])

完整 RAG(含向量检索)

from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder
)
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder

# 1. 创建文档存储
document_store = InMemoryDocumentStore()

# 2. 索引Pipeline(文档入库)
indexing_pipe = Pipeline()
indexing_pipe.add_component(
    "embedder",
    SentenceTransformersDocumentEmbedder(model="BAAI/bge-small-zh-v1.5")
)
indexing_pipe.add_component("writer", DocumentWriter(document_store=document_store))
indexing_pipe.connect("embedder", "writer")

# 导入文档
docs = [
    Document(content="Python是一种解释型高级编程语言", meta={"source": "wiki"}),
    Document(content="Rust是一种系统编程语言,注重安全和性能", meta={"source": "wiki"}),
    Document(content="Go语言由Google开发,擅长并发编程", meta={"source": "wiki"}),
]
indexing_pipe.run({"embedder": {"documents": docs}})

# 3. 查询Pipeline
template = """基于上下文回答:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

问题: {{ question }}
答案:"""

query_pipe = Pipeline()
query_pipe.add_component("text_embedder", SentenceTransformersTextEmbedder(model="BAAI/bge-small-zh-v1.5"))
query_pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
query_pipe.add_component("prompt_builder", PromptBuilder(template=template))
query_pipe.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))

query_pipe.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipe.connect("retriever", "prompt_builder.documents")
query_pipe.connect("prompt_builder", "llm")

# 查询
result = query_pipe.run({
    "text_embedder": {"text": "哪个语言最适合写并发程序?"},
    "prompt_builder": {"question": "哪个语言最适合写并发程序?"}
})

print(result["llm"]["replies"][0])

进阶用法

1. 文档处理 Pipeline

from haystack.components.converters import (
    PyPDFToDocument,
    TextFileToDocument,
    MarkdownToDocument
)
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.routers import FileTypeRouter

# 多格式文件处理
indexing = Pipeline()

# 路由不同文件类型
indexing.add_component("router", FileTypeRouter(
    mime_types=["application/pdf", "text/plain", "text/markdown"]
))
indexing.add_component("pdf_converter", PyPDFToDocument())
indexing.add_component("text_converter", TextFileToDocument())
indexing.add_component("md_converter", MarkdownToDocument())

# 清理和拆分
indexing.add_component("cleaner", DocumentCleaner())
indexing.add_component("splitter", DocumentSplitter(
    split_by="sentence",
    split_length=3,
    split_overlap=1
))

# 向量化和存储
indexing.add_component("embedder", SentenceTransformersDocumentEmbedder())
indexing.add_component("writer", DocumentWriter(document_store=document_store))

# 连接
indexing.connect("router.application/pdf", "pdf_converter")
indexing.connect("router.text/plain", "text_converter")
indexing.connect("router.text/markdown", "md_converter")
indexing.connect("pdf_converter", "cleaner")
indexing.connect("text_converter", "cleaner")
indexing.connect("md_converter", "cleaner")
indexing.connect("cleaner", "splitter")
indexing.connect("splitter", "embedder")
indexing.connect("embedder", "writer")

2. 混合检索(BM25 + 语义)

from haystack.components.retrievers.in_memory import (
    InMemoryBM25Retriever,
    InMemoryEmbeddingRetriever
)
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker

pipe = Pipeline()

# 双路检索
pipe.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=store, top_k=10))
pipe.add_component("text_embedder", SentenceTransformersTextEmbedder())
pipe.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=store, top_k=10))

# 合并结果
pipe.add_component("joiner", DocumentJoiner())

# 重排序
pipe.add_component("ranker", TransformersSimilarityRanker(model="cross-encoder/ms-marco-MiniLM-L-6-v2", top_k=5))

# 连接
pipe.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
pipe.connect("bm25_retriever", "joiner")
pipe.connect("embedding_retriever", "joiner")
pipe.connect("joiner", "ranker")

3. Agent(Tool Use)

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool

# 定义工具
def search_web(query: str) -> str:
    """搜索网络获取信息"""
    # 实际实现搜索逻辑
    return f"搜索'{query}'的结果..."

def calculate(expression: str) -> str:
    """计算数学表达式"""
    return str(eval(expression))

# 创建工具对象
tools = [
    Tool(name="search_web", function=search_web, description="搜索网络"),
    Tool(name="calculate", function=calculate, description="计算数学表达式")
]

# Agent Pipeline
from haystack.components.agents import Agent

agent = Agent(
    chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
    tools=tools,
    system_prompt="你是一个有帮助的助手,可以搜索网络和做计算"
)

result = agent.run(messages=[ChatMessage.from_user("2024年OpenAI的估值是多少?换算成人民币是多少?")])

4. 条件路由

from haystack.components.routers import ConditionalRouter

# 根据问题类型路由到不同处理逻辑
routes = [
    {
        "condition": "{{ 'code' in query.lower() or '代码' in query }}",
        "output": "{{ query }}",
        "output_name": "code_query",
        "output_type": str
    },
    {
        "condition": "{{ True }}",  # 默认路由
        "output": "{{ query }}",
        "output_name": "general_query",
        "output_type": str
    }
]

pipe = Pipeline()
pipe.add_component("router", ConditionalRouter(routes=routes))
pipe.add_component("code_agent", code_pipeline)
pipe.add_component("general_agent", general_pipeline)

pipe.connect("router.code_query", "code_agent")
pipe.connect("router.general_query", "general_agent")

5. 使用本地模型

from haystack_integrations.components.generators.ollama import OllamaGenerator

# Ollama本地模型
pipe = Pipeline()
pipe.add_component("llm", OllamaGenerator(
    model="llama3.2",
    url="http://localhost:11434"
))

6. 序列化与部署

# Pipeline可以序列化为YAML
pipe.dump("my_pipeline.yaml")

# 从YAML加载
from haystack import Pipeline
loaded_pipe = Pipeline.load("my_pipeline.yaml")

# 或转为字典
config = pipe.to_dict()

常见问题

Q1: Haystack 2.x vs 1.x?

Haystack 2.x 是完全重写版本,不向后兼容: - 2.x:Pipeline + Component 架构,更灵活 - 1.x:Node 架构,已不再维护 - 新项目请直接使用 2.x

Q2: 和 LangChain/LlamaIndex 的区别?

特性HaystackLangChainLlamaIndex
定位工程化Pipeline框架通用LLM框架数据索引框架
架构Pipeline+ComponentChain+AgentIndex+Query
优势清晰/可维护生态最大RAG最专注
复杂度
生产

Q3: DocumentStore 如何选择?

存储适用场景特点
InMemory原型/测试简单但不持久
ChromaDB小规模生产易用
Qdrant中大规模高性能
Pinecone云原生托管无需运维
pgvector已有PostgreSQL无需额外服务

Q4: Pipeline 连接报错?

确保组件的输入输出类型匹配:

# 查看组件的输入输出
print(my_component.input_type)
print(my_component.output_type)

参考资源