Operation Examples
Last updated: 2025-01-22
Overview
Vector databases are currently used primarily in RAG scenarios. A RAG application needs more than the vector database itself: document management, document parsing, document chunking, an Embedding service (to vectorize content), Rerank, and an LLM must all work together to deliver the complete RAG workflow. To support this, we have wrapped each of these module services behind corresponding interfaces, making it easy to customize each module and combine it efficiently with the vector database to quickly build your own RAG service. For concrete usage, see the AI Search usage examples.
Document Management
Feature Overview
For document management, the SDK provides simple implementations for managing local files and files in Baidu Object Storage (BOS); you can also implement your own hub to fit your business needs.
Interface
class DocumentHub(ABC):
"""
DocumentHub: Abstract base class for the document hub that defines the basic operations.
Subclasses should implement methods for adding, removing, listing, and downloading documents.
"""
@abstractmethod
def add(self, doc: Document) -> Document:
"""
Add a document to the hub.
Parameters:
doc (Document): The document to be added.
"""
pass
@abstractmethod
def remove(self, doc: Document):
"""
Remove a document from the hub.
Parameters:
doc (Document): The document to be removed.
"""
pass
@abstractmethod
def list(self) -> List[Document]:
"""
List all documents available in the hub.
Returns:
List[Document]: A list of all available document objects.
"""
pass
@abstractmethod
def load(self, doc: Document) -> Document:
"""
Load a document from the hub.
Parameters:
doc (Document): The document to load.
Returns:
Document: The loaded document, if it exists in the hub.
"""
pass
Implementing a custom document hub based on the interface:
from pymochow.ai.dochub import DocumentHub, DocumentHubEnv
class UserDefinedDocumentHub(DocumentHub):
...
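As an illustration, a minimal in-memory hub might look like the following sketch. This is purely illustrative: it assumes Document exposes a doc_name attribute, mirroring the constructor arguments used in the examples below.
from typing import Dict, List
from pymochow.ai.dochub import DocumentHub
from pymochow.model.document import Document

class InMemoryDocumentHub(DocumentHub):
    """A toy hub that keeps documents in a dict keyed by doc_name (illustrative only)."""
    def __init__(self):
        self._docs: Dict[str, Document] = {}

    def add(self, doc: Document) -> Document:
        self._docs[doc.doc_name] = doc  # doc_name attribute assumed
        return doc

    def remove(self, doc: Document):
        self._docs.pop(doc.doc_name, None)

    def list(self) -> List[Document]:
        return list(self._docs.values())

    def load(self, doc: Document) -> Document:
        return self._docs[doc.doc_name]  # raises KeyError if absent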
Examples
Managing local files
import logging
import uuid
from pymochow.ai.dochub import (
LocalDocumentHub,
DocumentHubEnv
)
from pymochow.model.document import Document
logger = logging.getLogger(__name__)
env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,  # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under the hub, e.g. test/test.pdf
    file_path="./test.pdf"  # local path of the source file
)
# Add the file to the hub; this copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
# List all documents in the hub
docs = doc_hub.list()
for d in docs:
    logger.debug("doc: {} in hub".format(d.to_dict()))
# Remove the document from the hub
doc_hub.remove(doc=doc)
Managing files in object storage
import logging
import uuid
from pymochow.ai.dochub import (
BosDocumentHub,
DocumentHubEnv
)
from pymochow.model.document import Document
logger = logging.getLogger(__name__)
env = DocumentHubEnv(
endpoint="bj.bcebos.com",
ak="your_bos_ak",
sk="your_bos_sk",
root_path="bos://your_bucket/object_prefix",
local_cache_path="./tmp" # local file cache dir
)
doc_hub = BosDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,  # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under the hub, e.g. test/test.pdf
    file_path="./test.pdf"  # local path of the source file
)
# Add the file to the hub
doc = doc_hub.add(doc=doc)
# List all documents in the hub
docs = doc_hub.list()
for d in docs:
    logger.debug("doc: {} in hub".format(d.to_dict()))
# Load the document into the local cache
doc = Document(
    kb_id=kb_id,
    doc_name="test/test.pdf"
)
doc = doc_hub.load(doc)
logger.debug("load doc: {} from hub".format(doc.to_dict()))
# Remove the document from the hub
doc_hub.remove(doc=doc)
Document Processing
Feature Overview
Document processing covers document parsing and document chunking. The SDK provides simple implementations based on Langchain and Baidu Qianfan; you can also implement your own processor to fit your business needs.
Interface
class DocProcessor(ABC):
"""
DocProcessor: An abstract base class that defines the interface for parsing and splitting documents into chunks.
Subclasses must implement the `process_doc` method to parse and split the document based on specific criteria,
such as page length, overlap length, and the number of pages to take.
"""
@abstractmethod
def process_doc(self, doc) -> List[DocumentChunk]:
"""
Parse and split the document into chunks based on the provided parameters.
Parameters:
----------
doc : Document
The document to be parsed and split into chunks.
Returns:
-------
List[DocumentChunk]
A list of DocumentChunk objects, where each chunk represents a part of the parsed document.
"""
pass
Implementing a custom document processor based on the interface:
from pymochow.ai.processor import DocProcessor
class UserDefinedDocProcessor(DocProcessor):
...
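For example, a naive processor that splits plain-text files into fixed-size chunks might look like the sketch below. The DocumentChunk import path and constructor fields (kb_id, doc_name, content) are assumptions here, not the SDK's confirmed schema:
from typing import List
from pymochow.ai.processor import DocProcessor
from pymochow.model.document import DocumentChunk  # import path assumed

class FixedSizeDocProcessor(DocProcessor):
    """Illustrative processor: split a plain-text file into fixed-size chunks."""
    def __init__(self, chunk_size: int = 300):
        self.chunk_size = chunk_size

    def process_doc(self, doc) -> List[DocumentChunk]:
        with open(doc.file_path, "r", encoding="utf-8") as f:
            text = f.read()
        # Cut the text into consecutive windows of chunk_size characters.
        return [
            DocumentChunk(
                kb_id=doc.kb_id,        # constructor fields assumed
                doc_name=doc.doc_name,
                content=text[i:i + self.chunk_size],
            )
            for i in range(0, len(text), self.chunk_size)
        ]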
Examples
Document processing with Langchain
import logging
import uuid
from pymochow.ai.dochub import (
LocalDocumentHub,
DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import LangchainDocProcessor
env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,  # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under the hub, e.g. test/test.pdf
    file_path="./test.pdf"  # local path of the source file
)
# Add the file to the hub; this copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
doc_processor = LangchainDocProcessor(maximum_page_length=300, page_overlap_length=50)
doc_chunks = doc_processor.process_doc(doc)
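Each returned chunk can be inspected before embedding; for example (assuming DocumentChunk exposes a to_dict() method like Document does):
logger = logging.getLogger(__name__)
for chunk in doc_chunks:
    logger.debug("chunk: {}".format(chunk.to_dict()))  # to_dict() assumed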
Document processing with Baidu Qianfan
import logging
import os
import uuid
from pymochow.ai.dochub import (
LocalDocumentHub,
DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor
env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,  # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under the hub, e.g. test/test.pdf
    file_path="./test.pdf"  # local path of the source file
)
# Add the file to the hub; this copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; document processing is delegated to the AppBuilder service
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)
Embedder
The SDK provides an implementation for Baidu Qianfan; you can also integrate other embedding services to fit your business needs.
Interface
class Embedder(ABC):
"""
Embedder: An abstract base class for generating embeddings for document chunks.
"""
@abstractmethod
def embedding(self, chunks) -> List[DocumentChunk]:
"""
Generate embeddings for specified fields in document chunks.
Parameters:
chunks (List[DocumentChunk]): A list of document chunks that need to be processed for embeddings.
Returns:
List[DocumentChunk]: A list of `DocumentChunk` objects,
with the embeddings added to the corresponding fields based on `field_mapping`.
"""
pass
@abstractmethod
def embedding_text(self, texts) -> List[List[float]]:
"""
Embed the given texts into numerical representations and return the result.
Args:
    texts (List[str]): The texts to be embedded.
Returns:
    List[List[float]]: The numerical embeddings, one vector of floats per input text.
Note:
    Implementations may throttle the embedding rate (e.g., by sleeping) to respect API rate limits.
"""
pass
Implementing a custom Embedder based on the interface:
from pymochow.ai.embedder import Embedder
class OpenAiEmbedder(Embedder):
...
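The stub above could be fleshed out roughly as follows. This is a minimal sketch assuming the openai>=1.0 Python client; the chunk field names content and embedding are assumptions about the chunk schema, not confirmed by the SDK:
from typing import List
from openai import OpenAI  # assumes the openai>=1.0 client
from pymochow.ai.embedder import Embedder

class OpenAiEmbedder(Embedder):
    """Illustrative Embedder backed by the OpenAI embeddings API."""
    def __init__(self, model: str = "text-embedding-3-small"):
        self._client = OpenAI()  # reads OPENAI_API_KEY from the environment
        self._model = model

    def embedding_text(self, texts: List[str]) -> List[List[float]]:
        resp = self._client.embeddings.create(model=self._model, input=texts)
        return [item.embedding for item in resp.data]

    def embedding(self, chunks):
        vectors = self.embedding_text([chunk.content for chunk in chunks])  # content field assumed
        for chunk, vector in zip(chunks, vectors):
            chunk.embedding = vector  # embedding field assumed
        return chunks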
Examples
Embedding with Baidu Qianfan
import logging
import os
import uuid
from pymochow.ai.dochub import (
LocalDocumentHub,
DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor
from pymochow.ai.embedder import QianfanEmbedder
env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,  # knowledge base ID
    doc_name="test/test.pdf",  # document name: the relative path under the hub, e.g. test/test.pdf
    file_path="./test.pdf"  # local path of the source file
)
# Add the file to the hub; this copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)
os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; document processing and embedding are delegated to the AppBuilder service
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)
embedder = QianfanEmbedder(batch=2)
chunks = embedder.embedding(chunks)
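The same embedder can also embed ad-hoc query text directly via embedding_text, which is what the pipeline's vector_search (described below) relies on; a quick sketch:
# One vector per input text; the vector dimension depends on the embedding model.
vectors = embedder.embedding_text(["what is mochow?"])
print(len(vectors), len(vectors[0]))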
Processing Pipeline
Feature Overview
A pipeline ties together the document hub, document processor, Embedder, and vector database. The SDK provides a default implementation that covers ingesting documents into the vector database, vector search, full-text search, and hybrid search; you can also extend these capabilities to fit your business needs.
Interface
class Pipeline(ABC):
"""
Pipeline: An abstract base class that defines the interface for ingesting
doc into vectordb and search from it.
"""
@abstractmethod
def ingest_doc(self,
               doc,
               doc_processor=None,
               embedder=None,
               meta_table=None,
               doc_to_row_mapping=None,
               chunk_table=None,
               chunk_to_row_mapping=None):
"""
Abstract method for processing and storing the ingestion of documents and their chunks.
Parameters:
doc (Document): The document object to be ingested.
doc_processor (Processor, optional): A tool used to parse and split the document.
embedder (Embedder, optional): A tool used to generate embeddings of the document content.
meta_table (Table, optional): The table in the database that stores document data.
doc_to_row_mapping (dict, optional): A JSON-like dictionary that defines
the mapping between document object attributes and database table columns.
Example mapping:
{
'doc_id': 'document_id', # Maps 'doc_id' in the document to 'document_id' in the database
'doc_name': 'document_name' # Maps 'doc_name' in the document to 'document_name' in the database
# Add more mappings as needed...
}
chunk_table (Table, optional): The table in the database used to store document chunks,
if the document is processed in chunks.
chunk_to_row_mapping (dict, optional): Similar to doc_to_row_mapping,
this dictionary defines how attributes of chunks map to database table columns.
Example:
{
'chunk_id': 'chunk_id', # Example mapping, no change
# Additional mappings can be added here...
}
Returns:
None: This method does not return anything but may modify data in the database or other storage systems.
Note:
This is an abstract method that must be implemented in any subclass inheriting
from Pipeline with specific logic for ingestion.
"""
pass
def vector_search(
self,
search_contents: List[str],
embedder: Embedder,
table: Table,
search_request: VectorSearchRequest,
partition_key: Dict[str, Any] = None,
projections: List[str] = None,
read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
config: Dict[Any, Any] = None
):
"""
Perform a vector-based search operation.
This method converts the search content into a vector using the embedder and performs a
vector search in the specified table, returning the most similar results to the query.
Parameters:
----------
search_contents : List[str]
The input search contents, usually plain text that the embedder converts into vectors.
embedder : Embedder
The embedder object used to convert the search content into vector form.
table : Table
The target table where the search is conducted.
search_request : VectorSearchRequest
The search request object containing parameters like TopK, filters, etc.
partition_key : Dict[str, Any], optional
The partition key to narrow the search to a specific partition, default is None.
projections : List[str], optional
The list of fields to include in the search result, default is None.
read_consistency : ReadConsistency, optional
The level of read consistency required, default is EVENTUAL.
config : Dict[Any, Any], optional
Additional configurations for the search, default is None.
"""
pass
def bm25_search(
self,
table: Table,
search_request: BM25SearchRequest,
partition_key: Dict[str, Any] = None,
projections: List[str] = None,
read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
config: Dict[Any, Any] = None
):
"""
Perform a BM25-based text search operation.
This method uses the BM25 search algorithm to perform a text-based search on the specified
table, returning the most relevant documents based on the query terms.
Parameters:
----------
table : Table
The target table where the search will be performed.
search_request : BM25SearchRequest
The search request object, which contains query terms and other parameters.
partition_key : Dict[str, Any], optional
The partition key to narrow the search to a specific partition, default is None.
projections : List[str], optional
The list of fields to include in the search result, default is None.
read_consistency : ReadConsistency, optional
The level of read consistency required, default is EVENTUAL.
config : Dict[Any, Any], optional
Additional configurations for the search, default is None.
"""
pass
def hybrid_search(
self,
search_contents: List[str],
embedder: Embedder,
table: Table,
search_request: HybridSearchRequest,
partition_key: Dict[str, Any] = None,
projections: List[str] = None,
read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
config: Dict[Any, Any] = None
):
"""
Perform a hybrid search (vector + traditional text search).
This method combines vector search with BM25 text search, suitable for scenarios
requiring both semantic and keyword-based search.
Parameters:
----------
search_contents : List[str]
The input search content, usually text-based.
embedder : Embedder
The embedder object used to convert the search content into vectors.
table : Table
    The target table where the search is conducted.
search_request : HybridSearchRequest
    The search request object containing the parameters of the hybrid search.
partition_key : Dict[str, Any], optional
The partition key to narrow the search to a specific partition, default is None.
projections : List[str], optional
The list of fields to include in the search result, default is None.
read_consistency : ReadConsistency, optional
The level of read consistency required, default is EVENTUAL.
config : Dict[Any, Any], optional
Additional configurations for the search, default is None.
"""
pass
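Putting the pieces together, an end-to-end flow with the SDK's default pipeline implementation might look like the sketch below. The DefaultPipeline class name, the import paths, and the VectorSearchRequest construction are assumptions for illustration, and meta_table and chunk_table stand for pre-created Table objects; see the AI Search usage examples for the exact names and table schemas.
import os
from pymochow.ai.embedder import QianfanEmbedder
from pymochow.ai.processor import QianfanDocProcessor
from pymochow.ai.pipeline import DefaultPipeline  # class name and import path assumed
from pymochow.model.table import VectorSearchRequest  # import path assumed

os.environ["APPBUILDER_TOKEN"] = "your_ab_token"
pipeline = DefaultPipeline()
# Ingest: parse and split the document, embed the chunks, then write the
# document row and the chunk rows into the two pre-created tables.
pipeline.ingest_doc(
    doc=doc,                              # the Document added to the hub earlier
    doc_processor=QianfanDocProcessor(),
    embedder=QianfanEmbedder(batch=2),
    meta_table=meta_table,                # pre-created Table objects (assumed)
    chunk_table=chunk_table,
)
# Query: embed the question text and run a vector search over the chunk table.
results = pipeline.vector_search(
    search_contents=["what is mochow?"],
    embedder=QianfanEmbedder(batch=2),
    table=chunk_table,
    search_request=VectorSearchRequest(),  # fill in TopK, filters, etc.
)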