Usage Examples
Last updated: 2024-12-12
Overview
Vector databases are currently used mainly in RAG scenarios. A complete RAG application needs more than the vector database itself: document management, document parsing, document chunking, an embedding service (to vectorize content), reranking, and an LLM must all work together. To meet this need, the SDK wraps each of these module services and provides corresponding interfaces, so that users can customize each module and combine it efficiently with the vector database to quickly build their own RAG service. For concrete usage, see: AI Search usage examples.
Document Management
Introduction
For document management, the SDK provides simple implementations backed by the local file system and by Baidu Object Storage (BOS); users can also implement their own according to business needs.
Interface
class DocumentHub(ABC):
    """
    DocumentHub: Abstract base class for the document hub that defines the basic operations.
    Subclasses should implement methods for adding, removing, listing, and downloading documents.
    """

    @abstractmethod
    def add(self, doc: Document) -> Document:
        """
        Add a document to the hub.

        Parameters:
            doc (Document): The document to be added.
        """
        pass

    @abstractmethod
    def remove(self, doc: Document):
        """
        Remove a document from the hub.

        Parameters:
            doc (Document): The document to be removed.
        """
        pass

    @abstractmethod
    def list(self) -> List[Document]:
        """
        List all documents available in the hub.

        Returns:
            List[Document]: A list of all available document objects.
        """
        pass

    @abstractmethod
    def load(self, doc: Document) -> Document:
        """
        Load a document from the hub.

        Parameters:
            doc (Document): The document to load.

        Returns:
            Document: The loaded document, if it exists in the hub.
        """
        pass
Implementing a custom document hub based on the interface
from pymochow.ai.dochub import DocumentHub, DocumentHubEnv

class UserDefinedDocumentHub(DocumentHub):
    ...
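For reference, here is a minimal sketch of what a custom hub might look like: it tracks documents in an in-process dictionary keyed by (kb_id, doc_name). The sketch is illustrative only and not part of the SDK; a real implementation would copy files to and from actual storage.

from typing import Dict, List, Tuple

from pymochow.ai.dochub import DocumentHub
from pymochow.model.document import Document

class InMemoryDocumentHub(DocumentHub):
    """Illustrative hub that tracks documents in memory (no real storage)."""

    def __init__(self):
        # Map (kb_id, doc_name) -> Document
        self._docs: Dict[Tuple[str, str], Document] = {}

    def add(self, doc: Document) -> Document:
        self._docs[(doc.kb_id, doc.doc_name)] = doc
        return doc

    def remove(self, doc: Document):
        self._docs.pop((doc.kb_id, doc.doc_name), None)

    def list(self) -> List[Document]:
        return list(self._docs.values())

    def load(self, doc: Document) -> Document:
        # A real hub would download the file and point doc.file_path at the
        # local copy; here we simply return the stored Document.
        return self._docs[(doc.kb_id, doc.doc_name)]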
Examples
Managing local files
import logging
import uuid

from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document

logger = logging.getLogger(__name__)

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
# List all documents in the hub
docs = doc_hub.list()
for doc in docs:
    logger.debug("doc: {} in hub".format(doc.to_dict()))
# Remove the document from the hub
doc_hub.remove(doc=doc)
Managing files in object storage
import logging
import uuid

from pymochow.ai.dochub import (
    BosDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document

logger = logging.getLogger(__name__)

env = DocumentHubEnv(
    endpoint="bj.bcebos.com",
    ak="your_bos_ak",
    sk="your_bos_sk",
    root_path="bos://your_bucket/object_prefix",
    local_cache_path="./tmp"  # local file cache dir
)
doc_hub = BosDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub
doc = doc_hub.add(doc=doc)
# List all documents in the hub
docs = doc_hub.list()
for doc in docs:
    logger.debug("doc: {} in hub".format(doc.to_dict()))
# Load the document into the local cache
doc = Document(
    kb_id=kb_id,
    doc_name="test/test.pdf"
)
doc = doc_hub.load(doc)
logger.debug("load doc: {} from hub".format(doc.to_dict()))
# Remove the document from the hub
doc_hub.remove(doc=doc)
Document Processing
Introduction
Document processing covers document parsing and document chunking. The SDK provides simple implementations based on Langchain and Baidu Qianfan; users can also implement their own according to business needs.
Interface
class DocProcessor(ABC):
    """
    DocProcessor: An abstract base class that defines the interface for parsing and splitting documents into chunks.
    Subclasses must implement the `process_doc` method to parse and split the document based on specific criteria,
    such as page length, overlap length, and the number of pages to take.
    """

    @abstractmethod
    def process_doc(self, doc) -> List[DocumentChunk]:
        """
        Parse and split the document into chunks based on the provided parameters.

        Parameters:
        ----------
        doc : Document
            The document to be parsed and split into chunks.

        Returns:
        -------
        List[DocumentChunk]
            A list of DocumentChunk objects, where each chunk represents a part of the parsed document.
        """
        pass
Implementing a custom document processor based on the interface
from pymochow.ai.processor import DocProcessor

class UserDefinedDocProcessor(DocProcessor):
    ...
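For reference, a minimal sketch of a custom processor. To avoid guessing at the DocumentChunk constructor, it delegates parsing and splitting to the built-in LangchainDocProcessor and then filters out very short chunks; the `content` attribute it reads from each chunk is an assumption, so check pymochow.model.document for the actual field name.

from typing import List

from pymochow.ai.processor import DocProcessor, LangchainDocProcessor
from pymochow.model.document import DocumentChunk

class FilteringDocProcessor(DocProcessor):
    """Illustrative processor: delegates splitting to LangchainDocProcessor,
    then drops chunks that are too short to be useful."""

    def __init__(self, min_chunk_length=20):
        self._inner = LangchainDocProcessor(maximum_page_length=300,
                                            page_overlap_length=50)
        self._min_chunk_length = min_chunk_length

    def process_doc(self, doc) -> List[DocumentChunk]:
        chunks = self._inner.process_doc(doc)
        # `content` is assumed to hold the chunk text (see note above).
        return [c for c in chunks
                if len(c.content or "") >= self._min_chunk_length]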
Examples
Document processing with Langchain
import logging
import uuid

from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import LangchainDocProcessor

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
doc_processor = LangchainDocProcessor(maximum_page_length=300, page_overlap_length=50)
doc_chunks = doc_processor.process_doc(doc)
Document processing with Baidu Qianfan
import logging
import os
import uuid

from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; AppBuilder services are called for document processing
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)
Embedder
The SDK provides an implementation for Baidu Qianfan; users can also integrate other embedding services according to business needs.
Interface
class Embedder(ABC):
    """
    Embedder: An abstract base class for generating embeddings for document chunks.
    """

    @abstractmethod
    def embedding(self, chunks) -> List[DocumentChunk]:
        """
        Generate embeddings for specified fields in document chunks.

        Parameters:
            chunks (List[DocumentChunk]): A list of document chunks that need to be processed for embeddings.

        Returns:
            List[DocumentChunk]: A list of `DocumentChunk` objects,
            with the embeddings added to the corresponding fields based on `field_mapping`.
        """
        pass

    @abstractmethod
    def embedding_text(self, texts) -> List[List[float]]:
        """
        Embed the given texts into numerical representations and return the result.

        Parameters:
            texts (List[str]): The texts to be embedded.

        Returns:
            List[List[float]]: The numerical embedding of each text as a list of floats.

        Note:
            The method includes a sleep call to throttle the embedding rate due to API rate limits.
        """
        pass
Implementing a custom Embedder based on the interface
from pymochow.ai.embedder import Embedder

class OpenAiEmbedder(Embedder):
    ...
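For reference, a minimal sketch of an OpenAI-backed OpenAiEmbedder. The model name is illustrative, and the assumption that each DocumentChunk exposes `content` and stores its vector in `embedding` is ours rather than the SDK's:

from typing import List

from openai import OpenAI
from pymochow.ai.embedder import Embedder
from pymochow.model.document import DocumentChunk

class OpenAiEmbedder(Embedder):
    """Illustrative embedder backed by the OpenAI embeddings API."""

    def __init__(self, model="text-embedding-3-small"):
        self._client = OpenAI()  # reads OPENAI_API_KEY from the environment
        self._model = model

    def embedding_text(self, texts) -> List[List[float]]:
        resp = self._client.embeddings.create(model=self._model, input=texts)
        return [item.embedding for item in resp.data]

    def embedding(self, chunks) -> List[DocumentChunk]:
        # Field names here are assumptions; check pymochow.model.document
        # for the actual DocumentChunk attributes.
        vectors = self.embedding_text([chunk.content for chunk in chunks])
        for chunk, vector in zip(chunks, vectors):
            chunk.embedding = vector
        return chunks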
Examples
Embedding with Baidu Qianfan
import logging
import os
import uuid

from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor
from pymochow.ai.embedder import QianfanEmbedder

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; AppBuilder services are called for document processing and embedding
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)
embedder = QianfanEmbedder(batch=2)
chunks = embedder.embedding(chunks)
Processing Pipeline
Introduction
The pipeline ties together the document hub, document processor, Embedder, and the vector database. The SDK ships a default implementation that covers ingesting documents into the vector database as well as vector search, full-text search, and hybrid search; users can also extend or customize it for their own business needs.
Interface
class Pipeline(ABC):
    """
    Pipeline: An abstract base class that defines the interface for ingesting
    documents into the vector database and searching them.
    """

    @abstractmethod
    def ingest_doc(self,
                   doc,
                   doc_processor=None,
                   embedder=None,
                   meta_table=None,
                   doc_to_row_mapping=None,
                   chunk_table=None,
                   chunk_to_row_mapping=None):
        """
        Abstract method for processing and storing the ingestion of documents and their chunks.

        Parameters:
            doc (Document): The document object to be ingested.
            doc_processor (DocProcessor, optional): A tool used to parse and split the document.
            embedder (Embedder, optional): A tool used to generate embeddings of the document content.
            meta_table (Table, optional): The table in the database that stores document metadata.
            doc_to_row_mapping (dict, optional): A JSON-like dictionary that defines
                the mapping between document object attributes and database table columns.
                Example mapping:
                {
                    'doc_id': 'document_id',     # Maps 'doc_id' in the document to 'document_id' in the database
                    'doc_name': 'document_name'  # Maps 'doc_name' in the document to 'document_name' in the database
                    # Add more mappings as needed...
                }
            chunk_table (Table, optional): The table in the database used to store document chunks,
                if the document is processed in chunks.
            chunk_to_row_mapping (dict, optional): Similar to doc_to_row_mapping,
                this dictionary defines how attributes of chunks map to database table columns.
                Example:
                {
                    'chunk_id': 'chunk_id',  # Example mapping, no change
                    # Additional mappings can be added here...
                }

        Returns:
            None: This method does not return anything but may modify data in the database or other storage systems.

        Note:
            This is an abstract method that must be implemented in any subclass inheriting
            from Pipeline with specific logic for ingestion.
        """
        pass

    def vector_search(
        self,
        search_contents: List[str],
        embedder: Embedder,
        table: Table,
        search_request: VectorSearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a vector-based search operation.

        This method converts the search content into a vector using the embedder and performs a
        vector search in the specified table, returning the most similar results to the query.

        Parameters:
        ----------
        search_contents : List[str]
            The input search content, usually in the form of text or an already embedded vector.
        embedder : Embedder
            The embedder object used to convert the search content into vector form.
        table : Table
            The target table where the search is conducted.
        search_request : VectorSearchRequest
            The search request object containing parameters like TopK, filters, etc.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass

    def bm25_search(
        self,
        table: Table,
        search_request: BM25SearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a BM25-based text search operation.

        This method uses the BM25 search algorithm to perform a text-based search on the specified
        table, returning the most relevant documents based on the query terms.

        Parameters:
        ----------
        table : Table
            The target table where the search will be performed.
        search_request : BM25SearchRequest
            The search request object, which contains query terms and other parameters.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass

    def hybrid_search(
        self,
        search_contents: List[str],
        embedder: Embedder,
        table: Table,
        search_request: HybridSearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a hybrid search (vector + traditional text search).

        This method combines vector search with BM25 text search, suitable for scenarios
        requiring both semantic and keyword-based search.

        Parameters:
        ----------
        search_contents : List[str]
            The input search content, usually text-based.
        embedder : Embedder
            The embedder object used to convert the search content into vectors.
        table : Table
            The target table where the search is conducted.
        search_request : HybridSearchRequest
            The search request object combining the vector and BM25 sub-requests and their weights.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass
Examples
Ingesting documents into the vector database
import logging
import os
import uuid

from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor
from pymochow.ai.embedder import QianfanEmbedder
from pymochow.ai.pipeline import DefaultPipeline

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)
# Register the file with the document hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; AppBuilder services are called for document processing and embedding
doc_processor = QianfanDocProcessor(maximum_page_length=300, page_overlap_length=50)
embedder = QianfanEmbedder(batch=2)
pipeline = DefaultPipeline()
# A database and two tables must exist in the vector database to store
# document metadata and document chunk content.
db_name = "DocumentInsight"               # database name in the vector database
meta_table_name = "KnowledgeBase_Meta"    # table that stores document metadata
chunk_table_name = "KnowledgeBase_Chunk"  # table that stores document chunks
# create_or_get_meta_table / create_or_get_chunk_table are user-defined helpers
# that create or fetch the corresponding tables (see the sketch after this example)
meta_table = create_or_get_meta_table(db_name=db_name, table_name=meta_table_name)
chunk_table = create_or_get_chunk_table(db_name=db_name, table_name=chunk_table_name)
# Parse the document, embed it, and store it in the vector database
pipeline.ingest_doc(
    doc=doc,
    doc_processor=doc_processor,
    embedder=embedder,
    meta_table=meta_table,
    chunk_table=chunk_table
)
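The create_or_get_meta_table / create_or_get_chunk_table helpers above are user-defined, not part of the SDK. Below is a minimal sketch of one, assuming the database and table (with the appropriate schema and indexes) already exist in the vector database; the endpoint, account, and API key are placeholders:

import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials

def create_or_get_chunk_table(db_name, table_name):
    config = Configuration(
        credentials=BceCredentials("your_account", "your_api_key"),
        endpoint="http://your_mochow_endpoint:5287"
    )
    client = pymochow.MochowClient(config)
    db = client.database(db_name)  # assumes the database already exists
    return db.table(table_name)    # assumes the table already exists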
Vector search over the knowledge base
import os

from pymochow.ai.embedder import QianfanEmbedder
from pymochow.ai.pipeline import DefaultPipeline
from pymochow.model.table import (
    VectorTopkSearchRequest,
    VectorSearchConfig
)

os.environ["APPBUILDER_TOKEN"] = "your_ab_token"
# The embedder vectorizes the question
embedder = QianfanEmbedder(batch=2)
pipeline = DefaultPipeline()
db_name = "DocumentInsight"
chunk_table_name = "KnowledgeBase_Chunk"
# User-defined helper (see the ingestion example above)
chunk_table = create_or_get_chunk_table(db_name=db_name, table_name=chunk_table_name)
search_contents = ["your question"]
search_request = VectorTopkSearchRequest(
    vector_field="embedding",
    limit=10,
    # filter="kb_id='xxx'",  # optional scalar filter condition
    config=VectorSearchConfig(ef=200)
)
result = pipeline.vector_search(
    search_contents=search_contents,
    embedder=embedder,
    table=chunk_table,
    search_request=search_request
)
Full-text search over the knowledge base
from pymochow.ai.pipeline import DefaultPipeline
from pymochow.model.table import (
    BM25SearchRequest
)

pipeline = DefaultPipeline()
db_name = "DocumentInsight"
chunk_table_name = "KnowledgeBase_Chunk"
# User-defined helper (see the ingestion example above)
chunk_table = create_or_get_chunk_table(db_name=db_name, table_name=chunk_table_name)
search_request = BM25SearchRequest(
    index_name="content_inverted_idx",
    search_text="bm25 search text",
    limit=10,
    # filter="kb_id='xxx'"  # optional scalar filter condition
)
result = pipeline.bm25_search(
    table=chunk_table,
    search_request=search_request
)
Hybrid search over the knowledge base
import os

from pymochow.ai.embedder import QianfanEmbedder
from pymochow.ai.pipeline import DefaultPipeline
from pymochow.model.table import (
    VectorTopkSearchRequest,
    VectorSearchConfig,
    BM25SearchRequest,
    HybridSearchRequest
)

os.environ["APPBUILDER_TOKEN"] = "your_ab_token"
# The embedder vectorizes the question
embedder = QianfanEmbedder(batch=2)
pipeline = DefaultPipeline()
db_name = "DocumentInsight"
chunk_table_name = "KnowledgeBase_Chunk"
# User-defined helper (see the ingestion example above)
chunk_table = create_or_get_chunk_table(db_name=db_name, table_name=chunk_table_name)
search_contents = ["your question"]  # content for the vector search
vector_search_request = VectorTopkSearchRequest(
    vector_field="embedding",
    limit=10,
    config=VectorSearchConfig(ef=200)
)
bm25_search_request = BM25SearchRequest(
    index_name="content_inverted_idx",
    search_text="bm25 search text"  # content for the full-text search
)
search_request = HybridSearchRequest(
    vector_request=vector_search_request,
    vector_weight=0.4,  # weight of the vector search
    bm25_request=bm25_search_request,
    bm25_weight=0.6,    # weight of the full-text search
    # filter="kb_id='xxx'",  # optional scalar filter condition
    limit=15
)
result = pipeline.hybrid_search(
    search_contents=search_contents,
    embedder=embedder,
    table=chunk_table,
    search_request=search_request
)