千帆+Langchain+VectorDB 建立简单的 RAG 示例
RAG 介绍
RAG是一种先进的自然语言处理方法,它结合了信息检索和文本生成技术,用于提高问答系统、聊天机器人等应用的性能。以下是RAG的详细工作流程:
RAG 的工作流程
RAG的工作流程
-
文档加载(Document Loading)
- 从各种来源加载大量文档数据。
- 这些文档将作为知识库,用于后续的信息检索。
-
文档分割(Document Splitting)
- 将加载的文档分割成更小的段落或部分。
- 这有助于提高检索的准确性和效率。
-
嵌入向量生成(Embedding Generation)
- 对每个文档或文档的部分生成嵌入向量。
- 这些嵌入向量捕捉文档的语义信息,方便后续的相似度比较。
-
写入向量数据库(Writing to Vector Database)
- 将生成的嵌入向量存储在一个向量数据库中。
- 数据库支持高效的相似度搜索操作。
-
查询生成(Query Generation)
- 用户提出一个问题或输入一个提示。
- RAG模型根据输入生成一个或多个相关的查询。
-
文档检索(Document Retrieval)
- 使用生成的查询在向量数据库中检索相关文档。
- 选择与查询最相关的文档作为信息源。
-
上下文融合(Context Integration)
- 将检索到的文档内容与原始问题或提示融合,构成扩展的上下文。
-
答案生成(Answer Generation)
- 基于融合后的上下文,RAG生成模型产生最终的回答或文本。
- 这一步骤旨在综合原始输入和检索到的信息。
准备环境
向量数据库环境
新用户可以创建免费版进行测试,免费版为单节点架构,地址:https://cloud.baidu.com/product/vdb.html
1、创建百度向量数据库实例,注意需要地域,可用区需要和 BCC 保持在同一个 VPC 内。 地址:https://console.bce.baidu.com/vdb/#/vdb/instance/create
2、创建成功后,通过实例详情页查看访问的地址信息和账号信息,用于访问操作向量数据库。如例子截图,访问信息如下:
# 访问地址格式:http://${IP}:${PORT}
访问地址:http://192.168.20.4:5287
账号:root
密钥:xxxx
开通千帆 Embedding 模型
千帆模型开通付费之后才能使用,开通不会产生费用,且有代金券赠送
1、开通千帆 Embedding-V1 模型的收费,地址: https://console.bce.baidu.com/qianfan/chargemanage/list
2、右上角个人中心的安全认证里面提取用于鉴权调用 Embedding 模型的 Access Key 和 Secret Key
客户端环境
数据准备和写入
本例子使用的是 bcc 计算型 c5 2c4g 实例基于 Centos 系统作为例子,但不仅限于 bcc,只要是同 vpc 内的服务器产品都可以。已经有 BCC 客户端的用户忽略步骤 1。
1、创建 BCC 客户端。 地址:https://console.bce.baidu.com/bcc/#/bcc/instance/create
2、登录创建的实例进行环境准备,部署安装 python 环境和搭建知识库所必须的依赖包,
# 安装 python 3.9
yum install -y python39
# langchain 依赖包,用于把文本数据转化为向量数据。
# pymochow 依赖包,用于访问和操作百度向量数据库。
# qianfan 依赖包,用于访问千帆大模型。
# pdfplumber 依赖包,加载处理 pdf 文档。
pip3.9 install "langchain>=0.1.13" pymochow qianfan pdfplumber
# 创建项目目录
mkdir -p knowledge/example_data && cd knowledge
3、上传一个 PDF 文件到 knowledge/example_data 目录下
4、创建访问的配置文件
# config.py
import os
from pymochow.auth.bce_credentials import BceCredentials
# 定义配置信息
account = 'root'
api_key = '修改为你的密钥'
endpoint = '修改为之前记录的访问地址,如 http://192.168.20.4:5287'
# 初始化BceCredentials对象
credentials = BceCredentials(account, api_key)
# 设置千帆AI平台的安全认证信息(AK/SK),通过环境变量
# 注意替换以下参数为您的Access Key和Secret Key
os.environ["QIANFAN_ACCESS_KEY"] = "your_iam_ak"
os.environ["QIANFAN_SECRET_KEY"] = "your_iam_sk"
5、创建 document 数据库
import pymochow
from pymochow.configuration import Configuration
import config # 导入配置文件
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
try:
db = client.create_database("document")
except Exception as e: # 捕获所有类型的异常
print(f"Error: {e}") # 打印异常信息
db_list = client.list_databases()
for db_name in db_list:
print(db_name.database_name)
client.close()
6、创建 chunks 数据表
import time
import pymochow # 导入pymochow库,用于操作数据库
from pymochow.configuration import Configuration # 用于配置客户端
import config # 导入配置文件,包含身份验证和终端信息
# 导入pymochow模型相关的类和枚举类型
from pymochow.model.schema import Schema, Field, VectorIndex, SecondaryIndex, HNSWParams
from pymochow.model.enum import FieldType, IndexType, MetricType, TableState
from pymochow.model.table import Partition
# 使用配置文件中的信息初始化客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
# 选择或创建数据库
db = client.database("document")
# 定义数据表的字段
fields = [
Field("id", FieldType.UINT64, primary_key=True, partition_key=True, auto_increment=False, not_null=True),
Field("text", FieldType.STRING),
Field("metadata", FieldType.STRING),
Field("source", FieldType.STRING),
Field("author", FieldType.STRING, not_null=True),
Field("vector", FieldType.FLOAT_VECTOR, not_null=True, dimension=384)
]
# 定义数据表的索引
indexes = [
VectorIndex(index_name="vector_idx", field="vector", index_type=IndexType.HNSW, metric_type=MetricType.L2, params=HNSWParams(m=32, efconstruction=200)),
SecondaryIndex(index_name="author_idx", field="author")
]
# 尝试创建数据表,捕获并打印可能出现的异常
try:
# replication 数量不超过实例数据节点的数量
# 免费版参考值 replication=1
# 标准版参考值 replication=3
table = db.create_table(table_name="chunks", replication=1, partition=Partition(partition_num=1), schema=Schema(fields=fields, indexes=indexes))
except Exception as e: # 捕获所有类型的异常
print(f"Error: {e}") # 打印异常信息
# 轮询数据表状态,直到表状态为NORMAL,表示表已准备好
while True:
time.sleep(2) # 每次检查前暂停2秒,减少对服务器的压力
table = db.describe_table("chunks")
if table.state == TableState.NORMAL: # 表状态为NORMAL,跳出循环
break
# 打印数据表的详细信息
print("table: {}".format(table.to_dict()))
client.close() # 关闭客户端连接
7、从PDF文档中加载数据、将文档内容分割为更小的文本块以及利用千帆AI平台的接口来对文本进行向量化表示,并且写到 chunks 表,本例子会用小的文档作为例子,用户可以根据实际情况加载。
# 导入必要的库
from langchain_community.document_loaders import PDFPlumberLoader # 用于加载PDF文档
from langchain.text_splitter import RecursiveCharacterTextSplitter # 用于文本分割
import os # 用于操作系统功能,如设置环境变量
import qianfan # 千帆AI平台SDK
import time # 用于暂停执行,避免请求频率过高
import json
import pymochow
import config # 导入配置文件
from pymochow.model.table import Row # 用于写入向量数据
from pymochow.configuration import Configuration
# 加载PDF文档
loader = PDFPlumberLoader("./example_data/ai-paper.pdf") # 指定PDF文件路径
documents = loader.load() # 加载文档
print(documents[0]) # 打印加载的第一个文档内容
# 设置文本分割器,指定分割的参数
# chunk_size定义了每个分割块的字符数,chunk_overlap定义了块之间的重叠字符数
# separators列表定义了用于分割的分隔符
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=384,
chunk_overlap=0,
separators=["\n\n", "\n", " ", "", "。", ","]
)
all_splits = text_splitter.split_documents(documents) # 对文档进行分割
print(all_splits[0]) # 打印分割后的第一个块内容
emb = qianfan.Embedding() # 初始化嵌入模型对象
embeddings = [] # 用于存储每个文本块的嵌入向量
for chunk in all_splits: # 遍历所有分割的文本块
# 获取文本块的嵌入向量,使用默认模型Embedding-V1
resp = emb.do(texts=[chunk.page_content])
embeddings.append(resp['data'][0]['embedding']) # 将嵌入向量添加到列表中
time.sleep(1) # 暂停1秒,避免请求过于频繁
print(embeddings[0]) # 打印第一个文本块的嵌入向量
# 逐行写入向量化数据
rows = []
for index, chunk in enumerate(all_splits):
metadata = "{}"
if chunk.metadata is not None:
metadata = json.dumps(chunk.metadata)
row = Row(
id=index,
text=chunk.page_content,
metadata=metadata,
source=chunk.metadata["source"],
author=chunk.metadata["Creator"],
vector=embeddings[index]
)
rows.append(row)
# 打印第一个Row对象转换成的字典格式,以验证数据结构
print(rows[0].to_dict())
# 读取数据库配置文件,并且初始化连接
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
# 选择或创建数据库
db = client.database("document")
try:
table = db.describe_table("chunks")
table.upsert(rows=rows) # 批量写入向量数据,一次最多支持写入1000条
table.rebuild_index("vector_idx") # 创建向量索引,必要步骤
except Exception as e: # 捕获所有类型的异常
print(f"Error: {e}") # 打印异常信息
client.close()
当打印到如下的数据证明你写入成功了。
RAG 问答示例
结合百度向量数据库、Langchain、千帆实现一个简单的RAG功能。
import os
import config
from langchain_community.vectorstores import BaiduVectorDB
from langchain_community.vectorstores.baiduvectordb import ConnectionParams, TableParams
from langchain_community.embeddings import QianfanEmbeddingsEndpoint
from langchain_community.chat_models import QianfanChatEndpoint
from langchain.chains import RetrievalQA
# 初始化向量嵌入和连接参数
embeddings = QianfanEmbeddingsEndpoint()
conn_params = ConnectionParams(
endpoint=config.endpoint,
account=config.account,
api_key=config.api_key
)
# 初始化百度云向量数据库
vector_db = BaiduVectorDB(
embedding=embeddings,
connection_params=conn_params,
table_params=TableParams(384),
database_name="document",
table_name="chunks",
drop_old=False,
)
# 初始化检索器和对话模型
retriever = vector_db.as_retriever(search_type="similarity")
qianfan_chat_model = QianfanChatEndpoint(model="ERNIE-Bot", temperature=0.1)
# 初始化问答模块
qa = RetrievalQA.from_chain_type(llm=qianfan_chat_model, chain_type="refine", retriever=retriever, return_source_documents=True)
# 接收用户输入的问题
query = input("\nYour question: ")
# 处理用户问题并获取答案和相关文档
res = qa(query)
answer, docs = res['result'], res['source_documents']
# 打印用户提出的问题和系统给出的回答
print("\n\n> Question:")
print(query)
print("\n> Answer:")
print(answer)
运行之后我们就可以基于文档提问,如提问“VectorDB 的数据引擎叫什么,用中文回答”,Demo 会把相关的文档内容提炼答复,如下图所示
标量和向量检索示例
1、基于标量的检索
import pymochow
from pymochow.configuration import Configuration
import config # 导入配置文件
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
# 选择或创建数据库
db = client.database("document")
try:
table = db.describe_table("chunks")
primary_key = {'id': 0}
projections = ["id", "text", "source", "author"]
res = table.query(
primary_key=primary_key,
projections=projections,
retrieve_vector=True
)
except Exception as e: # 捕获所有类型的异常
print(f"Error: {e}") # 打印异常信息
print(res)
client.close()
2、基于向量的检索
import os
import config
import pymochow
import qianfan
from pymochow.configuration import Configuration
from pymochow.model.table import AnnSearch, HNSWSearchParams
# 初始化千帆AI平台的嵌入模型对象
emb = qianfan.Embedding()
# 定义待查询的问题文本
question = "讲解下大模型的发展趋势"
# 获取问题文本的嵌入向量
resp = emb.do(texts=[question])
question_embedding = resp['data'][0]['embedding']
# 使用配置信息初始化向量数据库客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
# 选择数据库
db = client.database("document")
try:
# 获取指定表的描述信息
table = db.describe_table("chunks")
# 构建近似最近邻搜索对象
anns = AnnSearch(
vector_field="vector", # 指定用于搜索的向量字段名
vector_floats=question_embedding, # 提供查询向量
params=HNSWSearchParams(ef=200, limit=1) # 设置HNSW算法参数和返回结果的限制数量
)
# 执行搜索操作
res = table.search(anns=anns)
# 打印搜索结果
print(res)
except Exception as e: # 捕获并打印所有异常信息
print(f"Error: {e}")
# 关闭客户端连接
client.close()
3、基于标量和向量的混合检索
import os
import config
import pymochow
import qianfan
from pymochow.configuration import Configuration
from pymochow.model.table import AnnSearch, HNSWSearchParams
# 初始化千帆AI平台的嵌入模型对象
emb = qianfan.Embedding()
# 定义待查询的问题文本
question = "讲解下大模型的发展趋势"
# 获取问题文本的嵌入向量
resp = emb.do(texts=[question])
question_embedding = resp['data'][0]['embedding']
# 使用配置信息初始化向量数据库客户端
config_obj = Configuration(credentials=config.credentials, endpoint=config.endpoint)
client = pymochow.MochowClient(config_obj)
# 选择数据库
db = client.database("document")
try:
# 获取指定表的描述信息
table = db.describe_table("chunks")
# 构建近似最近邻搜索对象
anns = AnnSearch(
vector_field="vector", # 指定用于搜索的向量字段名
vector_floats=question_embedding, # 提供查询向量
params=HNSWSearchParams(ef=200, limit=1), # 设置HNSW算法参数和返回结果的限制数量
filter="author='CNKI'" # 提供标量的过滤条件
)
# 执行搜索操作
res = table.search(anns=anns)
# 打印搜索结果
print(res)
except Exception as e: # 捕获并打印所有异常信息
print(f"Error: {e}")
# 关闭客户端连接
client.close()