Milvus to VectorDB Offline Migration Solution
Overview
This solution helps users perform a full offline migration of data from Milvus to Baidu VectorDB.
Prerequisites
- This solution supports migrating from Milvus 2.3.0 or later to Baidu VDB 2.1 or later
- Python 3.8.0 or later is required
- The following Python libraries must be installed (see the pip commands and the version check below):
  - pymochow >= 2.1
  - pymilvus >= 2.3.0
pip install pymochow
pip install pymilvus
pip install pyaml
pip install tqdm
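To quickly confirm that the environment satisfies these requirements, a check such as the following can be run (a minimal sketch; the version thresholds mirror the prerequisites above):
import sys
from importlib.metadata import version

# Verify the interpreter and installed client library versions before migrating.
assert sys.version_info >= (3, 8), "Python 3.8.0 or later is required"
print("pymilvus:", version("pymilvus"))  # expected >= 2.3.0
print("pymochow:", version("pymochow"))  # expected >= 2.1
print("pyaml:", version("pyaml"))
print("tqdm:", version("tqdm"))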
Migration Limitations
The export script automatically maps each collection's schema:
- If a field type is not supported for migration, the export of that collection fails
- If an index type is not supported for migration, that index is skipped and must be created manually after the migration completes
Structural Object Migration
Logical Concepts
The logical concepts of Milvus and VDB compare as follows:
Logical concept | Mochow | Milvus |
---|---|---|
Database | Database | Database |
Table / collection | Table | Collection |
Shard / partition | Partition | Partition |
Segment (a partition is further divided into segments) | Not exposed | Segment |
Column family | Not exposed | N/A |
Row / record / object / entity | Row | Entity |
Column / field | Field | Field |
Index | Index | Index |
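For reference, the sketch below shows how these concepts surface in each SDK; the host, port, credentials, and names are placeholders and should be replaced with real values:
import pymochow
from pymilvus import connections, Collection
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials

# Milvus: a Collection lives inside a Database of the connected cluster.
connections.connect(host="127.0.0.1", port=19530, db_name="default")
collection = Collection("export_collection")  # Milvus Collection ~ Mochow Table

# Mochow (VDB): a Table lives inside a Database managed by the MochowClient.
client = pymochow.MochowClient(Configuration(
    credentials=BceCredentials("root", "<api_key>"),
    endpoint="http://127.0.0.1:5287"))
db = client.database("vector_database")       # Mochow Database ~ Milvus Database
table = db.table("export_collection")         # Mochow Table ~ Milvus Collection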
Data Types
The data types of Milvus and VDB compare as follows:
Data type | Mochow | Milvus |
---|---|---|
Boolean | BOOL | BOOL |
8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
Date | DATE | N/A |
Datetime | DATETIME | N/A |
UUID | UUID | N/A |
String | STRING | VARCHAR |
Binary | BINARY | N/A |
Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
JSON | JSON (coming soon) | JSON |
Array | ARRAY | ARRAY |
Single-precision float vector (dense) | FLOAT_VECTOR | FLOAT_VECTOR |
Half-precision float vector (dense) | N/A | FLOAT16_VECTOR / BFLOAT16_VECTOR |
Single-precision sparse float vector | SPARSE_FLOAT_VECTOR (coming soon) | SPARSE_FLOAT_VECTOR |
Binary vector | BINARY_VECTOR (coming soon) | BINARY_VECTOR |
Null / unknown type | N/A | NONE/UNKNOWN |
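As an illustration of this mapping, the hedged sketch below declares a Milvus field and its Mochow counterpart for two of the types above; the field names, max_length, and dimension are arbitrary examples:
from pymilvus import FieldSchema, DataType
from pymochow.model.schema import Field as mochowField
from pymochow.model.enum import FieldType as mochowFieldType

# Milvus VARCHAR maps to Mochow STRING.
milvus_title = FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256)
mochow_title = mochowField(field_name="title", field_type=mochowFieldType.STRING)

# FLOAT_VECTOR maps one-to-one; the dimension must be carried over.
milvus_vector = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768)
mochow_vector = mochowField(field_name="vector",
                            field_type=mochowFieldType.FLOAT_VECTOR,
                            dimension=768, not_null=True)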
Element Types Supported in ARRAY
Data type | Mochow | Milvus |
---|---|---|
Boolean | BOOL | BOOL |
8/16/32/64-bit integer | INT8/16/32/64 | INT8/16/32/64 |
8/16/32/64-bit unsigned integer | UINT8/16/32/64 | N/A |
Single/double-precision float | FLOAT/DOUBLE | FLOAT/DOUBLE |
Date | DATE | N/A |
Datetime | DATETIME | N/A |
UUID | UUID | N/A |
String | STRING | VARCHAR |
Binary | BINARY | N/A |
Text types | TEXT/TEXT_GBK/TEXT_GB18030 | N/A |
JSON | N/A | JSON |
Array | N/A | ARRAY |
Index Concepts
Scalar index concepts compare as follows:
Scalar index concept | Mochow | Milvus |
---|---|---|
Primary key index | Implicit | Implicit |
Secondary index | SECONDARY | STL_SORT, TRIE |
Inverted text index | INVERTED | INVERTED |
Filtering index | FILTERING | N/A |
Vector index concepts compare as follows:
Vector index concept | Mochow | Milvus |
---|---|---|
HNSW | HNSW | HNSW |
HNSW with scalar quantization | HNSW_SQ | N/A |
HNSW with product quantization | HNSW_PQ | N/A |
Flat index (KNN) | FLAT | FLAT |
Inverted-structure index | PUCK, PUCK_PQ | IVF_* series |
DiskANN | N/A | DISKANN |
Sparse vector index | SPARSE_OPTIMIZED_FLAT (coming soon) | SPARSE_* series |
Binary vector index | No index type needs to be specified; defaults to FLAT | BIN_* series |
Auto index | N/A | AUTOINDEX |
GPU index | N/A | GPU_* series |
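For example, an HNSW index with equivalent build parameters can be declared on both sides roughly as follows (a sketch; the field name, metric, and parameter values are illustrative):
from pymilvus import Collection
from pymochow.model.schema import VectorIndex, HNSWParams
from pymochow.model.enum import IndexType, MetricType

# Milvus: HNSW index built with M / efConstruction parameters
# (assumes an open connection and an existing collection).
collection = Collection("export_collection")
collection.create_index(
    field_name="vector",
    index_params={"index_type": "HNSW", "metric_type": "L2",
                  "params": {"M": 16, "efConstruction": 200}})

# Mochow: the same index expressed with pymochow schema objects.
vdb_index = VectorIndex(index_name="vector_idx", field="vector",
                        index_type=IndexType.HNSW, metric_type=MetricType.L2,
                        params=HNSWParams(m=16, efconstruction=200))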
Full Migration Process
Full Export from Milvus to Local Files
Local File Structure
Prepare the export script export.py and the export configuration file milvus2csv_config.yaml, and create an output directory (this guide uses data as an example). Place export.py, milvus2csv_config.yaml, and the data directory under the same parent directory. The directory layout is as follows:
├── export.py
├── milvus2csv_config.yaml
└── data
Export Script
The export script is as follows:
import yaml
import orjson
import json
import os
import csv
import logging
import re
import time
from tqdm import tqdm
from pymilvus import (
connections,
DataType,
Collection,
MilvusClient
)
from pymilvus.orm.index import (
Index
)
from pymochow.model.enum import (
FieldType as mochowFieldType,
IndexType as mochowIndexType,
MetricType as mochowMetricType,
ElementType as mochowElementType,
)
from pymochow.model.schema import (
Schema as mochowSchema,
Field as mochowField,
SecondaryIndex as mochowSecondaryIndex,
VectorIndex as mochowVectorIndex,
HNSWParams as mochowHNSWParams,
)
from pymochow.model.table import Partition as mochowPartition
with open("./export.yaml", "r") as f:
config = yaml.safe_load(f)
print("configuration:")
print(config)
milvus_config = config["milvus"]
export_config = config["export"]
milvus_type_to_mochow_type = {
DataType.BOOL: mochowFieldType.BOOL,
DataType.INT8: mochowFieldType.INT8,
DataType.INT16: mochowFieldType.INT16,
DataType.INT32: mochowFieldType.INT32,
DataType.INT64: mochowFieldType.INT64,
DataType.FLOAT: mochowFieldType.FLOAT,
DataType.DOUBLE: mochowFieldType.DOUBLE,
DataType.STRING: mochowFieldType.STRING,
DataType.VARCHAR: mochowFieldType.STRING,
DataType.FLOAT_VECTOR: mochowFieldType.FLOAT_VECTOR,
DataType.ARRAY: mochowFieldType.ARRAY,
# not support now
# DataType.FLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
# DataType.BFLOAT16_VECTOR: mochowFieldType.FLOAT_VECTOR,
# DataType.JSON: mochowFieldType.JSON,
# DataType.SPARSE_FLOAT_VECTOR: mochowFieldType.SPARSE_FLOAT_VECTOR,
# DataType.BINARY_VECTOR: mochowFieldType.BINARY_VECTOR,
# DataType.UNKNOWN: "NULL",
}
milvus_array_element_type_to_mochow_array_element_type = {
DataType.BOOL: mochowElementType.BOOL,
DataType.INT8: mochowElementType.INT8,
DataType.INT16: mochowElementType.INT16,
DataType.INT32: mochowElementType.INT32,
DataType.INT64: mochowElementType.INT64,
DataType.FLOAT: mochowElementType.FLOAT,
DataType.DOUBLE: mochowElementType.DOUBLE,
DataType.VARCHAR: mochowElementType.STRING,
}
basic_data_type = [
DataType.BOOL,
DataType.INT8,
DataType.INT16,
DataType.INT32,
DataType.INT64,
DataType.FLOAT,
DataType.DOUBLE,
DataType.VARCHAR,
DataType.STRING,
]
scalar_index_supported = {
"STL_SORT", "Trie"
}
vector_index_supported = {
"HNSW": mochowIndexType.HNSW,
"FLAT": mochowIndexType.FLAT,
}
metirc_type_map = {
"L2": mochowMetricType.L2,
"IP": mochowMetricType.IP,
"COSINE": mochowMetricType.COSINE,
}
# check field
def is_legal_field_name(field_name: str) -> bool:
pattern = r"(^[A-Za-z][A-Za-z0-9_]{0,254}$)"
return bool(re.match(pattern, field_name))
def check_valid_field(field) -> dict:
if field.dtype not in milvus_type_to_mochow_type:
return {
"valid": False,
"error": f"field type not supported yet: {field.to_dict()}"
}
if field.dtype == DataType.ARRAY and field.element_type not in milvus_array_element_type_to_mochow_array_element_type:
return {
"valid": False,
"error": f"element type not supported yet: {field.to_dict()}"
}
if not is_legal_field_name(field.name):
return {
"valid": False,
"error": f"invalid field name, Only uppercase and lowercase letters, numbers and underscores (_) are supported. It must start with a letter and the length is limited to 1-255: {field.to_dict()}"
}
return {
"valid": True,
"error": ""
}
def get_mochow_field_from_milvus_field(field, is_not_null: bool) -> mochowField:
field_type = milvus_type_to_mochow_type[field.dtype]
primary_key = False
partition_key = False
auto_increment = False
not_null = False
dimension = 0
if field.is_primary:
primary_key = True
auto_increment = field.auto_id
not_null = True
if field.is_partition_key:
partition_key = True
not_null = True
if field.dtype == DataType.FLOAT_VECTOR:
if "dim" not in field.params:
raise Exception(f"not found param 'dim' of FLOAT_VECTOR")
dimension = field.params["dim"]
not_null = True
if is_not_null:
not_null = True
element_type = None
max_capacity = None
if field.dtype == DataType.ARRAY:
element_type = milvus_array_element_type_to_mochow_array_element_type[field.element_type]
if "max_capacity" not in field.params:
raise Exception(f"not found param 'max_capacity' of array")
max_capacity = field.params["max_capacity"]
mochow_field = mochowField(
field_name=field.name,
field_type=field_type,
primary_key=primary_key,
partition_key=partition_key,
auto_increment=auto_increment,
not_null=not_null,
dimension=dimension,
element_type=element_type,
max_capacity=max_capacity,
)
return mochow_field
def get_mochow_index_from_milvus_index(index: Index) -> mochowVectorIndex:
if 'index_type' not in index.params:
logger.warning(f"index type not supported yet: {index.to_dict()}")
return None
index_type = index.params['index_type']
if index_type in scalar_index_supported:
return mochowSecondaryIndex(
index_name=index.index_name,
field=index.field_name
)
if index_type in vector_index_supported:
mochow_index_type = vector_index_supported[index_type]
if 'metric_type' not in index.params:
raise Exception(f"not found param 'metric_type' of vector index: {index.to_dict()}")
metric_type = index.params['metric_type']
if metric_type not in metirc_type_map:
return None
mochow_metirc_type = metirc_type_map[metric_type]
params = None
if mochow_index_type == mochowIndexType.HNSW:
if 'M' not in index.params or 'efConstruction' not in index.params:
raise Exception(f"invalid HNSW param of vector index: {index.to_dict()}")
params = mochowHNSWParams(
m=int(index.params['M']),
efconstruction=int(index.params['efConstruction'])
)
return mochowVectorIndex(
index_name=index.index_name,
field=index.field_name,
index_type=mochow_index_type,
metric_type=mochow_metirc_type,
params=params
)
logger.warning(f"index type not supported yet: {index.to_dict()}")
return None
def get_partition_info(collection_name: str) -> mochowPartition:
client = MilvusClient(**milvus_config)
collection_desc = client.describe_collection(collection_name=collection_name)
if 'num_partitions' not in collection_desc:
raise Exception(f"not found param 'num_partitions' of collection: {collection_desc}")
return mochowPartition(
partition_num=collection_desc['num_partitions'],
)
def dump_collection_schema(collection_name: str, logger: logging.Logger, log_file: str) -> bool:
connections.connect(**milvus_config)
collection = Collection(collection_name)
mochow_indexes = []
mochow_index_field_set = set()
has_no_support_index_type = False
for index in collection.indexes:
mochow_index = get_mochow_index_from_milvus_index(index)
if mochow_index is None:
has_no_support_index_type = True
continue
mochow_index_field_set.add(mochow_index.field)
mochow_indexes.append(mochow_index)
if has_no_support_index_type:
print(f"found index_type which is not supported yet")
not_support_fields = {}
mochow_fields = []
has_invalid_field = False
for field in collection.schema.fields:
check_field = check_valid_field(field)
if not check_field["valid"]:
not_support_fields[field.name] = field
has_invalid_field = True
else:
mochow_fields.append(get_mochow_field_from_milvus_field(field, field.name in mochow_index_field_set))
if has_invalid_field:
for field in not_support_fields.values():
check_field = check_valid_field(field)
logger.error(check_field["error"])
return False
schema = mochowSchema(
fields=mochow_fields,
indexes=mochow_indexes,
)
partiton = get_partition_info(collection_name)
create_table_args = {
"database": milvus_config["db_name"],
"table": collection_name,
"replication": export_config["replication"],
"partition": partiton.to_dict(),
"schema": schema.to_dict(),
"enableDynamicField": False,
}
if collection.schema.description != "":
create_table_args["description"] = collection.schema.description
json_bytes = orjson.dumps(create_table_args)
output_file_name = os.path.join(export_config["output_path"], collection_name, "table_config.json")
with open(output_file_name, 'wb') as json_file:
json_file.write(json_bytes)
print(f"generate table config of {collection_name} succeed")
return True
# convert data to str
# def convert_to_binary(binary_data):
# decimal_value = int.from_bytes(binary_data, byteorder='big')
# binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
# return ','.join(list(binary_string))
def basic_data_convert_to_str(data, dtype, delimeter):
if dtype == DataType.BOOL:
return "true" if data else "false"
elif dtype in [DataType.INT8, DataType.INT16,
DataType.INT32, DataType.INT64,
DataType.FLOAT, DataType.DOUBLE]:
return str(data)
elif dtype in [DataType.STRING, DataType.VARCHAR]:
return str(data).replace(delimeter, f"\\{delimeter}").replace("\"", "\\\"")
raise Exception(f"Unsupported Basic DataType {dtype}")
def data_convert_array_to_str(data, element_type, delimeter="|"):
if element_type in milvus_array_element_type_to_mochow_array_element_type:
if element_type == DataType.VARCHAR or element_type == DataType.STRING:
tmp_data = [basic_data_convert_to_str(x, element_type, delimeter) for x in data ]
orjson_str = orjson.dumps(tmp_data).decode("utf-8")
return json.dumps(orjson_str)
return "[" + ", ".join(basic_data_convert_to_str(x, element_type, delimeter) for x in data ) + "]"
raise Exception(f"Unsupported ElementType {element_type}")
def data_convert_to_str(data, dtype, delimeter):
if dtype in basic_data_type:
return basic_data_convert_to_str(data, dtype, delimeter)
elif dtype == DataType.FLOAT_VECTOR:
return data
elif dtype == DataType.ARRAY:
return data
raise Exception(f"Unsupported DataType {dtype}")
def csv_write_rows(datum, fd, fields_types, delimiter="|"):
for data in datum:
for i in range(len(data)):
ftype = fields_types[i]["dtype"]
data[i] = data_convert_to_str(data[i], ftype, delimiter)
fd.writerow(data)
def csv_write_header(headers: list, fd, delimiter="|"):
fd.writerow(headers)
def dump_collection(collection_name: str, logger: logging.Logger):
results = []
file_cnt = 0
print("connecting to milvus...")
connections.connect(**milvus_config)
export_config = config["export"]
collection = Collection(collection_name)
collection.load()
tmp_path = os.path.join(export_config["output_path"], collection_name)
if not os.path.exists(tmp_path):
os.mkdir(tmp_path)
fields_types = []
headers = []
for schema in collection.schema.fields:
field_type = {"dtype": schema.dtype}
if schema.dtype == DataType.ARRAY:
field_type["element_type"] = schema.element_type
fields_types.append(field_type)
headers.append(schema.name)
total_num = collection.num_entities
collection.load()
query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)
def write_to_csv_file(col_names, data):
if len(results) == 0:
return
nonlocal file_cnt
assert(file_cnt <= 1e9)
output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
with open(output_file_name, "w", newline="") as csv_file:
writer = csv.writer(csv_file, delimiter="|")
# write header
csv_write_header(col_names, writer)
# write data
csv_write_rows(data, writer, fields_types)
file_cnt += 1
results.clear()
with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
while True:
res = query_iterator.next()
if len(res) == 0:
# close the iterator
query_iterator.close()
break
for row in res:
row_list = []
for i in range(len(headers)):
field = row[headers[i]]
if isinstance(field, list) and fields_types[i]["dtype"] == DataType.FLOAT_VECTOR:
row_list.append("[" + ", ".join(str(x) for x in field) + "]")
elif isinstance(field, list) and fields_types[i]["dtype"] == DataType.ARRAY:
row_list.append(data_convert_array_to_str(field, fields_types[i]["element_type"]))
elif isinstance(field, dict):
row_list.append(orjson.dumps(field))
else:
row_list.append(field)
results.append(row_list)
if len(results) >= export_config["max_line_in_file"]:
write_to_csv_file(headers, data=results)
pbar.update(1)
write_to_csv_file(headers, data=results)
def check_db_collections():
connections.connect(**milvus_config)
for name in config["export"]["collections"]:
collection = Collection(name)
if __name__ == "__main__":
check_db_collections()
for name in config["export"]["collections"]:
print(f"start export {name}")
logger = logging.getLogger(f"export: {name}")
dir = os.path.join(export_config["output_path"], name)
os.makedirs(dir, exist_ok=True)
log_file = os.path.join(export_config["output_path"], name, "warning.log")
file_handler = logging.FileHandler(log_file, mode='w')
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
start_time = time.time()
if dump_collection_schema(name, logger, log_file):
dump_collection(name, logger)
end_time = time.time()
elapsed_time_seconds = end_time - start_time
logger.info(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
print(f'time cost for export {name}: {elapsed_time_seconds:.4f} seconds')
else:
print(f"generate table config failed, please check {log_file}")
YAML Configuration
The milvus2csv_config.yaml file is configured as follows:
milvus:
  host: '<localhost>' # Milvus server host address
  port: 19530 # Milvus server port
  user: '<user_name>' # username
  password: '<password>' # password
  db_name: '<database_name>' # database name
  token: '<token_id>' # access token
export:
  collections:
    - 'export_collection'
    - 'test_collection'
    # - 'hello_milvus'
    # - 'car'
    # - 'medium_articles_with_dynamic'
    # list the names of all collections to be exported
  max_line_in_file: 40000 # maximum number of rows per exported file
  output_path: './data' # export target directory; this guide uses ./data as an example
  replication: 3 # number of replicas per table partition (including the primary), in the range [1,10]
  partitionType: "HASH" # partition type enum; currently only "HASH" is supported
Modify the relevant configuration items in the yaml file according to the source Milvus cluster.
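Before running the export, it can be worth verifying that the configured connection actually reaches the source cluster and can see the expected collections, for example (a minimal sketch that reuses the milvus section of the yaml file):
import yaml
from pymilvus import connections, utility

with open("./milvus2csv_config.yaml", "r") as f:
    cfg = yaml.safe_load(f)

# Connect with the same parameters the export script will use and list collections.
connections.connect(**cfg["milvus"])
print(utility.list_collections())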
Run and Verify
Run export.py and check the output. The resulting directory layout is as follows:
.
├── export.py
├── milvus2csv_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── warning.log
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── warning.log
        ├── table_config.json
        └── 0000000000.csv
Here, table_config.json is the table schema converted from the Milvus collection into VDB concepts; it is used to create the corresponding table in VDB. Its structure is as follows:
{
"database": "benchmark_db",
"table": "benchmark_table",
"replication": 3,
"partition": {
"partitionType": "HASH",
"partitionNum": 1
},
"schema": {
"fields": [
{
"fieldName": "id",
"fieldType": "UINT64",
"primaryKey": true,
"partitionKey": true,
"autoIncrement": false,
"notNull": true
},
{
"fieldName": "vector",
"fieldType": "FLOAT_VECTOR",
"dimension": 3,
"notNull": true
}
]
}
}
0000000000.csv contains part of the table's data in csv format. Its structure is as follows:
// The first line of the csv holds the column names; columns are separated by |
my_id|my_vector
10|[-0.65697396, -0.25675339, 0.21634601, -0.7329633, 0.655407]
11|[0.61959845, -0.105865225, -0.27274612, 0.31226006, -0.9997308]
12|[-0.68236375, 0.07309595, 0.8319045, -0.34065887, -0.5734473]
13|[-0.026351633, -0.20818894, -0.8079838, -0.53878766, -0.6403443]
14|[0.27372813, -0.26643738, -0.5988725, -0.66084, 0.3210081]
warning.log contains the warnings produced during the export, for example:
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'name', 'index_name': 'name', 'index_param': {'index_type': 'INVERTED'}}
2024-10-22 13:55:50,152 - export: export_collection - WARNING - index type not supported yet: {'collection': 'export_collection', 'field': 'ivf_flat_embedding', 'index_name': 'ivf_flat_embedding', 'index_param': {'index_type': 'IVF_FLAT', 'metric_type': 'COSINE', 'params': {'nlist': 128}}}
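Indexes listed in warning.log, such as the INVERTED and IVF_FLAT indexes above, are skipped by the export and have to be recreated manually on the VDB side once the import has finished (see Migration Limitations). A possible sketch is shown below; it assumes the pymochow Table object exposes a create_indexes method, and it substitutes HNSW for IVF_FLAT, which has no direct VDB equivalent:
from pymochow.model.schema import VectorIndex, HNSWParams
from pymochow.model.enum import IndexType, MetricType

# Hypothetical manual re-creation of a skipped vector index after the import;
# "db" is obtained as in the import script below.
table = db.table("export_collection")
table.create_indexes(indexes=[
    VectorIndex(index_name="ivf_flat_embedding", field="ivf_flat_embedding",
                index_type=IndexType.HNSW, metric_type=MetricType.COSINE,
                params=HNSWParams(m=16, efconstruction=200))])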
Full Import of Local Data into VDB
Local File Structure
Prepare the import script import.py, the import configuration file csv2vdb_config.yaml, and the data to be imported (the data directory produced by the export step). Place import.py, csv2vdb_config.yaml, and the data directory under the same parent directory. The directory layout is as follows:
├── import.py
├── csv2vdb_config.yaml
└── data
    ├── medium_articles_with_json
    │   ├── table_config.json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   └── 0000000002.csv
    └── test
        ├── table_config.json
        └── 0000000000.csv
Import Script
The import script is as follows:
import pymochow
import yaml
import orjson
import json
import glob
import csv
import os
import time
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.exception import ClientError, ServerError
from pymochow.model.database import Database
from pymochow.model.schema import (
Schema,
Field,
SecondaryIndex,
VectorIndex,
HNSWParams,
HNSWPQParams,
PUCKParams,
AutoBuildTiming
)
from pymochow.model.enum import FieldType, ElementType, IndexType, MetricType, PartitionType, ServerErrCode
from pymochow.model.enum import TableState, IndexState
from pymochow.model.table import (
Partition,
Row,
)
def get_fields(table_config) -> list:
# schema
table_schema = table_config['schema']
# fields in schema
fields = []
for field_schema in table_schema['fields']:
field_name = field_schema['fieldName']
field_type = FieldType(field_schema['fieldType'])
elment_type = None
max_capacity = None
if field_type == FieldType.ARRAY:
elment_type = ElementType(field_schema['elementType'])
max_capacity = field_schema['maxCapacity']
primary_key = False
if 'primaryKey' in field_schema:
primary_key = field_schema['primaryKey']
partition_key = False
if 'partitionKey' in field_schema:
partition_key = field_schema['partitionKey']
auto_increment = False
if 'autoIncrement' in field_schema:
auto_increment = field_schema['autoIncrement']
not_null = False
if 'notNull' in field_schema:
not_null = field_schema['notNull']
dimension = 0
if 'dimension' in field_schema:
dimension = field_schema['dimension']
field = Field(field_name = field_name,
field_type = field_type,
primary_key = primary_key,
partition_key = partition_key,
auto_increment = auto_increment,
not_null = not_null,
dimension = dimension,
element_type = elment_type,
max_capacity = max_capacity,
)
fields.append(field)
return fields
def get_indexes(table_config) -> list:
# schema
table_schema = table_config['schema']
# indexes in schema
indexes = []
if 'indexes' in table_schema:
for index_schema in table_schema['indexes']:
field = index_schema['field']
index_name = index_schema['indexName']
index_type = IndexType(index_schema['indexType'])
if index_type == IndexType.SECONDARY_INDEX:
index = SecondaryIndex(index_name = index_name,
field = field)
indexes.append(index)
continue
metric_type = MetricType[index_schema['metricType']]
if index_type == IndexType.HNSW:
m = index_schema['params']['M']
ef_construction = index_schema['params']['efConstruction']
params = HNSWParams(m = m,
efconstruction = ef_construction)
index = VectorIndex(index_name = index_name,
index_type = index_type,
field = field,
metric_type = metric_type,
params = params
)
indexes.append(index)
elif index_type == IndexType.HNSWPQ:
m = index_schema['params']['M']
ef_construction = index_schema['params']['efConstruction']
sample_rate = index_schema['params']['sampleRate']
NSQ = index_schema['params']['NSQ']
params = HNSWPQParams(m = m,
efconstruction = ef_construction,
NSQ=NSQ,
samplerate = sample_rate)
index = VectorIndex(index_name = index_name,
index_type = index_type,
field = field,
metric_type = metric_type,
params = params
)
indexes.append(index)
elif index_type == IndexType.PUCK:
course_cluster_count = index_schema['params']['courceClusterCount']
fine_cluster_count = index_schema['params']['fineClusterCount']
params = PUCKParams(courceClusterCount = course_cluster_count,
fineClusterCount = fine_cluster_count)
index = VectorIndex(index_name = index_name,
index_type = index_type,
field = field,
metric_type = metric_type,
params = params
)
indexes.append(index)
elif index_type == IndexType.FLAT:
index = VectorIndex(index_name = index_name,
index_type = index_type,
field = field,
metric_type = metric_type)
indexes.append(index)
else:
raise Exception("not support index type")
return indexes
def create_table(db, table_config) -> bool:
db_name = table_config['database']
table_name = table_config['table']
replication = table_config['replication']
# check if table already exist
table_list = db.list_table()
for table_item in table_list:
if table_item.table_name == table_name:
print("")
print("Table already exist. database: {}, table:{}".format(db_name, table_name))
print("")
return False
# partition
partition_type = table_config['partition']['partitionType']
partition_num = table_config['partition']['partitionNum']
partition = Partition(partition_num = partition_num,
partition_type = PartitionType[partition_type])
fields = get_fields(table_config)
indexes = get_indexes(table_config)
schema = Schema(fields = fields,
indexes = indexes)
db.create_table(
table_name,
replication,
partition,
schema
)
while True:
time.sleep(2)
table = db.describe_table(table_name)
if table.state == TableState.NORMAL:
break
print("")
print("Succeed to create table. table: {}".format(table.to_dict()))
print("")
return True
if __name__ == "__main__":
with open('csv2vdb_config.yaml', 'r') as config_file:
config = yaml.safe_load(config_file)
print("current config:" + str(config))
print('#'*60)
print("")
db_host = config['database']['host']
db_port = config['database']['port']
account = config['database']['account']
api_key = config['database']['api_key']
db_name = config['database']['db_name']
data_path = config['data_path']
# initialize client
endpoint=f"http://{db_host}:{db_port}"
config = Configuration(credentials=BceCredentials(account, api_key),
endpoint=endpoint)
client = pymochow.MochowClient(config)
# create database if not exist
db_list = client.list_databases()
exist_flag = False
for db_item in db_list:
if db_item.database_name == db_name:
exist_flag = True
break
if not exist_flag:
client.create_database(db_name)
db = client.database(db_name)
for table_name in os.listdir(data_path):
print('+'*60)
table_folder = os.path.join(data_path, table_name)
# create table
table_config_file = os.path.join(table_folder, 'table_config.json')
if not os.path.exists(table_config_file):
print(f"No table_config.json found for table: {table_name}. Skipping...")
print('+'*60)
print("")
continue
succeed_flag = False
with open(table_config_file, 'r') as file:
data = file.read()
table_config = orjson.loads(data)
succeed_flag = create_table(db, table_config)
if not succeed_flag:
print('+'*60)
print("")
continue
# start inserting table
table = db.table(table_name)
print(f"Begin Process table: {table_name}")
fields = get_fields(table_config)
field_dict = {field.field_name: field for field in fields}
if os.path.isdir(table_folder):
cnt = 0
csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
for csv_file in csv_files:
rows = []
with open(csv_file, 'r') as file:
csv_reader = csv.reader(file, delimiter='|')
# read the first row as the header
headers = next(csv_reader)
# print("header:", headers)
# iterate over the data rows
for row in csv_reader:
for i in range(0, len(row)):
# skip string type field
if field_dict[headers[i]].field_type == FieldType.STRING:
continue
if field_dict[headers[i]].field_type == FieldType.ARRAY and field_dict[headers[i]].element_type == ElementType.STRING:
row[i] = json.loads(row[i])
row[i] = orjson.loads(row[i])
row_dict = dict(zip(headers, row))
rows.append(Row(**row_dict))
if len(rows) >= 1000:
table.upsert(rows=rows)
rows = []
if len(rows) > 0:
table.upsert(rows=rows)
rows = []
cnt += 1
print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
table_stats = table.stats()
print(f"Finish importing table: {table_name}, stats: {table_stats}")
print('+'*60)
print("")
print(f"Finish importing all tables. Please consider rebuilding index before searching.")
print('#'*60)
print("")
client.close()
YAML Configuration
The csv2vdb_config.yaml file is configured as follows:
database:
  host: "192.16.XX.XX" # IP address of the VDB instance
  port: 5287 # port of the VDB instance
  account: "root" # VDB account
  api_key: "" # VDB API key
  db_name: "vector_database" # name of the target database
data_path: "./data" # source directory of the data to import
Modify the relevant configuration items in the yaml file according to the target VDB cluster.
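A quick connectivity check against the target cluster can be run before the full import, for example (a sketch that reuses the database section of csv2vdb_config.yaml):
import yaml
import pymochow
from pymochow.configuration import Configuration
from pymochow.auth.bce_credentials import BceCredentials

with open("csv2vdb_config.yaml", "r") as f:
    cfg = yaml.safe_load(f)["database"]

# Build a client the same way import.py does and list the existing databases.
client = pymochow.MochowClient(Configuration(
    credentials=BceCredentials(cfg["account"], cfg["api_key"]),
    endpoint=f"http://{cfg['host']}:{cfg['port']}"))
print([db.database_name for db in client.list_databases()])
client.close()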
Run and Verify
Run import.py and check the result; verify in VDB that the data was imported successfully.
Use mochow-cli to inspect the migrated tables:
mochow-cli > list db
+--------------------+
| databases |
+====================+
| vector_database |
+--------------------+
mochow-cli > list tb -d vector_database
+----------------------+
| tables |
+======================+
| export_collection |
+----------------------+
| test_collection |
+----------------------+
mochow-cli > stats tb -d vector_database -t export_collection
+-------------+---------------------+-------------------+
| rowCount | memorySizeInByte | diskSizeInByte |
+=============+=====================+===================+
| 1000 | 242037 | 151525 |
+-------------+---------------------+-------------------+
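To double-check completeness, the row count reported by VDB can be compared against the entity count of the source collection, for example (a sketch; connections are set up the same way as in the two scripts above):
from pymilvus import connections, Collection

# Source side: number of entities in the Milvus collection.
connections.connect(host="<milvus_host>", port=19530, db_name="<database_name>")
milvus_count = Collection("export_collection").num_entities

# Target side: stats of the imported VDB table ("db" obtained as in import.py).
table = db.table("export_collection")
print("milvus num_entities:", milvus_count)
print("vdb table stats:", table.stats())  # rowCount should match num_entities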
Rebuild Index
Before putting the tables into use, it is recommended to rebuild the vector indexes to improve query performance. For the relevant operations, refer to the OpenAPI or SDK documentation, or see the sketch below.
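For instance, with the pymochow SDK an index can be rebuilt and then polled until it returns to the NORMAL state, roughly as sketched below; this assumes the Table object provides rebuild_index and describe_index as described in the SDK documentation, and the index name is an example:
import time
from pymochow.model.enum import IndexState

# Rebuild the vector index after the bulk import, then wait for it to finish;
# "db" is obtained as in import.py.
table = db.table("export_collection")
table.rebuild_index("vector_idx")
while True:
    index = table.describe_index("vector_idx")
    if index.state == IndexState.NORMAL:
        break
    time.sleep(2)
print("index rebuild finished")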