Usage examples
Last updated: 2025-01-22
Overview
Vector databases are currently used mainly in RAG scenarios. A complete RAG application requires, beyond the vector database itself, several cooperating services: document management, document parsing, document chunking, an embedding service (to vectorize content), reranking, and an LLM. To meet this need, we have wrapped each of these modules and exposed corresponding interfaces, so users can customize each module, integrate it efficiently with the vector database, and quickly build their own RAG service. For details, see: AI Search usage examples
Document management
Features
For document management, the SDK provides simple implementations for managing local files and files in Baidu Object Storage (BOS); users can also implement their own to fit their business needs.
Interface
Python
class DocumentHub(ABC):
    """
    DocumentHub: Abstract base class for the document hub that defines the basic operations.
    Subclasses should implement methods for adding, removing, listing, and loading documents.
    """

    @abstractmethod
    def add(self, doc: Document) -> Document:
        """
        Add a document to the hub.

        Parameters:
            doc (Document): The document to be added.

        Returns:
            Document: The added document.
        """
        pass

    @abstractmethod
    def remove(self, doc: Document):
        """
        Remove a document from the hub.

        Parameters:
            doc (Document): The document to be removed.
        """
        pass

    @abstractmethod
    def list(self) -> List[Document]:
        """
        List all documents available in the hub.

        Returns:
            List[Document]: A list of all available document objects.
        """
        pass

    @abstractmethod
    def load(self, doc: Document) -> Document:
        """
        Load a document from the hub.

        Parameters:
            doc (Document): The document to load.

        Returns:
            Document: The loaded document, if it exists in the hub.
        """
        pass
Implementing a custom document hub based on this interface:
Python
from pymochow.ai.dochub import DocumentHub, DocumentHubEnv

class UserDefinedDocumentHub(DocumentHub):
    ...
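To make the contract concrete, here is a minimal in-memory sketch that mirrors the four DocumentHub operations without depending on pymochow. `SimpleDoc` and `InMemoryHub` are illustrative names, not SDK classes; a real `UserDefinedDocumentHub` would subclass `DocumentHub` and operate on pymochow `Document` objects.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class SimpleDoc:
    """Illustrative stand-in for pymochow's Document."""
    kb_id: str
    doc_name: str                     # relative path inside the hub
    file_path: Optional[str] = None   # local source path


class InMemoryHub:
    """Keeps documents in a dict keyed by (kb_id, doc_name)."""

    def __init__(self):
        self._store: Dict[Tuple[str, str], SimpleDoc] = {}

    def add(self, doc: SimpleDoc) -> SimpleDoc:
        self._store[(doc.kb_id, doc.doc_name)] = doc
        return doc

    def remove(self, doc: SimpleDoc):
        self._store.pop((doc.kb_id, doc.doc_name), None)

    def list(self) -> List[SimpleDoc]:
        return list(self._store.values())

    def load(self, doc: SimpleDoc) -> SimpleDoc:
        # A real hub would fetch the file into a local cache here.
        return self._store[(doc.kb_id, doc.doc_name)]
```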
Examples
Managing local files
Python
import logging
import uuid
from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document

logger = logging.getLogger(__name__)

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)

# Add the file to the hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)

# List all documents in the hub
docs = doc_hub.list()
for d in docs:
    logger.debug("doc: {} in hub".format(d.to_dict()))

# Remove the document from the hub
doc_hub.remove(doc=doc)
Managing files in object storage (BOS)
Python
import logging
import uuid
from pymochow.ai.dochub import (
    BosDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document

logger = logging.getLogger(__name__)

env = DocumentHubEnv(
    endpoint="bj.bcebos.com",
    ak="your_bos_ak",
    sk="your_bos_sk",
    root_path="bos://your_bucket/object_prefix",
    local_cache_path="./tmp"  # local file cache dir
)
doc_hub = BosDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)

# Add the file to the hub
doc = doc_hub.add(doc=doc)

# List all documents in the hub
docs = doc_hub.list()
for d in docs:
    logger.debug("doc: {} in hub".format(d.to_dict()))

# Load the document into the local cache
doc = Document(
    kb_id=kb_id,
    doc_name="test/test.pdf"
)
doc = doc_hub.load(doc)
logger.debug("load doc: {} from hub".format(doc.to_dict()))

# Remove the document from the hub
doc_hub.remove(doc=doc)
Document processing
Features
Document processing covers document parsing and chunking. The SDK provides simple implementations based on LangChain and Baidu Qianfan; users can also implement their own to fit their business needs.
Interface
Python
class DocProcessor(ABC):
    """
    DocProcessor: An abstract base class that defines the interface for parsing and splitting documents into chunks.

    Subclasses must implement the `process_doc` method to parse and split the document based on specific criteria,
    such as page length, overlap length, and the number of pages to take.
    """
    @abstractmethod
    def process_doc(self, doc) -> List[DocumentChunk]:
        """
        Parse and split the document into chunks based on the provided parameters.

        Parameters:
        ----------
        doc : Document
            The document to be parsed and split into chunks.

        Returns:
        -------
        List[DocumentChunk]
            A list of DocumentChunk objects, where each chunk represents a part of the parsed document.
        """
        pass
Implementing a custom document processor based on this interface:
Python
from pymochow.ai.processor import DocProcessor

class UserDefinedDocProcessor(DocProcessor):
    ...
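As a sketch of what a custom processor's splitting step might look like, the standalone function below applies a sliding window with a fixed maximum length and overlap, mirroring the `maximum_page_length` / `page_overlap_length` parameters used by `LangchainDocProcessor` in the example that follows. A real `UserDefinedDocProcessor` would wrap this kind of logic in `process_doc` and return `DocumentChunk` objects.

```python
from typing import List


def split_text(text: str, max_len: int = 300, overlap: int = 50) -> List[str]:
    """Split text into chunks of at most max_len characters,
    with consecutive chunks sharing `overlap` characters."""
    if max_len <= overlap:
        raise ValueError("max_len must be greater than overlap")
    chunks = []
    step = max_len - overlap  # how far the window advances each time
    for start in range(0, len(text), step):
        chunks.append(text[start:start + max_len])
        if start + max_len >= len(text):
            break  # the last window already reached the end of the text
    return chunks
```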
Examples
Document processing with LangChain
Python
import logging
import uuid
from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import LangchainDocProcessor

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)

# Add the file to the hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)

doc_processor = LangchainDocProcessor(maximum_page_length=300, page_overlap_length=50)
doc_chunks = doc_processor.process_doc(doc)
Document processing with Baidu Qianfan
Python
import logging
import os
import uuid
from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)

# Add the file to the hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)

os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; AppBuilder services are called for document processing
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)
Embedder
The SDK provides an implementation for Baidu Qianfan; users can also integrate other embedding services to fit their business needs.
Interface
Python
class Embedder(ABC):
    """
    Embedder: An abstract base class for generating embeddings for document chunks.
    """

    @abstractmethod
    def embedding(self, chunks) -> List[DocumentChunk]:
        """
        Generate embeddings for specified fields in document chunks.

        Parameters:
            chunks (List[DocumentChunk]): A list of document chunks that need to be processed for embeddings.

        Returns:
            List[DocumentChunk]: A list of `DocumentChunk` objects,
            with the embeddings added to the corresponding fields based on `field_mapping`.
        """
        pass

    @abstractmethod
    def embedding_text(self, texts) -> List[List[float]]:
        """
        Embed the given texts into numerical representations and return the result.

        Parameters:
            texts (List[str]): The texts to be embedded.

        Returns:
            List[List[float]]: The numerical embedding of each text as a list of floats.

        Note:
            The method includes a sleep call to throttle the embedding rate due to API rate limits.
        """
        pass
Implementing a custom embedder based on this interface:
Python
from pymochow.ai.embedder import Embedder

class OpenAiEmbedder(Embedder):
    ...
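A custom embedder usually batches its inputs before calling the backing service. The sketch below shows that batching pattern in isolation; `embed_batch` is a stand-in for a real API call (for example, an OpenAI embeddings request), not an SDK function, and `fake_embed` is a deterministic dummy used only for demonstration.

```python
from typing import Callable, List


def embed_in_batches(texts: List[str],
                     embed_batch: Callable[[List[str]], List[List[float]]],
                     batch: int = 16) -> List[List[float]]:
    """Call embed_batch once per slice of `batch` texts and
    concatenate the resulting vectors in input order."""
    vectors: List[List[float]] = []
    for i in range(0, len(texts), batch):
        vectors.extend(embed_batch(texts[i:i + batch]))
    return vectors


def fake_embed(batch_texts: List[str]) -> List[List[float]]:
    """Deterministic stand-in model: one 2-dim vector per input text."""
    return [[float(len(t)), 1.0] for t in batch_texts]
```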
Examples
Embedding with Baidu Qianfan
Python
import logging
import os
import uuid
from pymochow.ai.dochub import (
    LocalDocumentHub,
    DocumentHubEnv
)
from pymochow.model.document import Document
from pymochow.ai.processor import QianfanDocProcessor
from pymochow.ai.embedder import QianfanEmbedder

env = DocumentHubEnv(root_path="local://your_root_path/")
doc_hub = LocalDocumentHub(env=env)
kb_id = str(uuid.uuid4())
doc = Document(
    kb_id=kb_id,               # knowledge base ID
    doc_name="test/test.pdf",  # relative path under which the document is stored in the hub, e.g. test/test.pdf
    file_path="./test.pdf"     # local path of the document
)

# Add the file to the hub: copies ./test.pdf to /root_path/test/test.pdf
doc_hub.add(doc=doc)

os.environ["APPBUILDER_TOKEN"] = "your_ab_token"  # your AppBuilder token; AppBuilder services are called for document processing and embedding
doc_processor = QianfanDocProcessor()
chunks = doc_processor.process_doc(doc)

embedder = QianfanEmbedder(batch=2)
chunks = embedder.embedding(chunks)
Processing pipeline
Features
The pipeline ties together the document hub, document processor, embedder, and vector database. The SDK provides a default implementation that supports ingesting documents into the vector database, as well as vector search, full-text search, and hybrid search. Users can also extend these features with custom implementations to fit their business needs.
Interface
Python
class Pipeline(ABC):
    """
    Pipeline: An abstract base class that defines the interface for ingesting
    docs into the vectordb and searching from it.
    """
    @abstractmethod
    def ingest_doc(self,
                   doc,
                   doc_processor=None,
                   embedder=None,
                   meta_table=None,
                   doc_to_row_mapping=None,
                   chunk_table=None,
                   chunk_to_row_mapping=None):
        """
        Abstract method for processing and storing the ingestion of documents and their chunks.

        Parameters:
            doc (Document): The document object to be ingested.
            doc_processor (DocProcessor, optional): A tool used to parse and split the document.
            embedder (Embedder, optional): A tool used to generate embeddings of the document content.
            meta_table (Table, optional): The table in the database that stores document data.
            doc_to_row_mapping (dict, optional): A JSON-like dictionary that defines
                the mapping between document object attributes and database table columns.
                Example mapping:
                {
                    'doc_id': 'document_id',    # Maps 'doc_id' in the document to 'document_id' in the database
                    'doc_name': 'document_name' # Maps 'doc_name' in the document to 'document_name' in the database
                    # Add more mappings as needed...
                }
            chunk_table (Table, optional): The table in the database used to store document chunks,
                if the document is processed in chunks.
            chunk_to_row_mapping (dict, optional): Similar to doc_to_row_mapping,
                this dictionary defines how attributes of chunks map to database table columns.
                Example:
                {
                    'chunk_id': 'chunk_id',  # Example mapping, no change
                    # Additional mappings can be added here...
                }

        Returns:
            None: This method does not return anything but may modify data in the database or other storage systems.

        Note:
            This is an abstract method that must be implemented in any subclass inheriting
            from Pipeline with specific logic for ingestion.
        """
        pass

    def vector_search(
        self,
        search_contents: List[str],
        embedder: Embedder,
        table: Table,
        search_request: VectorSearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a vector-based search operation.

        This method converts the search content into a vector using the embedder and performs a
        vector search in the specified table, returning the most similar results to the query.

        Parameters:
        ----------
        search_contents : List[str]
            The input search content, usually text or an already embedded vector.
        embedder : Embedder
            The embedder object used to convert the search content into vector form.
        table : Table
            The target table where the search is conducted.
        search_request : VectorSearchRequest
            The search request object containing parameters like TopK, filters, etc.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass

    def bm25_search(
        self,
        table: Table,
        search_request: BM25SearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a BM25-based text search operation.

        This method uses the BM25 search algorithm to perform a text-based search on the specified
        table, returning the most relevant documents based on the query terms.

        Parameters:
        ----------
        table : Table
            The target table where the search will be performed.
        search_request : BM25SearchRequest
            The search request object, which contains query terms and other parameters.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass

    def hybrid_search(
        self,
        search_contents: List[str],
        embedder: Embedder,
        table: Table,
        search_request: HybridSearchRequest,
        partition_key: Dict[str, Any] = None,
        projections: List[str] = None,
        read_consistency: ReadConsistency = ReadConsistency.EVENTUAL,
        config: Dict[Any, Any] = None
    ):
        """
        Perform a hybrid search (vector + traditional text search).

        This method combines vector search with BM25 text search, suitable for scenarios
        requiring both semantic and keyword-based search.

        Parameters:
        ----------
        search_contents : List[str]
            The input search content, usually text-based.
        embedder : Embedder
            The embedder object used to convert the search content into vectors.
        table : Table
            The target table where the search is conducted.
        search_request : HybridSearchRequest
            The search request object containing the hybrid search parameters.
        partition_key : Dict[str, Any], optional
            The partition key to narrow the search to a specific partition, default is None.
        projections : List[str], optional
            The list of fields to include in the search result, default is None.
        read_consistency : ReadConsistency, optional
            The level of read consistency required, default is EVENTUAL.
        config : Dict[Any, Any], optional
            Additional configurations for the search, default is None.
        """
        pass
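To make the `doc_to_row_mapping` / `chunk_to_row_mapping` parameters concrete, the standalone sketch below shows how such a dictionary renames record attributes to table column names during ingestion. The field names are illustrative, and `map_to_row` is a hypothetical helper, not an SDK function; unmapped attributes keep their original names.

```python
from typing import Any, Dict, Optional


def map_to_row(record: Dict[str, Any],
               mapping: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Rename record attributes to table column names using the mapping.
    With no mapping, attribute names are used as column names unchanged."""
    if not mapping:
        return dict(record)
    return {mapping.get(key, key): value for key, value in record.items()}
```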