1. 面试题目 #
在构建和部署RAG(检索增强生成)系统时,我们经常会遇到召回(Retrieval)结果与用户查询意图不完全匹配的情况,这直接影响了最终生成答案的质量和用户体验。请您详细阐述导致这种不匹配的潜在原因,并提出在RAG系统不同阶段(预处理、检索、后处理)可以采取的优化策略。同时,请结合具体技术,深入探讨如何提升召回的准确性和相关性。
2. 参考答案 #
2.1 引言:RAG系统召回不匹配的挑战 #
RAG系统通过结合信息检索和大型语言模型(LLM)的生成能力,旨在提供更准确、更具时效性的答案。然而,当检索到的文档与用户查询的真实意图不符时,即使LLM的生成能力再强,也难以给出高质量的回答。解决召回不匹配问题是优化RAG系统性能的关键。
2.2 RAG系统召回不匹配的潜在原因 #
- 查询理解不足:用户查询可能过于简短、模糊,或包含多重意图,导致检索系统难以准确理解
- 文档质量问题:原始文档存在噪声、错误、格式不一致,或缺乏必要的元数据
- 分块策略不当:文档分块过大可能引入无关信息,过小可能割裂上下文
- 检索算法局限:单一检索方法(如纯关键词或纯向量)可能无法捕捉所有相关性
- 相关性排序不足:初始召回的文档虽然包含相关信息,但排序不佳,导致最相关的内容未被优先送入LLM
2.3 RAG系统各阶段的优化策略 #
为了解决召回不匹配问题,我们可以从RAG系统的三个主要阶段进行优化:
2.3.1 预处理阶段优化 #
此阶段主要关注原始数据的质量和结构,为后续检索打下基础。
数据清洗:
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
class DocumentPreprocessor:
def __init__(self):
self.lemmatizer = WordNetLemmatizer()
self.stop_words = set(stopwords.words('english'))
def clean_text(self, text):
"""清洗文本数据"""
# 1. 移除特殊字符和噪声
text = re.sub(r'[^\w\s]', '', text)
# 2. 转换为小写
text = text.lower()
# 3. 分词和词形还原
tokens = word_tokenize(text)
tokens = [self.lemmatizer.lemmatize(token)
for token in tokens if token not in self.stop_words]
# 4. 重新组合
cleaned_text = ' '.join(tokens)
return cleaned_text
def extract_metadata(self, document):
"""提取文档元数据"""
metadata = {
'title': self.extract_title(document),
'author': self.extract_author(document),
'date': self.extract_date(document),
'topic': self.classify_topic(document),
'language': self.detect_language(document)
}
return metadata
def chunk_document(self, text, chunk_size=1000, overlap=200):
"""智能文档分块"""
# 基于句子边界分块
sentences = text.split('. ')
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk + sentence) < chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks元数据增强:
class MetadataEnhancer:
def __init__(self, llm_model):
self.llm = llm_model
def enhance_metadata(self, document, chunk):
"""为文档块增强元数据"""
enhancement_prompt = f"""
为以下文档块生成增强元数据:
文档:{document}
块内容:{chunk}
请提供:
1. 关键概念标签
2. 主题分类
3. 重要性评分
4. 相关实体
"""
enhanced_metadata = self.llm.generate(enhancement_prompt)
return enhanced_metadata2.3.2 检索阶段优化 #
此阶段核心在于如何根据用户查询,从知识库中高效、准确地找到最相关的文档。
混合检索实现:
from langchain.retrievers import BM25Retriever
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import EnsembleRetriever
class HybridRetrievalSystem:
def __init__(self, documents):
self.documents = documents
self.setup_retrievers()
def setup_retrievers(self):
"""设置多种检索器"""
# 1. BM25关键词检索器
self.bm25_retriever = BM25Retriever.from_documents(self.documents)
self.bm25_retriever.k = 10
# 2. 向量检索器
embeddings = OpenAIEmbeddings()
self.vectorstore = FAISS.from_documents(self.documents, embeddings)
self.vector_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
# 3. 混合检索器
self.ensemble_retriever = EnsembleRetriever(
retrievers=[self.bm25_retriever, self.vector_retriever],
weights=[0.4, 0.6] # 向量检索权重更高
)
def retrieve_documents(self, query, method="hybrid"):
"""根据方法检索文档"""
if method == "bm25":
return self.bm25_retriever.get_relevant_documents(query)
elif method == "vector":
return self.vector_retriever.get_relevant_documents(query)
elif method == "hybrid":
return self.ensemble_retriever.get_relevant_documents(query)
else:
raise ValueError("Unsupported retrieval method")查询重写技术:
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain
class QueryRewriter:
def __init__(self, llm_model):
self.llm = llm_model
self.setup_prompts()
def setup_prompts(self):
"""设置查询重写提示词"""
self.multi_query_prompt = ChatPromptTemplate.from_template("""
基于以下用户查询,生成3个不同角度的查询变体:
原始查询:{original_query}
请生成:
1. 技术细节查询
2. 概念解释查询
3. 应用场景查询
每个查询应该从不同角度探索同一主题。
""")
self.hyde_prompt = ChatPromptTemplate.from_template("""
基于以下查询,生成一个假设性文档,该文档应该包含用户可能寻找的答案:
查询:{query}
请生成一个包含相关信息的假设性文档。
""")
self.stepback_prompt = ChatPromptTemplate.from_template("""
将以下具体问题抽象为更通用的概念性问题:
具体问题:{specific_query}
请生成一个更抽象、更广泛的问题,用于检索相关原理和概念。
""")
def multi_query_rewrite(self, query):
"""多查询重写"""
chain = LLMChain(llm=self.llm, prompt=self.multi_query_prompt)
result = chain.run(original_query=query)
return self.parse_multi_queries(result)
def hyde_rewrite(self, query):
"""Hyde方法查询重写"""
chain = LLMChain(llm=self.llm, prompt=self.hyde_prompt)
hypothetical_doc = chain.run(query=query)
return hypothetical_doc
def stepback_rewrite(self, query):
"""StepBack方法查询重写"""
chain = LLMChain(llm=self.llm, prompt=self.stepback_prompt)
abstract_query = chain.run(specific_query=query)
return abstract_query
def parse_multi_queries(self, result):
"""解析多查询结果"""
# 简单的解析逻辑,实际应用中可能需要更复杂的解析
queries = result.split('\n')
return [q.strip() for q in queries if q.strip()]重排序机制:
from sentence_transformers import CrossEncoder
class DocumentReranker:
def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.reranker = CrossEncoder(model_name)
def rerank_documents(self, query, documents, top_k=5):
"""对文档进行重排序"""
# 准备查询-文档对
pairs = [(query, doc.page_content) for doc in documents]
# 计算相关性分数
scores = self.reranker.predict(pairs)
# 创建文档-分数对并排序
doc_scores = list(zip(documents, scores))
doc_scores.sort(key=lambda x: x[1], reverse=True)
# 返回top_k个最相关的文档
return [doc for doc, score in doc_scores[:top_k]]
def adaptive_reranking(self, query, documents, initial_k=20, final_k=5):
"""自适应重排序"""
# 1. 增加初始召回数量
if len(documents) < initial_k:
return documents
# 2. 重排序
reranked = self.rerank_documents(query, documents, top_k=final_k)
return reranked2.3.3 后处理阶段优化 #
此阶段主要对LLM输入前的上下文进行精炼,确保LLM接收到的信息是高效且无冗余的。
上下文压缩:
class ContextCompressor:
def __init__(self, llm_model):
self.llm = llm_model
def compress_context(self, query, documents, max_tokens=4000):
"""压缩上下文内容"""
# 1. 提取关键信息
key_information = self.extract_key_information(query, documents)
# 2. 移除冗余内容
compressed = self.remove_redundancy(key_information)
# 3. 确保不超过token限制
if self.estimate_tokens(compressed) > max_tokens:
compressed = self.truncate_content(compressed, max_tokens)
return compressed
def extract_key_information(self, query, documents):
"""提取关键信息"""
context = "\n\n".join([doc.page_content for doc in documents])
extraction_prompt = f"""
基于用户查询,从以下上下文中提取最相关的信息:
查询:{query}
上下文:{context}
请提取:
1. 直接相关的信息
2. 支持性证据
3. 关键数据点
4. 重要结论
保持信息的完整性和准确性。
"""
key_info = self.llm.generate(extraction_prompt)
return key_info
def remove_redundancy(self, content):
"""移除冗余内容"""
deduplication_prompt = f"""
请移除以下内容中的冗余信息,保留核心要点:
{content}
要求:
1. 保持信息完整性
2. 移除重复内容
3. 保持逻辑连贯性
"""
deduplicated = self.llm.generate(deduplication_prompt)
return deduplicatedRAG Fusion实现:
class RAGFusionSystem:
def __init__(self, retrieval_system, reranker, context_compressor):
self.retrieval_system = retrieval_system
self.reranker = reranker
self.context_compressor = context_compressor
def fused_retrieval(self, query, num_queries=3, top_k=10):
"""RAG Fusion检索"""
# 1. 多查询生成
query_rewriter = QueryRewriter(self.llm)
multi_queries = query_rewriter.multi_query_rewrite(query)
# 2. 多查询检索
all_documents = []
for q in multi_queries[:num_queries]:
docs = self.retrieval_system.retrieve_documents(q, method="hybrid")
all_documents.extend(docs)
# 3. 去重
unique_docs = self.deduplicate_documents(all_documents)
# 4. 重排序
reranked_docs = self.reranker.rerank_documents(query, unique_docs, top_k)
# 5. 上下文压缩
compressed_context = self.context_compressor.compress_context(
query, reranked_docs
)
return {
"documents": reranked_docs,
"compressed_context": compressed_context,
"query_variations": multi_queries
}
def deduplicate_documents(self, documents):
"""文档去重"""
seen_content = set()
unique_docs = []
for doc in documents:
content_hash = hash(doc.page_content)
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
return unique_docs2.4 元数据优化应用场景 #
时效性筛选:
class TemporalFilter:
def __init__(self):
self.date_extractor = self.setup_date_extractor()
def filter_by_recency(self, documents, days=30):
"""根据时效性筛选文档"""
cutoff_date = datetime.now() - timedelta(days=days)
recent_docs = []
for doc in documents:
doc_date = self.extract_date_from_doc(doc)
if doc_date and doc_date >= cutoff_date:
recent_docs.append(doc)
return recent_docs
def rank_by_freshness(self, documents):
"""按新鲜度排序"""
doc_dates = [(doc, self.extract_date_from_doc(doc)) for doc in documents]
doc_dates.sort(key=lambda x: x[1] or datetime.min, reverse=True)
return [doc for doc, date in doc_dates]领域过滤:
class DomainFilter:
def __init__(self, domain_classifier):
self.domain_classifier = domain_classifier
def filter_by_domain(self, documents, target_domain):
"""根据领域过滤文档"""
filtered_docs = []
for doc in documents:
doc_domain = self.classify_document_domain(doc)
if doc_domain == target_domain:
filtered_docs.append(doc)
return filtered_docs
def classify_document_domain(self, document):
"""分类文档领域"""
# 使用预训练的分类器或LLM进行分类
classification_prompt = f"""
请将以下文档分类到合适的领域:
文档:{document.page_content}
可选领域:技术、医疗、金融、法律、教育、其他
请只返回领域名称。
"""
domain = self.domain_classifier.generate(classification_prompt)
return domain.strip()2.5 系统集成与监控 #
完整的RAG优化系统:
class OptimizedRAGSystem:
def __init__(self, llm_model, documents):
self.llm = llm_model
self.documents = documents
# 初始化各个组件
self.preprocessor = DocumentPreprocessor()
self.metadata_enhancer = MetadataEnhancer(llm_model)
self.retrieval_system = HybridRetrievalSystem(documents)
self.reranker = DocumentReranker()
self.context_compressor = ContextCompressor(llm_model)
self.rag_fusion = RAGFusionSystem(
self.retrieval_system,
self.reranker,
self.context_compressor
)
# 预处理文档
self.processed_docs = self.preprocess_documents()
def preprocess_documents(self):
"""预处理所有文档"""
processed = []
for doc in self.documents:
# 清洗文本
cleaned_text = self.preprocessor.clean_text(doc.page_content)
# 增强元数据
enhanced_metadata = self.metadata_enhancer.enhance_metadata(doc, cleaned_text)
# 分块
chunks = self.preprocessor.chunk_document(cleaned_text)
for chunk in chunks:
processed.append({
'content': chunk,
'metadata': enhanced_metadata,
'original_doc': doc
})
return processed
def answer_question(self, query, use_fusion=True):
"""回答用户问题"""
if use_fusion:
# 使用RAG Fusion
retrieval_result = self.rag_fusion.fused_retrieval(query)
context = retrieval_result["compressed_context"]
else:
# 使用标准检索
docs = self.retrieval_system.retrieve_documents(query, method="hybrid")
reranked_docs = self.reranker.rerank_documents(query, docs)
context = self.context_compressor.compress_context(query, reranked_docs)
# 生成最终答案
answer_prompt = f"""
基于以下上下文信息回答用户问题:
用户问题:{query}
上下文信息:
{context}
请提供准确、完整的答案。如果上下文中没有足够信息,请明确说明。
"""
answer = self.llm.generate(answer_prompt)
return {
"answer": answer,
"context": context,
"retrieval_method": "fusion" if use_fusion else "standard"
}2.6 性能评估与优化 #
评估指标:
class RAGEvaluator:
def __init__(self):
self.metrics = {}
def evaluate_retrieval_quality(self, query, retrieved_docs, ground_truth):
"""评估检索质量"""
# 1. 召回率 (Recall)
relevant_docs = set(ground_truth)
retrieved_doc_ids = set([doc.id for doc in retrieved_docs])
recall = len(relevant_docs & retrieved_doc_ids) / len(relevant_docs)
# 2. 精确率 (Precision)
precision = len(relevant_docs & retrieved_doc_ids) / len(retrieved_doc_ids)
# 3. MRR (Mean Reciprocal Rank)
mrr = self.calculate_mrr(retrieved_docs, ground_truth)
# 4. NDCG (Normalized Discounted Cumulative Gain)
ndcg = self.calculate_ndcg(retrieved_docs, ground_truth)
return {
"recall": recall,
"precision": precision,
"mrr": mrr,
"ndcg": ndcg
}
def calculate_mrr(self, retrieved_docs, ground_truth):
"""计算MRR"""
for i, doc in enumerate(retrieved_docs):
if doc.id in ground_truth:
return 1.0 / (i + 1)
return 0.0
def calculate_ndcg(self, retrieved_docs, ground_truth, k=10):
"""计算NDCG@k"""
# 简化的NDCG计算
dcg = 0.0
for i, doc in enumerate(retrieved_docs[:k]):
if doc.id in ground_truth:
dcg += 1.0 / np.log2(i + 2)
# 理想DCG
idcg = sum(1.0 / np.log2(i + 2) for i in range(min(len(ground_truth), k)))
return dcg / idcg if idcg > 0 else 0.02.7 总结 #
优化RAG系统召回结果与用户意图不匹配的问题是一个系统性工程,需要从数据预处理、检索策略和后处理等多个环节综合考虑。通过采用混合检索、查询重写、重排序以及元数据优化等技术,可以显著提升RAG系统的性能和用户满意度。关键是要根据具体应用场景选择合适的优化策略,并持续监控和评估系统性能,进行迭代改进。