Milvus-to-VectorDB Offline Migration Solution
Overview
This solution helps users perform a full offline migration of data from Milvus into Baidu VectorDB.
Prerequisites
- This solution supports migrating from Milvus 2.3.0 and later to Baidu VDB 2.1 and later.
- Python 3.8.0 or later is required.
- The following Python libraries must be installed:
  - pymochow >= 2.1
  - pymilvus >= 2.3.0

```bash
pip install pymochow
pip install pymilvus
pip install pyaml
pip install orjson
pip install tqdm
```
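Before migrating, you can do a quick sanity check of the environment (a simple check, not part of the original tooling):

```bash
python --version                 # expect Python 3.8.0 or later
pip show pymochow pymilvus       # check the installed library versions
```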
Migration Limits
The export script automatically maps each collection's schema:
- If a field type is not supported for migration, the export of that collection fails.
- If an index type is not supported for migration, that index is skipped; you must create a replacement index manually after the migration completes.
Schema Object Migration
Logical Concepts
The logical concepts of Milvus and VDB compare as follows:
| Logical concept | Mochow | Milvus |
|---|---|---|
| Database | Database | Database |
| Table / collection | Table | Collection |
| Shard / partition | Partition | Partition |
| Segment (further subdivision within a partition) | Not exposed | Segment |
| Column family | Not exposed | N/A |
| Row / record / object / entity | Row | Entity |
| Column / field | Field | Field |
| Index | Index | Index |
Data Types
The data types of Milvus and VDB compare as follows:
| Data type | Mochow | Milvus |
|---|---|---|
| Boolean | BOOL | BOOL |
| 8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
| 8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
| Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
| Date | DATE | N/A |
| Datetime | DATETIME | N/A |
| UUID | UUID | N/A |
| String | STRING | VARCHAR |
| Binary | BINARY | N/A |
| Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
| JSON | JSON (coming soon) | JSON |
| Array | ARRAY | ARRAY |
| Single-precision float vector (dense) | FLOAT_VECTOR | FLOAT_VECTOR |
| Half-precision float vector (dense) | N/A | FLOAT16_VECTOR/BFLOAT16_VECTOR |
| Single-precision sparse float vector | SPARSE_FLOAT_VECTOR (coming soon) | SPARSE_FLOAT_VECTOR |
| Binary vector | BINARY_VECTOR (coming soon) | BINARY_VECTOR |
| None/unknown type | N/A | NONE/UNKNOWN |
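As a concrete illustration of the mapping above, a Milvus VARCHAR field becomes a Mochow STRING field, and a FLOAT_VECTOR field keeps its dimension. The minimal sketch below uses the pymilvus and pymochow field classes that the export script later in this guide relies on; the field names are illustrative:

```python
from pymilvus import FieldSchema, DataType
from pymochow.model.schema import Field
from pymochow.model.enum import FieldType

# Milvus VARCHAR maps to Mochow STRING
milvus_title = FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256)
mochow_title = Field(field_name="title", field_type=FieldType.STRING)

# Milvus FLOAT_VECTOR maps to Mochow FLOAT_VECTOR with the same dimension
milvus_vec = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768)
mochow_vec = Field(field_name="vector", field_type=FieldType.FLOAT_VECTOR,
                   dimension=768, not_null=True)
```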
Element Types Supported in ARRAY
| Data type | Mochow | Milvus |
|---|---|---|
| Boolean | BOOL | BOOL |
| 8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
| 8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
| Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
| Date | DATE | N/A |
| Datetime | DATETIME | N/A |
| UUID | UUID | N/A |
| String | STRING | VARCHAR |
| Binary | BINARY | N/A |
| Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
| JSON | N/A | JSON |
| Array | N/A | ARRAY |
Index Concepts
Scalar index concepts compare as follows:
| Scalar index concept | Mochow | Milvus |
|---|---|---|
| Primary key index | Implicit | Implicit |
| Secondary index | SECONDARY | STL_SORT / TRIE |
| Inverted text index | INVERTED | INVERTED |
| Filtering index | FILTERING | N/A |
Vector index concepts compare as follows:
| Vector index concept | Mochow | Milvus |
|---|---|---|
| HNSW | HNSW | HNSW |
| HNSW with scalar quantization | HNSW_SQ | N/A |
| HNSW with product quantization | HNSW_PQ | N/A |
| Flat index (brute-force KNN) | FLAT | FLAT |
| Inverted-structure index | PUCK / PUCK_PQ | IVF_* family |
| DiskANN | N/A | DISKANN |
| Sparse vector index | SPARSE_OPTIMIZED_FLAT (coming soon) | SPARSE_* family |
| Binary vector index | No index needs to be specified; FLAT is used by default | BIN_* family |
| Auto index | N/A | AUTOINDEX |
| GPU index | N/A | GPU_* family |
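For example, a Milvus HNSW index translates into a VDB HNSW index with the same M/efConstruction parameters and metric type. The sketch below uses the pymochow classes that the export and import scripts in this guide rely on; index and field names are illustrative:

```python
from pymochow.model.schema import VectorIndex, HNSWParams
from pymochow.model.enum import IndexType, MetricType

# Milvus-side index definition (as passed to create_index)
milvus_index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200},
}

# Equivalent VDB/Mochow index definition
mochow_index = VectorIndex(
    index_name="vector_idx",
    field="vector",
    index_type=IndexType.HNSW,
    metric_type=MetricType.COSINE,
    params=HNSWParams(m=16, efconstruction=200),
)
```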
Full Migration Process
Exporting All Data from Milvus to Local Files
Local File Layout
Prepare the export script export.py and the export configuration file milvus2csv_config.yaml, and create an output directory (this guide uses data as an example). Place export.py, milvus2csv_config.yaml, and the data directory under the same parent directory, with the following layout:
```
├── export.py
├── milvus2csv_config.yaml
└── data
```
Export Script
The export script is as follows:
```python
import yaml
import orjson
import json
import os
import csv
import logging
import re
import time
from tqdm import tqdm
from pymilvus import (
    connections,
    DataType,
    Collection,
    MilvusClient
)
from pymilvus.orm.index import (
    Index
)
from pymochow.model.enum import (
    FieldType as mochowFieldType,
    IndexType as mochowIndexType,
    MetricType as mochowMetricType,
    ElementType as mochowElementType,
)
from pymochow.model.schema import (
    Schema as mochowSchema,
    Field as mochowField,
    SecondaryIndex as mochowSecondaryIndex,
    VectorIndex as mochowVectorIndex,
    HNSWParams as mochowHNSWParams,
)
from pymochow.model.table import Partition as mochowPartition

with open("./milvus2csv_config.yaml", "r", encoding='utf-8') as f:
    config = yaml.safe_load(f)

print("configuration:")
print(config)

milvus_config = config["milvus"]
milvus_config["uri"] = f'http://{milvus_config["host"]}:{milvus_config["port"]}'
export_config = config["export"]

milvus_type_to_mochow_type = {
    DataType.BOOL: mochowFieldType.BOOL,
    DataType.INT8: mochowFieldType.INT8,
    DataType.INT16: mochowFieldType.INT16,
    DataType.INT32: mochowFieldType.INT32,
    DataType.INT64: mochowFieldType.INT64,

    DataType.FLOAT: mochowFieldType.FLOAT,
    DataType.DOUBLE: mochowFieldType.DOUBLE,

    DataType.STRING: mochowFieldType.STRING,
    DataType.VARCHAR: mochowFieldType.STRING,

    DataType.FLOAT_VECTOR: mochowFieldType.FLOAT_VECTOR,
    DataType.ARRAY: mochowFieldType.ARRAY,

    # not supported yet
    # DataType.FLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
    # DataType.BFLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
    # DataType.JSON: mochowFieldType.JSON,
    # DataType.SPARSE_FLOAT_VECTOR: mochowFieldType.SPARSE_FLOAT_VECTOR,
    # DataType.BINARY_VECTOR: mochowFieldType.BINARY_VECTOR,
    # DataType.UNKNOWN: "NULL",
}

milvus_array_element_type_to_mochow_array_element_type = {
    DataType.BOOL: mochowElementType.BOOL,
    DataType.INT8: mochowElementType.INT8,
    DataType.INT16: mochowElementType.INT16,
    DataType.INT32: mochowElementType.INT32,
    DataType.INT64: mochowElementType.INT64,

    DataType.FLOAT: mochowElementType.FLOAT,
    DataType.DOUBLE: mochowElementType.DOUBLE,

    DataType.VARCHAR: mochowElementType.STRING,
}

basic_data_type = [
    DataType.BOOL,
    DataType.INT8,
    DataType.INT16,
    DataType.INT32,
    DataType.INT64,

    DataType.FLOAT,
    DataType.DOUBLE,

    DataType.VARCHAR,
    DataType.STRING,
]

scalar_index_supported = {
    "STL_SORT", "Trie"
}

vector_index_supported = {
    "HNSW": mochowIndexType.HNSW,
    "FLAT": mochowIndexType.FLAT,
}

metric_type_map = {
    "L2": mochowMetricType.L2,
    "IP": mochowMetricType.IP,
    "COSINE": mochowMetricType.COSINE,
}

# check field

def is_legal_field_name(field_name: str) -> bool:
    pattern = r"(^[A-Za-z][A-Za-z0-9_]{0,254}$)"
    return bool(re.match(pattern, field_name))

def check_valid_field(field) -> dict:
    if field.dtype not in milvus_type_to_mochow_type:
        return {
            "valid": False,
            "error": f"field type not supported yet: {field.to_dict()}"
        }
    if field.dtype == DataType.ARRAY and field.element_type not in milvus_array_element_type_to_mochow_array_element_type:
        return {
            "valid": False,
            "error": f"element type not supported yet: {field.to_dict()}"
        }
    if not is_legal_field_name(field.name):
        return {
            "valid": False,
            "error": f"invalid field name. Only uppercase and lowercase letters, numbers and underscores (_) are supported; it must start with a letter and its length is limited to 1-255: {field.to_dict()}"
        }
    return {
        "valid": True,
        "error": ""
    }

def get_mochow_field_from_milvus_field(field, is_not_null: bool) -> mochowField:
    field_type = milvus_type_to_mochow_type[field.dtype]
    primary_key = False
    partition_key = False
    auto_increment = False
    not_null = False
    dimension = 0
    if field.is_primary:
        primary_key = True
        auto_increment = field.auto_id
        not_null = True
    if field.is_partition_key:
        partition_key = True
        not_null = True
    if field.dtype == DataType.FLOAT_VECTOR:
        if "dim" not in field.params:
            raise Exception("not found param 'dim' of FLOAT_VECTOR")
        dimension = field.params["dim"]
        not_null = True
    if is_not_null:
        not_null = True
    element_type = None
    max_capacity = None
    if field.dtype == DataType.ARRAY:
        element_type = milvus_array_element_type_to_mochow_array_element_type[field.element_type]
        if "max_capacity" not in field.params:
            raise Exception("not found param 'max_capacity' of array")
        max_capacity = field.params["max_capacity"]
    mochow_field = mochowField(
        field_name=field.name,
        field_type=field_type,
        primary_key=primary_key,
        partition_key=partition_key,
        auto_increment=auto_increment,
        not_null=not_null,
        dimension=dimension,
        element_type=element_type,
        max_capacity=max_capacity,
    )
    return mochow_field

def get_mochow_index_from_milvus_index(index: Index) -> mochowVectorIndex:
    if 'index_type' not in index.params:
        logger.warning(f"index type not supported yet: {index.to_dict()}")
        return None
    index_type = index.params['index_type']
    if index_type in scalar_index_supported:
        return mochowSecondaryIndex(
            index_name=index.index_name,
            field=index.field_name
        )
    if index_type in vector_index_supported:
        mochow_index_type = vector_index_supported[index_type]
        if 'metric_type' not in index.params:
            raise Exception(f"not found param 'metric_type' of vector index: {index.to_dict()}")
        metric_type = index.params['metric_type']
        if metric_type not in metric_type_map:
            return None
        mochow_metric_type = metric_type_map[metric_type]
        params = None
        if mochow_index_type == mochowIndexType.HNSW:
            if 'M' not in index.params or 'efConstruction' not in index.params:
                raise Exception(f"invalid HNSW param of vector index: {index.to_dict()}")
            params = mochowHNSWParams(
                m=int(index.params['M']),
                efconstruction=int(index.params['efConstruction'])
            )
        return mochowVectorIndex(
            index_name=index.index_name,
            field=index.field_name,
            index_type=mochow_index_type,
            metric_type=mochow_metric_type,
            params=params
        )
    logger.warning(f"index type not supported yet: {index.to_dict()}")
    return None

def get_partition_info(collection_name: str) -> mochowPartition:
    client = MilvusClient(**milvus_config)
    collection_desc = client.describe_collection(collection_name=collection_name)
    if 'num_partitions' not in collection_desc:
        raise Exception(f"not found param 'num_partitions' of collection: {collection_desc}")
    return mochowPartition(
        partition_num=collection_desc['num_partitions'],
    )

def dump_collection_schema(collection_name: str, logger: logging.Logger, log_file: str) -> bool:
    connections.connect(**milvus_config)
    collection = Collection(collection_name)

    mochow_indexes = []
    mochow_index_field_set = set()
    has_no_support_index_type = False
    for index in collection.indexes:
        mochow_index = get_mochow_index_from_milvus_index(index)
        if mochow_index is None:
            has_no_support_index_type = True
            continue
        mochow_index_field_set.add(mochow_index.field)
        mochow_indexes.append(mochow_index)
    if has_no_support_index_type:
        print("found index_type which is not supported yet")

    not_support_fields = {}
    mochow_fields = []
    has_invalid_field = False
    for field in collection.schema.fields:
        check_field = check_valid_field(field)
        if not check_field["valid"]:
            not_support_fields[field.name] = field
            has_invalid_field = True
        else:
            mochow_fields.append(get_mochow_field_from_milvus_field(field, field.name in mochow_index_field_set))
    if has_invalid_field:
        for field in not_support_fields.values():
            check_field = check_valid_field(field)
            logger.error(check_field["error"])
        return False

    schema = mochowSchema(
        fields=mochow_fields,
        indexes=mochow_indexes,
    )
    partition = get_partition_info(collection_name)
    create_table_args = {
        "database": milvus_config["db_name"],
        "table": collection_name,
        "replication": export_config["replication"],
        "partition": partition.to_dict(),
        "schema": schema.to_dict(),
        "enableDynamicField": False,
    }
    if collection.schema.description != "":
        create_table_args["description"] = collection.schema.description

    json_bytes = orjson.dumps(create_table_args)
    output_file_name = os.path.join(export_config["output_path"], collection_name, "table_config.json")
    with open(output_file_name, 'wb') as json_file:
        json_file.write(json_bytes)
    print(f"generate table config of {collection_name} succeed")
    return True

# convert data to str

# def convert_to_binary(binary_data):
#     decimal_value = int.from_bytes(binary_data, byteorder='big')
#     binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
#     return ','.join(list(binary_string))

def basic_data_convert_to_str(data, dtype, delimiter):
    if dtype == DataType.BOOL:
        return "true" if data else "false"
    elif dtype in [DataType.INT8, DataType.INT16,
                   DataType.INT32, DataType.INT64,
                   DataType.FLOAT, DataType.DOUBLE]:
        return str(data)
    elif dtype in [DataType.STRING, DataType.VARCHAR]:
        return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")

    raise Exception(f"Unsupported Basic DataType {dtype}")

def data_convert_array_to_str(data, element_type, delimiter="|"):
    if element_type in milvus_array_element_type_to_mochow_array_element_type:
        if element_type == DataType.VARCHAR or element_type == DataType.STRING:
            tmp_data = [basic_data_convert_to_str(x, element_type, delimiter) for x in data]
            orjson_str = orjson.dumps(tmp_data).decode("utf-8")
            return json.dumps(orjson_str)
        return "[" + ", ".join(basic_data_convert_to_str(x, element_type, delimiter) for x in data) + "]"

    raise Exception(f"Unsupported ElementType {element_type}")

def data_convert_to_str(data, dtype, delimiter):
    if dtype in basic_data_type:
        return basic_data_convert_to_str(data, dtype, delimiter)
    elif dtype == DataType.FLOAT_VECTOR:
        return data
    elif dtype == DataType.ARRAY:
        return data

    raise Exception(f"Unsupported DataType {dtype}")

def csv_write_rows(datum, fd, fields_types, delimiter="|"):
    for data in datum:
        for i in range(len(data)):
            ftype = fields_types[i]["dtype"]
            data[i] = data_convert_to_str(data[i], ftype, delimiter)
        fd.writerow(data)


def csv_write_header(headers: list, fd, delimiter="|"):
    fd.writerow(headers)

def dump_collection(collection_name: str, logger: logging.Logger):
    results = []
    file_cnt = 0
    print("connecting to milvus...")
    connections.connect(**milvus_config)

    export_config = config["export"]
    collection = Collection(collection_name)
    collection.load()
    tmp_path = os.path.join(export_config["output_path"], collection_name)
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)

    fields_types = []
    headers = []
    for schema in collection.schema.fields:
        field_type = {"dtype": schema.dtype}
        if schema.dtype == DataType.ARRAY:
            field_type["element_type"] = schema.element_type
        fields_types.append(field_type)
        headers.append(schema.name)

    total_num = collection.num_entities
    collection.load()
    query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

    def write_to_csv_file(col_names, data):
        if len(results) == 0:
            return
        nonlocal file_cnt
        assert file_cnt <= 1e9
        output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
        with open(output_file_name, "w", newline="", encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file, delimiter="|")
            # write header
            csv_write_header(col_names, writer)
            # write data
            csv_write_rows(data, writer, fields_types)
            file_cnt += 1
            results.clear()

    with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
        while True:
            res = query_iterator.next()
            if len(res) == 0:
                # close the iterator
                query_iterator.close()
                break
            for row in res:
                row_list = []
                for i in range(len(headers)):
                    field = row[headers[i]]
                    if isinstance(field, list) and fields_types[i]["dtype"] == DataType.FLOAT_VECTOR:
                        row_list.append("[" + ", ".join(str(x) for x in field) + "]")
                    elif isinstance(field, list) and fields_types[i]["dtype"] == DataType.ARRAY:
                        row_list.append(data_convert_array_to_str(field, fields_types[i]["element_type"]))
                    elif isinstance(field, dict):
                        row_list.append(orjson.dumps(field))
                    else:
                        row_list.append(field)
                results.append(row_list)
                if len(results) >= export_config["max_line_in_file"]:
                    write_to_csv_file(headers, data=results)
                pbar.update(1)

    write_to_csv_file(headers, data=results)

def check_db_collections():
    connections.connect(**milvus_config)
    for name in config["export"]["collections"]:
        collection = Collection(name)

if __name__ == "__main__":
    check_db_collections()
    for name in config["export"]["collections"]:
        print(f"start export {name}")
        logger = logging.getLogger(f"export: {name}")
        output_dir = os.path.join(export_config["output_path"], name)
        os.makedirs(output_dir, exist_ok=True)
        log_file = os.path.join(export_config["output_path"], name, "warning.log")
        file_handler = logging.FileHandler(log_file, mode='w')
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.setLevel(logging.INFO)
        start_time = time.time()
        if dump_collection_schema(name, logger, log_file):
            dump_collection(name, logger)
            end_time = time.time()
            elapsed_time_seconds = end_time - start_time
            logger.info(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
            print(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
        else:
            print(f"generate table config failed, please check {log_file}")
```
YAML Configuration
The milvus2csv_config.yaml file is configured as follows:
```yaml
milvus:
  host: '<localhost>'        # Milvus server host address
  port: 19530                # Milvus server port
  user: '<user_name>'        # user name
  password: '<password>'     # password
  db_name: '<database_name>' # database name
  token: '<token_id>'        # access token

export:
  collections:
    - 'export_collection'
    - 'test_collection'
    # - 'hello_milvus'
    # - 'car'
    # - 'medium_articles_with_dynamic'
    # list every collection that needs to be exported
  max_line_in_file: 40000     # maximum number of rows per exported file
  output_path: './data'       # export target directory; this guide uses ./data
  replication: 3              # number of replicas per table partition (including the primary), in the range [1,10]
  partitionType: "HASH"       # partition type; currently only "HASH" is supported
```
Adjust the relevant configuration items in the YAML file according to the information of the source Milvus cluster.
Run and Verify
Run export.py and check the output.
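The script is run directly with Python (assuming the 3.8+ interpreter prepared above):

```bash
python export.py
```

When it finishes, the directory layout should look like the following: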
```
.
├── export.py
├── milvus2csv_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── warning.log
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── warning.log
        ├── table_config.json
        └── 0000000000.csv
```
Here table_config.json is the table schema converted from the Milvus collection into VDB concepts; it is used to create the corresponding table in VDB. Its structure is as follows:
```json
{
    "database": "benchmark_db",
    "table": "benchmark_table",
    "replication": 3,
    "partition": {
        "partitionType": "HASH",
        "partitionNum": 1
    },
    "schema": {
        "fields": [
            {
                "fieldName": "id",
                "fieldType": "UINT64",
                "primaryKey": true,
                "partitionKey": true,
                "autoIncrement": false,
                "notNull": true
            },
            {
                "fieldName": "vector",
                "fieldType": "FLOAT_VECTOR",
                "dimension": 3,
                "notNull": true
            }
        ]
    }
}
```
Each 0000000000.csv file holds a portion of the table data in CSV format. Its structure is as follows:
```
// the first line of the csv holds the column names; columns are separated by |
my_id|my_vector
10|[-0.65697396, -0.25675339, 0.21634601, -0.7329633, 0.655407]
11|[0.61959845, -0.105865225, -0.27274612, 0.31226006, -0.9997308]
12|[-0.68236375, 0.07309595, 0.8319045, -0.34065887, -0.5734473]
13|[-0.026351633, -0.20818894, -0.8079838, -0.53878766, -0.6403443]
14|[0.27372813, -0.26643738, -0.5988725, -0.66084, 0.3210081]
```
warning.log contains the warning messages produced during export, for example:
```
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'name', 'index_name': 'name', 'index_param': {'index_type': 'INVERTED'}}
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'ivf_flat_embedding', 'index_name': 'ivf_flat_embedding', 'index_param': {'index_type': 'IVF_FLAT', 'metric_type': 'COSINE', 'params': {'nlist': 128}}}
```
Importing All Local Data into VDB
Local File Layout
Prepare the import script import.py, the import configuration file csv2vdb_config.yaml, and the data to be imported (the data directory produced in the export step). Place import.py, csv2vdb_config.yaml, and the data directory under the same parent directory, with the following layout:
```
├── import.py
├── csv2vdb_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── table_config.json
        └── 0000000000.csv
```
Import Script
The import script is as follows:
```python
import pymochow
import yaml
import orjson
import json
import glob
import csv
import os
import time
from typing import List
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.exception import ClientError, ServerError
from pymochow.model.database import Database
from pymochow.model.schema import (
    Schema,
    Field,
    SecondaryIndex,
    VectorIndex,
    HNSWParams,
    HNSWPQParams,
    PUCKParams,
    AutoBuildTiming
)
from pymochow.model.enum import FieldType, ElementType, IndexType, MetricType, PartitionType, ServerErrCode
from pymochow.model.enum import TableState, IndexState
from pymochow.model.table import (
    Partition,
    Row,
)

def get_fields(table_config) -> List[Field]:
    # schema
    table_schema = table_config['schema']
    # fields in schema
    fields = []
    for field_schema in table_schema['fields']:
        field_name = field_schema['fieldName']
        field_type = FieldType(field_schema['fieldType'])
        element_type = None
        max_capacity = None
        if field_type == FieldType.ARRAY:
            element_type = ElementType(field_schema['elementType'])
            max_capacity = field_schema['maxCapacity']

        primary_key = False
        if 'primaryKey' in field_schema:
            primary_key = field_schema['primaryKey']

        partition_key = False
        if 'partitionKey' in field_schema:
            partition_key = field_schema['partitionKey']

        auto_increment = False
        if 'autoIncrement' in field_schema:
            auto_increment = field_schema['autoIncrement']

        not_null = False
        if 'notNull' in field_schema:
            not_null = field_schema['notNull']

        dimension = 0
        if 'dimension' in field_schema:
            dimension = field_schema['dimension']

        field = Field(field_name = field_name,
                      field_type = field_type,
                      primary_key = primary_key,
                      partition_key = partition_key,
                      auto_increment = auto_increment,
                      not_null = not_null,
                      dimension = dimension,
                      element_type = element_type,
                      max_capacity = max_capacity,
                      )
        fields.append(field)
    return fields

def get_indexes(table_config) -> list:
    # schema
    table_schema = table_config['schema']
    # indexes in schema
    indexes = []
    if 'indexes' in table_schema:
        for index_schema in table_schema['indexes']:
            field = index_schema['field']
            index_name = index_schema['indexName']
            index_type = IndexType(index_schema['indexType'])
            if index_type == IndexType.SECONDARY_INDEX:
                index = SecondaryIndex(index_name = index_name,
                                       field = field)
                indexes.append(index)
                continue

            metric_type = MetricType[index_schema['metricType']]

            if index_type == IndexType.HNSW:
                m = index_schema['params']['M']
                ef_construction = index_schema['params']['efConstruction']
                params = HNSWParams(m = m,
                                    efconstruction = ef_construction)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.HNSWPQ:
                m = index_schema['params']['M']
                ef_construction = index_schema['params']['efConstruction']
                sample_rate = index_schema['params']['sampleRate']
                NSQ = index_schema['params']['NSQ']
                params = HNSWPQParams(m = m,
                                      efconstruction = ef_construction,
                                      NSQ = NSQ,
                                      samplerate = sample_rate)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.PUCK:
                course_cluster_count = index_schema['params']['courceClusterCount']
                fine_cluster_count = index_schema['params']['fineClusterCount']
                params = PUCKParams(courceClusterCount = course_cluster_count,
                                    fineClusterCount = fine_cluster_count)
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type,
                                    params = params
                                    )
                indexes.append(index)
            elif index_type == IndexType.FLAT:
                index = VectorIndex(index_name = index_name,
                                    index_type = index_type,
                                    field = field,
                                    metric_type = metric_type)
                indexes.append(index)
            else:
                raise Exception("not support index type")
    return indexes

def create_table(db, table_config) -> bool:
    db_name = table_config['database']
    table_name = table_config['table']
    replication = table_config['replication']

    # check if table already exists
    table_list = db.list_table()
    for table_item in table_list:
        if table_item.table_name == table_name:
            print("")
            print("Table already exists. database: {}, table:{}".format(db_name, table_name))
            print("")
            return False

    # partition
    partition_type = table_config['partition']['partitionType']
    partition_num = table_config['partition']['partitionNum']
    partition = Partition(partition_num = partition_num,
                          partition_type = PartitionType[partition_type])

    fields = get_fields(table_config)
    indexes = get_indexes(table_config)

    schema = Schema(fields = fields,
                    indexes = indexes)

    db.create_table(
        table_name,
        replication,
        partition,
        schema
    )

    while True:
        time.sleep(2)
        table = db.describe_table(table_name)
        if table.state == TableState.NORMAL:
            break

    print("")
    print("Succeed to create table. table: {}".format(table.to_dict()))
    print("")
    return True

if __name__ == "__main__":
    with open('csv2vdb_config.yaml', 'r', encoding='utf-8') as config_file:
        config = yaml.safe_load(config_file)

    print("current config:" + str(config))
    print('#'*60)
    print("")

    db_host = config['database']['host']
    db_port = config['database']['port']
    account = config['database']['account']
    api_key = config['database']['api_key']
    db_name = config['database']['db_name']
    data_path = config['data_path']

    # initialize client
    endpoint = f"http://{db_host}:{db_port}"
    config = Configuration(credentials=BceCredentials(account, api_key),
            endpoint=endpoint)
    client = pymochow.MochowClient(config)

    # create database if it does not exist
    db_list = client.list_databases()
    exist_flag = False
    for db_item in db_list:
        if db_item.database_name == db_name:
            exist_flag = True
            break
    if not exist_flag:
        client.create_database(db_name)
    db = client.database(db_name)

    for table_name in os.listdir(data_path):
        print('+'*60)
        table_folder = os.path.join(data_path, table_name)
        # create table
        table_config_file = os.path.join(table_folder, 'table_config.json')
        if not os.path.exists(table_config_file):
            print(f"No table_config.json found for table: {table_name}. Skipping...")
            print('+'*60)
            print("")
            continue
        succeed_flag = False
        with open(table_config_file, 'r', encoding='utf-8') as file:
            data = file.read()
            table_config = orjson.loads(data)
            succeed_flag = create_table(db, table_config)

        if not succeed_flag:
            print('+'*60)
            print("")
            continue

        # start inserting table
        table = db.table(table_name)
        print(f"Begin Process table: {table_name}")
        fields = get_fields(table_config)
        field_dict = {field.field_name: field for field in fields}
        if os.path.isdir(table_folder):
            cnt = 0
            csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
            for csv_file in csv_files:
                rows = []
                with open(csv_file, 'r', encoding='utf-8') as file:
                    csv_reader = csv.reader(file, delimiter='|')
                    # read the first line as the header
                    headers = next(csv_reader)
                    # print("header:", headers)
                    # iterate over the data rows
                    for row in csv_reader:
                        for i in range(0, len(row)):
                            # skip string type field
                            if field_dict[headers[i]].field_type == FieldType.STRING:
                                continue
                            if field_dict[headers[i]].field_type == FieldType.ARRAY and field_dict[headers[i]].element_type == ElementType.STRING:
                                row[i] = json.loads(row[i])
                            row[i] = orjson.loads(row[i])
                        row_dict = dict(zip(headers, row))
                        rows.append(Row(**row_dict))
                        if len(rows) >= 1000:
                            table.upsert(rows=rows)
                            rows = []
                    if len(rows) > 0:
                        table.upsert(rows=rows)
                        rows = []

                cnt += 1
                print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")

        table_stats = table.stats()
        print(f"Finish importing table: {table_name}, stats: {table_stats}")
        print('+'*60)
        print("")

    print("Finish importing all tables. Please consider rebuilding index before searching.")
    print('#'*60)
    print("")

    client.close()
```
YAML Configuration
The csv2vdb_config.yaml file is configured as follows:
```yaml
database:
  host: "192.16.XX.XX"            # IP address of the VDB instance
  port: 5287                      # port of the VDB instance
  account: "root"                 # VDB account
  api_key: ""                     # VDB API key
  db_name: "vector_database"      # target database name for the import

data_path: "./data"               # import data source directory
```
Adjust the relevant configuration items in the YAML file according to the information of the target VDB cluster.
Run and Verify
Run import.py and check the import result, then verify in VDB that the data was imported successfully.
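As a quick consistency check, you can compare the Milvus entity count with the imported VDB row count. The sketch below is illustrative and not part of the migration scripts; hosts, credentials, and the collection/table name are placeholders:

```python
from pymilvus import connections, Collection
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials

# count entities on the Milvus side
connections.connect(uri="http://<milvus_host>:19530", db_name="<database_name>")
milvus_count = Collection("export_collection").num_entities

# read table statistics on the VDB side
config = Configuration(credentials=BceCredentials("root", "<api_key>"),
                       endpoint="http://<vdb_host>:5287")
client = pymochow.MochowClient(config)
stats = client.database("vector_database").table("export_collection").stats()
print(f"milvus rows: {milvus_count}, vdb table stats: {stats}")
client.close()
```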
Use mochow-cli to inspect the migrated tables:
```
mochow-cli > list db
+--------------------+
| databases          |
+====================+
| vector_database    |
+--------------------+

mochow-cli > list tb -d vector_database
+----------------------+
| tables               |
+======================+
| export_collection    |
+----------------------+
| test_collection      |
+----------------------+

mochow-cli > stats tb -d vector_database -t export_collection
+-------------+---------------------+-------------------+
| rowCount    | memorySizeInByte    | diskSizeInByte    |
+=============+=====================+===================+
| 1000        | 242037              | 151525            |
+-------------+---------------------+-------------------+
```
Rebuild Indexes
Before using the migrated tables, we recommend rebuilding the vector indexes to improve query performance. For details, refer to the OpenAPI or SDK documentation.
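Below is a minimal sketch with pymochow, assuming an HNSW index named vector_idx on the imported table (names and endpoint are placeholders; verify the exact calls against the SDK documentation). If the export warning.log reported skipped index types (for example INVERTED or IVF_FLAT), this is also the point to recreate suitable replacement indexes manually:

```python
import time
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.model.enum import IndexState

config = Configuration(credentials=BceCredentials("root", "<api_key>"),
                       endpoint="http://<vdb_host>:5287")
client = pymochow.MochowClient(config)
table = client.database("vector_database").table("export_collection")

# trigger a rebuild of the vector index and wait until it is ready
table.rebuild_index("vector_idx")
while True:
    time.sleep(5)
    index = table.describe_index("vector_idx")
    if index.state == IndexState.NORMAL:
        break
client.close()
```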
