Solution for Offline Migration from Milvus to VectorDB
Overview
This solution helps users perform an offline, full migration of data from Milvus to Baidu VectorDB.
Prerequisites
- This solution supports migrating from Milvus 2.3.0 or later to Baidu VDB 2.1 or later.
- Python 3.8.0 or later is required.
- The following Python libraries must be installed:
  - pymochow >= 2.1
  - pymilvus >= 2.3.0
pip install pymochow
pip install pymilvus
pip install pyaml
pip install tqdm
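To confirm the environment meets these requirements, a quick check along the following lines can be used (a minimal sketch using the standard library's importlib.metadata; adjust as needed):

# Minimal environment check: verify the Python and library versions meet the requirements above.
import sys
from importlib.metadata import version  # available in Python 3.8+

assert sys.version_info >= (3, 8), "Python 3.8.0 or later is required"
print("python:", sys.version.split()[0])
print("pymochow:", version("pymochow"))   # expect >= 2.1
print("pymilvus:", version("pymilvus"))   # expect >= 2.3.0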
Migration Restrictions
The export script automatically maps each collection's schema:
- If a field type is not supported for migration, the export of that collection fails.
- If an index type is not supported for migration, the index is skipped; you need to create it manually after the migration completes (a sketch follows this list).
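For reference, below is a minimal sketch of creating a skipped index manually with the pymochow SDK after the data has been imported. It assumes pymochow exposes a create_indexes method on tables; the endpoint, credentials, index names, and parameters are placeholders (the field names come from the warning.log example later in this document) and should be adapted to your setup:

# Sketch: manually create indexes that the export script skipped (placeholder names and credentials).
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.model.schema import SecondaryIndex, VectorIndex, HNSWParams
from pymochow.model.enum import IndexType, MetricType

config = Configuration(credentials=BceCredentials("root", "<api_key>"),
                       endpoint="http://<vdb_host>:5287")
client = pymochow.MochowClient(config)
table = client.database("vector_database").table("export_collection")

# e.g. replace a skipped Milvus INVERTED index with a secondary index,
# and a skipped IVF_FLAT index with an HNSW vector index
table.create_indexes(indexes=[
    SecondaryIndex(index_name="name_idx", field="name"),
    VectorIndex(index_name="embedding_idx", field="ivf_flat_embedding",
                index_type=IndexType.HNSW, metric_type=MetricType.COSINE,
                params=HNSWParams(m=32, efconstruction=200)),
])
client.close()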
Structural Object Migration
Logical Concepts
The logical concepts of Milvus and VDB are compared below:
Logical Concept | Mochow | Milvus |
---|---|---|
Database | Database | Database |
Table/Collection | Table | Collection |
Shard/Partition | Partition | Partition |
Segment (further subdivision within a partition) | Not exposed | Segment |
Column family | Not exposed | N/A |
Row/Record/Object/Entity | Row | Entity |
Column/Field | Field | Field |
Index | Index | Index |
Data Types
The data types of Milvus and VDB are compared below:
Data Type | Mochow | Milvus |
---|---|---|
Boolean | BOOL | BOOL |
8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
Date | DATE | N/A |
Datetime | DATETIME | N/A |
UUID | UUID | N/A |
String | STRING | VARCHAR |
Binary | BINARY | N/A |
Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
JSON | JSON (coming soon) | JSON |
ARRAY | ARRAY | ARRAY |
Single-precision float vector (dense) | FLOAT_VECTOR | FLOAT_VECTOR |
Half-precision float vector (dense) | N/A | FLOAT16_VECTOR/BFLOAT16_VECTOR |
Single-precision sparse float vector | SPARSE_FLOAT_VECTOR (coming soon) | SPARSE_FLOAT_VECTOR |
Binary vector | BINARY_VECTOR (coming soon) | BINARY_VECTOR |
None/unknown type | N/A | NONE/UNKNOWN |
Element Types Supported by ARRAY
Data Type | Mochow | Milvus |
---|---|---|
Boolean | BOOL | BOOL |
8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
Date | DATE | N/A |
Datetime | DATETIME | N/A |
UUID | UUID | N/A |
String | STRING | VARCHAR |
Binary | BINARY | N/A |
Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
JSON | N/A | JSON |
ARRAY | N/A | ARRAY |
Index Concepts
Scalar index concepts are compared below:
Scalar Index Concept | Mochow | Milvus |
---|---|---|
Primary key index | Implicit | Implicit |
Secondary index | SECONDARY | STL_SORT/TRIE |
Inverted text index | INVERTED | INVERTED |
Filtering index | FILTERING | N/A |
Vector index concepts are compared below:
Vector Index Concept | Mochow | Milvus |
---|---|---|
HNSW | HNSW | HNSW |
HNSW with scalar quantization | HNSW_SQ | N/A |
HNSW with product quantization | HNSW_PQ | N/A |
Flat index (KNN) | FLAT | FLAT |
Inverted-structure index | PUCK/PUCK_PQ | IVF_* series |
DiskANN | N/A | DISKANN |
Sparse vector index | SPARSE_OPTIMIZED_FLAT (coming soon) | SPARSE_* series |
Binary vector index | No index needs to be specified; FLAT is used by default | BIN_* series |
Auto index | N/A | AUTOINDEX |
GPU index | N/A | GPU_* series |
Full Migration Process
Full Export from Milvus to Local Files
Local File Structure
Prepare the export script export.py and the export configuration file milvus2csv_config.yaml, and create an output directory (this document uses data as an example). Place the export script export.py, the export configuration file milvus2csv_config.yaml, and the output directory data in the same directory. The directory layout is as follows:
├── export.py
├── milvus2csv_config.yaml
└── data
Export Script
The export script is as follows:
import yaml
import orjson
import json
import os
import csv
import logging
import re
import time
from tqdm import tqdm
from pymilvus import (
    connections,
    DataType,
    Collection,
    MilvusClient
)
from pymilvus.orm.index import (
    Index
)
from pymochow.model.enum import (
    FieldType as mochowFieldType,
    IndexType as mochowIndexType,
    MetricType as mochowMetricType,
    ElementType as mochowElementType,
)
from pymochow.model.schema import (
    Schema as mochowSchema,
    Field as mochowField,
    SecondaryIndex as mochowSecondaryIndex,
    VectorIndex as mochowVectorIndex,
    HNSWParams as mochowHNSWParams,
)
from pymochow.model.table import Partition as mochowPartition

with open("./milvus2csv_config.yaml", "r", encoding='utf-8') as f:
    config = yaml.safe_load(f)

print("configuration:")
print(config)

milvus_config = config["milvus"]
milvus_config["uri"] = f'http://{milvus_config["host"]}:{milvus_config["port"]}'
export_config = config["export"]

milvus_type_to_mochow_type = {
    DataType.BOOL: mochowFieldType.BOOL,
    DataType.INT8: mochowFieldType.INT8,
    DataType.INT16: mochowFieldType.INT16,
    DataType.INT32: mochowFieldType.INT32,
    DataType.INT64: mochowFieldType.INT64,

    DataType.FLOAT: mochowFieldType.FLOAT,
    DataType.DOUBLE: mochowFieldType.DOUBLE,

    DataType.STRING: mochowFieldType.STRING,
    DataType.VARCHAR: mochowFieldType.STRING,

    DataType.FLOAT_VECTOR: mochowFieldType.FLOAT_VECTOR,
    DataType.ARRAY: mochowFieldType.ARRAY,

    # not support now
    # DataType.FLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
    # DataType.BFLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
    # DataType.JSON: mochowFieldType.JSON,
    # DataType.SPARSE_FLOAT_VECTOR: mochowFieldType.SPARSE_FLOAT_VECTOR,
    # DataType.BINARY_VECTOR: mochowFieldType.BINARY_VECTOR,
    # DataType.UNKNOWN: "NULL",
}

milvus_array_element_type_to_mochow_array_element_type = {
    DataType.BOOL: mochowElementType.BOOL,
    DataType.INT8: mochowElementType.INT8,
    DataType.INT16: mochowElementType.INT16,
    DataType.INT32: mochowElementType.INT32,
    DataType.INT64: mochowElementType.INT64,

    DataType.FLOAT: mochowElementType.FLOAT,
    DataType.DOUBLE: mochowElementType.DOUBLE,

    DataType.VARCHAR: mochowElementType.STRING,
}

basic_data_type = [
    DataType.BOOL,
    DataType.INT8,
    DataType.INT16,
    DataType.INT32,
    DataType.INT64,

    DataType.FLOAT,
    DataType.DOUBLE,

    DataType.VARCHAR,
    DataType.STRING,
]

scalar_index_supported = {
    "STL_SORT", "Trie"
}

vector_index_supported = {
    "HNSW": mochowIndexType.HNSW,
    "FLAT": mochowIndexType.FLAT,
}

metirc_type_map = {
    "L2": mochowMetricType.L2,
    "IP": mochowMetricType.IP,
    "COSINE": mochowMetricType.COSINE,
}

# check field

def is_legal_field_name(field_name: str) -> bool:
    pattern = r"(^[A-Za-z][A-Za-z0-9_]{0,254}$)"
    return bool(re.match(pattern, field_name))

def check_valid_field(field) -> dict:
    if field.dtype not in milvus_type_to_mochow_type:
        return {
            "valid": False,
            "error": f"field type not supported yet: {field.to_dict()}"
        }
    if field.dtype == DataType.ARRAY and field.element_type not in milvus_array_element_type_to_mochow_array_element_type:
        return {
            "valid": False,
            "error": f"element type not supported yet: {field.to_dict()}"
        }
    if not is_legal_field_name(field.name):
        return {
            "valid": False,
            "error": f"invalid field name, Only uppercase and lowercase letters, numbers and underscores (_) are supported. It must start with a letter and the length is limited to 1-255: {field.to_dict()}"
        }
    return {
        "valid": True,
        "error": ""
    }

def get_mochow_field_from_milvus_field(field, is_not_null: bool) -> mochowField:
    field_type = milvus_type_to_mochow_type[field.dtype]
    primary_key = False
    partition_key = False
    auto_increment = False
    not_null = False
    dimension = 0
    if field.is_primary:
        primary_key = True
        auto_increment = field.auto_id
        not_null = True
    if field.is_partition_key:
        partition_key = True
        not_null = True
    if field.dtype == DataType.FLOAT_VECTOR:
        if "dim" not in field.params:
            raise Exception(f"not found param 'dim' of FLOAT_VECTOR")
        dimension = field.params["dim"]
        not_null = True
    if is_not_null:
        not_null = True
    element_type = None
    max_capacity = None
    if field.dtype == DataType.ARRAY:
        element_type = milvus_array_element_type_to_mochow_array_element_type[field.element_type]
        if "max_capacity" not in field.params:
            raise Exception(f"not found param 'max_capacity' of array")
        max_capacity = field.params["max_capacity"]
    mochow_field = mochowField(
        field_name=field.name,
        field_type=field_type,
        primary_key=primary_key,
        partition_key=partition_key,
        auto_increment=auto_increment,
        not_null=not_null,
        dimension=dimension,
        element_type=element_type,
        max_capacity=max_capacity,
    )
    return mochow_field

def get_mochow_index_from_milvus_index(index: Index) -> mochowVectorIndex:
    if 'index_type' not in index.params:
        logger.warning(f"index type not supported yet: {index.to_dict()}")
        return None
    index_type = index.params['index_type']
    if index_type in scalar_index_supported:
        return mochowSecondaryIndex(
            index_name=index.index_name,
            field=index.field_name
        )
    if index_type in vector_index_supported:
        mochow_index_type = vector_index_supported[index_type]
        if 'metric_type' not in index.params:
            raise Exception(f"not found param 'metric_type' of vector index: {index.to_dict()}")
        metric_type = index.params['metric_type']
        if metric_type not in metirc_type_map:
            return None
        mochow_metirc_type = metirc_type_map[metric_type]
        params = None
        if mochow_index_type == mochowIndexType.HNSW:
            if 'M' not in index.params or 'efConstruction' not in index.params:
                raise Exception(f"invalid HNSW param of vector index: {index.to_dict()}")
            params = mochowHNSWParams(
                m=int(index.params['M']),
                efconstruction=int(index.params['efConstruction'])
            )
        return mochowVectorIndex(
            index_name=index.index_name,
            field=index.field_name,
            index_type=mochow_index_type,
            metric_type=mochow_metirc_type,
            params=params
        )
    logger.warning(f"index type not supported yet: {index.to_dict()}")
    return None

def get_partition_info(collection_name: str) -> mochowPartition:
    client = MilvusClient(**milvus_config)
    collection_desc = client.describe_collection(collection_name=collection_name)
    if 'num_partitions' not in collection_desc:
        raise Exception(f"not found param 'num_partitions' of collection: {collection_desc}")
    return mochowPartition(
        partition_num=collection_desc['num_partitions'],
    )

def dump_collection_schema(collection_name: str, logger: logging.Logger, log_file: str) -> bool:
    connections.connect(**milvus_config)
    collection = Collection(collection_name)

    mochow_indexes = []
    mochow_index_field_set = set()
    has_no_support_index_type = False
    for index in collection.indexes:
        mochow_index = get_mochow_index_from_milvus_index(index)
        if mochow_index is None:
            has_no_support_index_type = True
            continue
        mochow_index_field_set.add(mochow_index.field)
        mochow_indexes.append(mochow_index)
    if has_no_support_index_type:
        print(f"found index_type which is not supported yet")

    not_support_fields = {}
    mochow_fields = []
    has_invalid_field = False
    for field in collection.schema.fields:
        check_field = check_valid_field(field)
        if not check_field["valid"]:
            not_support_fields[field.name] = field
            has_invalid_field = True
        else:
            mochow_fields.append(get_mochow_field_from_milvus_field(field, field.name in mochow_index_field_set))
    if has_invalid_field:
        for field in not_support_fields.values():
            check_field = check_valid_field(field)
            logger.error(check_field["error"])
        return False

    schema = mochowSchema(
        fields=mochow_fields,
        indexes=mochow_indexes,
    )
    partiton = get_partition_info(collection_name)
    create_table_args = {
        "database": milvus_config["db_name"],
        "table": collection_name,
        "replication": export_config["replication"],
        "partition": partiton.to_dict(),
        "schema": schema.to_dict(),
        "enableDynamicField": False,
    }
    if collection.schema.description != "":
        create_table_args["description"] = collection.schema.description

    json_bytes = orjson.dumps(create_table_args)
    output_file_name = os.path.join(export_config["output_path"], collection_name, "table_config.json")
    with open(output_file_name, 'wb') as json_file:
        json_file.write(json_bytes)
    print(f"generate table config of {collection_name} succeed")
    return True

# convert data to str

# def convert_to_binary(binary_data):
#     decimal_value = int.from_bytes(binary_data, byteorder='big')
#     binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
#     return ','.join(list(binary_string))

def basic_data_convert_to_str(data, dtype, delimeter):
    if dtype == DataType.BOOL:
        return "true" if data else "false"
    elif dtype in [DataType.INT8, DataType.INT16,
                   DataType.INT32, DataType.INT64,
                   DataType.FLOAT, DataType.DOUBLE]:
        return str(data)
    elif dtype in [DataType.STRING, DataType.VARCHAR]:
        return str(data).replace(delimeter, f"\\{delimeter}").replace("\"", "\\\"")

    raise Exception(f"Unsupported Basic DataType {dtype}")

def data_convert_array_to_str(data, element_type, delimeter="|"):
    if element_type in milvus_array_element_type_to_mochow_array_element_type:
        if element_type == DataType.VARCHAR or element_type == DataType.STRING:
            tmp_data = [basic_data_convert_to_str(x, element_type, delimeter) for x in data]
            orjson_str = orjson.dumps(tmp_data).decode("utf-8")
            return json.dumps(orjson_str)
        return "[" + ", ".join(basic_data_convert_to_str(x, element_type, delimeter) for x in data) + "]"

    raise Exception(f"Unsupported ElementType {element_type}")

def data_convert_to_str(data, dtype, delimeter):
    if dtype in basic_data_type:
        return basic_data_convert_to_str(data, dtype, delimeter)
    elif dtype == DataType.FLOAT_VECTOR:
        return data
    elif dtype == DataType.ARRAY:
        return data

    raise Exception(f"Unsupported DataType {dtype}")

def csv_write_rows(datum, fd, fields_types, delimiter="|"):
    for data in datum:
        for i in range(len(data)):
            ftype = fields_types[i]["dtype"]
            data[i] = data_convert_to_str(data[i], ftype, delimiter)
        fd.writerow(data)

def csv_write_header(headers: list, fd, delimiter="|"):
    fd.writerow(headers)

def dump_collection(collection_name: str, logger: logging.Logger):
    results = []
    file_cnt = 0
    print("connecting to milvus...")
    connections.connect(**milvus_config)

    export_config = config["export"]
    collection = Collection(collection_name)
    collection.load()
    tmp_path = os.path.join(export_config["output_path"], collection_name)
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)

    fields_types = []
    headers = []
    for schema in collection.schema.fields:
        field_type = {"dtype": schema.dtype}
        if schema.dtype == DataType.ARRAY:
            field_type["element_type"] = schema.element_type
        fields_types.append(field_type)
        headers.append(schema.name)

    total_num = collection.num_entities
    collection.load()
    query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

    def write_to_csv_file(col_names, data):
        if len(results) == 0:
            return
        nonlocal file_cnt
        assert(file_cnt <= 1e9)
        output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
        with open(output_file_name, "w", newline="", encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file, delimiter="|")
            # write header
            csv_write_header(col_names, writer)
            # write data
            csv_write_rows(data, writer, fields_types)
        file_cnt += 1
        results.clear()

    with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
        while True:
            res = query_iterator.next()
            if len(res) == 0:
                # close the iterator
                query_iterator.close()
                break
            for row in res:
                row_list = []
                for i in range(len(headers)):
                    field = row[headers[i]]
                    if isinstance(field, list) and fields_types[i]["dtype"] == DataType.FLOAT_VECTOR:
                        row_list.append("[" + ", ".join(str(x) for x in field) + "]")
                    elif isinstance(field, list) and fields_types[i]["dtype"] == DataType.ARRAY:
                        row_list.append(data_convert_array_to_str(field, fields_types[i]["element_type"]))
                    elif isinstance(field, dict):
                        row_list.append(orjson.dumps(field))
                    else:
                        row_list.append(field)
                results.append(row_list)
                if len(results) >= export_config["max_line_in_file"]:
                    write_to_csv_file(headers, data=results)
                pbar.update(1)

    write_to_csv_file(headers, data=results)

def check_db_collections():
    connections.connect(**milvus_config)
    for name in config["export"]["collections"]:
        collection = Collection(name)

if __name__ == "__main__":
    check_db_collections()
    for name in config["export"]["collections"]:
        print(f"start export {name}")
        logger = logging.getLogger(f"export: {name}")
        dir = os.path.join(export_config["output_path"], name)
        os.makedirs(dir, exist_ok=True)
        log_file = os.path.join(export_config["output_path"], name, "warning.log")
        file_handler = logging.FileHandler(log_file, mode='w')
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.setLevel(logging.INFO)
        start_time = time.time()
        if dump_collection_schema(name, logger, log_file):
            dump_collection(name, logger)
            end_time = time.time()
            elapsed_time_seconds = end_time - start_time
            logger.info(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
            print(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
        else:
            print(f"generate table config failed, please check {log_file}")
YAML File Configuration
The milvus2csv_config.yaml file is configured as follows:
milvus:
  host: '<localhost>'         # Milvus server host address
  port: 19530                 # Milvus server port
  user: '<user_name>'         # username
  password: '<password>'      # password
  db_name: '<database_name>'  # database name
  token: '<token_id>'         # access token

export:
  collections:
    - 'export_collection'
    - 'test_collection'
    # - 'hello_milvus'
    # - 'car'
    # - 'medium_articles_with_dynamic'
    # list the names of all collections to export
  max_line_in_file: 40000     # number of rows per exported file before splitting
  output_path: './data'       # export target directory; this document uses ./data as an example
  replication: 3              # number of replicas per table partition (including the primary replica), range [1,10]
  partitionType: "HASH"       # partition type enum; currently only "HASH" is supported
Update the relevant configuration items in the YAML file according to the source Milvus cluster information.
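Optionally, before running the export you can verify that the configured connection works and that the collections to export exist. A minimal pymilvus sketch (the connection values are placeholders mirroring the YAML above):

# Optional pre-flight check: confirm the Milvus connection and the collections to export.
from pymilvus import connections, utility

connections.connect(uri="http://<localhost>:19530", user="<user_name>",
                    password="<password>", db_name="<database_name>")
existing = set(utility.list_collections())
for name in ["export_collection", "test_collection"]:
    print(name, "found" if name in existing else "MISSING")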
Run and Verify
Run export.py and check the output. The output looks like this:
.
├── export.py
├── milvus2csv_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── warning.log
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── warning.log
        ├── table_config.json
        └── 0000000000.csv
Here, table_config.json is the table structure converted from the Milvus collection into VDB concepts; it is used to create the corresponding table in VDB. Its structure is as follows:
{
    "database": "benchmark_db",
    "table": "benchmark_table",
    "replication": 3,
    "partition": {
        "partitionType": "HASH",
        "partitionNum": 1
    },
    "schema": {
        "fields": [
            {
                "fieldName": "id",
                "fieldType": "UINT64",
                "primaryKey": true,
                "partitionKey": true,
                "autoIncrement": false,
                "notNull": true
            },
            {
                "fieldName": "vector",
                "fieldType": "FLOAT_VECTOR",
                "dimension": 3,
                "notNull": true
            }
        ]
    }
}
0000000000.csv contains part of the table data in CSV format. Its structure is as follows:
// The first line of the CSV contains the column names; columns are separated by |
my_id|my_vector
10|[-0.65697396, -0.25675339, 0.21634601, -0.7329633, 0.655407]
11|[0.61959845, -0.105865225, -0.27274612, 0.31226006, -0.9997308]
12|[-0.68236375, 0.07309595, 0.8319045, -0.34065887, -0.5734473]
13|[-0.026351633, -0.20818894, -0.8079838, -0.53878766, -0.6403443]
14|[0.27372813, -0.26643738, -0.5988725, -0.66084, 0.3210081]
warning.log contains the warning logs generated during the export, for example:
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'name', 'index_name': 'name', 'index_param': {'index_type': 'INVERTED'}}
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'ivf_flat_embedding', 'index_name': 'ivf_flat_embedding', 'index_param': {'index_type': 'IVF_FLAT', 'metric_type': 'COSINE', 'params': {'nlist': 128}}}
Full Import of Local Data into VDB
Local File Structure
Prepare the import script import.py, the import configuration file csv2vdb_config.yaml, and the data to import, i.e. the data directory produced in the export step. Place the import script import.py, the import configuration file csv2vdb_config.yaml, and the data directory data in the same directory. The directory layout is as follows:
├── import.py
├── csv2vdb_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── table_config.json
        └── 0000000000.csv
Import Script
The import script is as follows:
import pymochow
import yaml
import orjson
import json
import glob
import csv
import os
import time
from typing import List
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.exception import ClientError, ServerError
from pymochow.model.database import Database
from pymochow.model.schema import (
    Schema,
    Field,
    SecondaryIndex,
    VectorIndex,
    HNSWParams,
    HNSWPQParams,
    PUCKParams,
    AutoBuildTiming
)
from pymochow.model.enum import FieldType, ElementType, IndexType, MetricType, PartitionType, ServerErrCode
from pymochow.model.enum import TableState, IndexState
from pymochow.model.table import (
    Partition,
    Row,
)

def get_fields(table_config) -> List[Field]:
    # schema
    table_schema = table_config['schema']
    # fields in schema
    fields = []
    for field_schema in table_schema['fields']:
        field_name = field_schema['fieldName']
        field_type = FieldType(field_schema['fieldType'])
        element_type = None
        max_capacity = None
        if field_type == FieldType.ARRAY:
            element_type = ElementType(field_schema['elementType'])
            max_capacity = field_schema['maxCapacity']

        primary_key = False
        if 'primaryKey' in field_schema:
            primary_key = field_schema['primaryKey']

        partition_key = False
        if 'partitionKey' in field_schema:
            partition_key = field_schema['partitionKey']

        auto_increment = False
        if 'autoIncrement' in field_schema:
            auto_increment = field_schema['autoIncrement']

        not_null = False
        if 'notNull' in field_schema:
            not_null = field_schema['notNull']

        dimension = 0
        if 'dimension' in field_schema:
            dimension = field_schema['dimension']

        field = Field(field_name = field_name,
                      field_type = field_type,
                      primary_key = primary_key,
                      partition_key = partition_key,
                      auto_increment = auto_increment,
                      not_null = not_null,
                      dimension = dimension,
                      element_type = element_type,
                      max_capacity = max_capacity,
                      )
        fields.append(field)
    return fields

def get_indexes(table_config) -> list:
    # schema
    table_schema = table_config['schema']
    # indexes in schema
    indexes = []
    if 'indexes' in table_schema:
        for index_schema in table_schema['indexes']:
            field = index_schema['field']
            index_name = index_schema['indexName']
            index_type = IndexType(index_schema['indexType'])
            if index_type == IndexType.SECONDARY_INDEX:
                index = SecondaryIndex(index_name = index_name,
                                       field = field)
                indexes.append(index)
                continue

            metric_type = MetricType[index_schema['metricType']]

            if index_type == IndexType.HNSW:
                m = index_schema['params']['M']
                ef_construction = index_schema['params']['efConstruction']
                params = HNSWParams(m = m,
                                    efconstruction = ef_construction)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.HNSWPQ:
                m = index_schema['params']['M']
                ef_construction = index_schema['params']['efConstruction']
                sample_rate = index_schema['params']['sampleRate']
                NSQ = index_schema['params']['NSQ']
                params = HNSWPQParams(m = m,
                                      efconstruction = ef_construction,
                                      NSQ = NSQ,
                                      samplerate = sample_rate)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.PUCK:
                course_cluster_count = index_schema['params']['courceClusterCount']
                fine_cluster_count = index_schema['params']['fineClusterCount']
                params = PUCKParams(courceClusterCount = course_cluster_count,
                                    fineClusterCount = fine_cluster_count)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.FLAT:
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type)
                indexes.append(index)
            else:
                raise Exception("not support index type")
    return indexes

def create_table(db, table_config) -> bool:
    db_name = table_config['database']
    table_name = table_config['table']
    replication = table_config['replication']

    # check if table already exist
    table_list = db.list_table()
    for table_item in table_list:
        if table_item.table_name == table_name:
            print("")
            print("Table already exist. database: {}, table:{}".format(db_name, table_name))
            print("")
            return False

    # partition
    partition_type = table_config['partition']['partitionType']
    partition_num = table_config['partition']['partitionNum']
    partition = Partition(partition_num = partition_num,
                          partition_type = PartitionType[partition_type])

    fields = get_fields(table_config)
    indexes = get_indexes(table_config)

    schema = Schema(fields = fields,
                    indexes = indexes)

    db.create_table(
        table_name,
        replication,
        partition,
        schema
    )

    while True:
        time.sleep(2)
        table = db.describe_table(table_name)
        if table.state == TableState.NORMAL:
            break

    print("")
    print("Succeed to create table. table: {}".format(table.to_dict()))
    print("")
    return True

if __name__ == "__main__":
    with open('csv2vdb_config.yaml', 'r', encoding='utf-8') as config_file:
        config = yaml.safe_load(config_file)

    print("current config:" + str(config))
    print('#'*60)
    print("")

    db_host = config['database']['host']
    db_port = config['database']['port']
    account = config['database']['account']
    api_key = config['database']['api_key']
    db_name = config['database']['db_name']
    data_path = config['data_path']

    # initialize client
    endpoint = f"http://{db_host}:{db_port}"
    config = Configuration(credentials=BceCredentials(account, api_key),
                           endpoint=endpoint)
    client = pymochow.MochowClient(config)

    # create database if not exist
    db_list = client.list_databases()
    exist_flag = False
    for db_item in db_list:
        if db_item.database_name == db_name:
            exist_flag = True
            break
    if not exist_flag:
        client.create_database(db_name)
    db = client.database(db_name)

    for table_name in os.listdir(data_path):
        print('+'*60)
        table_folder = os.path.join(data_path, table_name)
        # create table
        table_config_file = os.path.join(table_folder, 'table_config.json')
        if not os.path.exists(table_config_file):
            print(f"No table_config.json found for table: {table_name}. Skipping...")
            print('+'*60)
            print("")
            continue
        succeed_flag = False
        with open(table_config_file, 'r', encoding='utf-8') as file:
            data = file.read()
            table_config = orjson.loads(data)
            succeed_flag = create_table(db, table_config)

        if not succeed_flag:
            print('+'*60)
            print("")
            continue

        # start inserting table
        table = db.table(table_name)
        print(f"Begin Process table: {table_name}")
        fields = get_fields(table_config)
        field_dict = {field.field_name: field for field in fields}
        if os.path.isdir(table_folder):
            cnt = 0
            csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
            for csv_file in csv_files:
                rows = []
                with open(csv_file, 'r', encoding='utf-8') as file:
                    csv_reader = csv.reader(file, delimiter='|')
                    # read the first line as the header
                    headers = next(csv_reader)
                    # print("header:", headers)
                    # iterate over the data rows
                    for row in csv_reader:
                        for i in range(0, len(row)):
                            # skip string type field
                            if field_dict[headers[i]].field_type == FieldType.STRING:
                                continue
                            if field_dict[headers[i]].field_type == FieldType.ARRAY and field_dict[headers[i]].element_type == ElementType.STRING:
                                row[i] = json.loads(row[i])
                            row[i] = orjson.loads(row[i])
                        row_dict = dict(zip(headers, row))
                        rows.append(Row(**row_dict))
                        if len(rows) >= 1000:
                            table.upsert(rows=rows)
                            rows = []
                    if len(rows) > 0:
                        table.upsert(rows=rows)
                        rows = []

                cnt += 1
                print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")

        table_stats = table.stats()
        print(f"Finish importing table: {table_name}, stats: {table_stats}")
        print('+'*60)
        print("")

    print(f"Finish importing all tables. Please consider rebuilding index before searching.")
    print('#'*60)
    print("")

    client.close()
YAML File Configuration
The csv2vdb_config.yaml file is configured as follows:
database:
  host: "192.16.XX.XX"        # IP address of the VDB instance
  port: 5287                  # port of the VDB instance
  account: "root"             # VDB account
  api_key: ""                 # VDB API key
  db_name: "vector_database"  # name of the target database for the import

data_path: "./data"           # import data source
Update the relevant configuration items in the YAML file according to the target VDB cluster information.
Run and Verify
Run import.py and check the result. Verify in VDB that the data has been imported successfully, for example with the sketch below.
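A minimal pymochow sketch that lists the imported tables and prints their stats (the connection values are placeholders mirroring csv2vdb_config.yaml):

# Sketch: verify the import by listing tables and printing their stats via pymochow.
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials

config = Configuration(credentials=BceCredentials("root", "<api_key>"),
                       endpoint="http://192.16.XX.XX:5287")
client = pymochow.MochowClient(config)
db = client.database("vector_database")
for table_item in db.list_table():
    table = db.table(table_item.table_name)
    print(table_item.table_name, table.stats())
client.close()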
Alternatively, use mochow-cli to check the migrated tables:
mochow-cli > list db
+--------------------+
| databases          |
+====================+
| vector_database    |
+--------------------+

mochow-cli > list tb -d vector_database
+----------------------+
| tables               |
+======================+
| export_collection    |
+----------------------+
| test_collection      |
+----------------------+

mochow-cli > stats tb -d vector_database -t export_collection
+-------------+---------------------+-------------------+
| rowCount    | memorySizeInByte    | diskSizeInByte    |
+=============+=====================+===================+
| 1000        | 242037              | 151525            |
+-------------+---------------------+-------------------+
Rebuild Index
Before using the migrated tables, it is recommended to rebuild the vector indexes to improve query performance. See the OpenAPI or SDK documentation for details; a minimal sketch follows.
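The sketch below assumes pymochow's rebuild_index/describe_index interface; the endpoint, credentials, table name, and index name are placeholders and should be replaced with the actual index name created during the migration:

# Sketch: trigger an index rebuild and wait until the index returns to NORMAL state.
import time
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.model.enum import IndexState

config = Configuration(credentials=BceCredentials("root", "<api_key>"),
                       endpoint="http://192.16.XX.XX:5287")
client = pymochow.MochowClient(config)
table = client.database("vector_database").table("export_collection")

table.rebuild_index("vector_idx")  # placeholder: name of the vector index to rebuild
while True:
    time.sleep(2)
    index = table.describe_index("vector_idx")
    if index.state == IndexState.NORMAL:
        break
print("index rebuild finished")
client.close()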