数据迁移
用户在接入百度智能云Elasticsearch服务时,如需要百度智能云Elasticsearch服务间进行数据迁移或自建Elasticsearch服务数据迁移至百度智能云 Elasticsearch,可以根据自己的业务需求选择合适的迁移方案。本文介绍各迁移方案适用的场景,帮助您根据业务选择合适的场景进行迁移。
- BOS快照迁移
- 在线reindex
- Logstash
- 通过Spark迁移数据
- HDFS快照迁移
BOS快照迁移
snapshot api 是Elasticsearch用于对数据进行备份和恢复的一组 api 接口,可以通过 snapshot api 进行跨集群的数据迁移,原理就是从源 Elasticsearch 集群创建数据快照,然后在目标 Elasticsearch 集群中进行恢复。
百度智能云Elasticsearch基于百度存储BOS(Baidu Object Storage),实现了snapshot api,能够帮助您在本集群内和多集群间进行快照备份与恢复。
具体使用方式参照基于BOS的快照与恢复。
适用场景
- 源端数据量较大(GB、TB、PB级别)的场景
- 恢复数据过程中需要关闭目的集群index
- 源Elasticsearch集群和目的集群Elasticsearch版本符合快照备份与恢复场景的匹配
Snapshot version → new Cluster version | 2.x | 5.x | 6.x | 7.x | 8.x |
---|---|---|---|---|---|
1.x → | |||||
2.x → | |||||
5.x → | |||||
6.x → | |||||
7.x → |
使用方式
- 两个集群分别创建基于BOS的仓库
源Elasticsearch集群需要添加BOS信息来创建仓库。创建命令如下:
POST /_snapshot/es_repo
{
"type": "bos",
"settings": {
"access_key": "your access_key",
"secret_key": "your secret_key",
"endpoint": "s3.bj.bcebos.com",
"bucket": "es-repo",
"base_path": "test_base_path"
}
}
相关参数意义:
参数 | 作用 |
---|---|
type | 仓库的类型,在这里您应该填bos |
access_key | 百度智能云的access_key,可以在百度智能云的console中看到 |
secret_key | 百度智能云的secret_key,可以在百度智能云的console中看到 |
endpoint | BOS对应各个region的服务域名 |
bucket | BOS的bucket,务必保证对应的用户身份有读写bucket权限 |
base_path | 仓库的起始位置,默认为根目录 |
BOS对应各个region的服务域名:
区域 | 访问Endpoint |
---|---|
北京 | s3.bj.bcebos.com |
保定 | s3.bd.bcebos.com |
苏州 | s3.su.bcebos.com |
广州 | s3.gz.bcebos.com |
香港 | s3.hkg.bcebos.com |
金融云武汉专区 | s3.fwh.bcebos.com |
在目的Elasticsearch集群创建仓库时,除了添加BOS信息外,还需要添加readonly参数,以保证只有一个集群可以对仓库进行操作。创建命令如下:
POST /_snapshot/es_repo
{
"type": "bos",
"settings": {
"access_key": "your access_key",
"secret_key": "your secret_key",
"endpoint": "s3.bj.bcebos.com",
"bucket": "es-repo",
"base_path": "test_base_path",
"readonly": true
}
}
- 在源Elasticsearch集群创建数据的快照
PUT /_snapshot/es_repo/snapshot_2020_01_01?wait_for_completion=true
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false
}
- 在目的Elasticsearch集群恢复数据快照
POST /_snapshot/es_repo/snapshot_2020_01_01/_restore
{
"indices": "index_1,index_2",
"include_global_state": false
}
具体参数详解参照基于BOS的快照备份恢复。
注意事项
- 需要开通BOS服务并且创建bucket。
- 并且源集群和目的集群能够访问BOS网络。
- 源集群进行快照时,同时只能有一个快照运行,且有删除快照的操作时,不能进行快照。
- 目的集群创建仓库时,需要添加readonly参数,以保证只有一个集群可以对仓库进行写操作。
- 目的集群恢复数据时,恢复的index可以不存在,如果存在必须是
closed
状态。 - 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
- 使用BOS跨region迁移数据时,可能会产生外网流量费用。
- 对索引做快照的时候,对索引的变更操作不会在快照中出现,用户应该避免这种情况。如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。同理,其他迁移方式也要注意这点。
在线reindex
reindex from a remote cluster api是Elasticsearch提供的一个 api接口,可以把数据从源Elasticsearch集群导入到当前Elasticsearch集群,实现数据的迁移。原理是从源 Elasticsearch 集群中查询数据,然后写入到目标 Elasticsearch 集群中。
适用场景
- 在线迁移数据
- 对迁移速度要求不高
- 可以对源端集群数据进行查询
使用方式
- 在目的集群的elasticsearch.yml配置reindex白名单
reindex.remote.whitelist: {IP}:{port}
- 在目的集群创建index(如果不需要特殊处理分片数和mapping等,可以省略这一步)
- 做reindex操作
POST _reindex
{
"source": {
"remote": {
"host": "http://{oldhost}:{oldport}",
"username": "{username}",
"password": "{password}"
},
"index": "source_index",
"query": {
"match_all": {}
}
},
"dest": {
"index": "dest_index"
}
}
详细使用方式参照reindex from a remote cluster api。
注意事项
- 目的集群需要连通源集群的网络。
- 需要重启目的集群配置reindex白名单。
白名单修改可以在百度云页面【配置修改】进行修改,
注意:
1、修改配置后,需要重启集群才生效。
2、配置格式为yml格式。注意,在【冒号】后【配置值】前要有一个“空格”。
- 迁移数据时,最好不要对索引有变更操作;如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。
Logstash
适用场景
- 在线迁移数据
- 可以对源端集群数据进行查询
- 请注意 Logstash 版本的版本是否与Elastic相匹配,建议与 Elasticsearch 版本保持一致。版本兼容性说明请参见Elasticsearch系列产品兼容性。
使用方式
- 安装部署Logstash与Java8以上的jdk环境。
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
tar xvf logstash-7.4.2.tar.gz
- 根据数据源类型自定义配置文件
*.conf
,配置文件内容如下:
input {
elasticsearch {
hosts => ["http://{oldhost}:{oldport}"]
index => "*"
docinfo => true
}
}
output {
elasticsearch {
hosts => ["http://{newhost}:{newport}"]
index => "%{[@metadata][_index]}"
}
}
- 执行Logstash。
nohup ./bin/logstash -f ~/*.conf 2>&1 >/dev/null &
上述配置文件将源Elasticsearch集群的所有索引同步到目标集群中,也可以设置只同步指定的索引,Logstash 的更多功能可查阅 Logstash 官方文档。
注意事项
- 需要准备Logstash环境并连通Elasticsearch集群的网络。
通过Spark迁移数据
Spark支持从一个Elasticsearch集群中读取数据然后写入到另一个Elasticsearch集群。
使用方式
Spark代码参考如下:
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
public class MigrateESToES {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("Baidu-Cloud-Migrate-ES-Example");
conf.set("es.index.auto.create", "true");
SparkContext sc = new SparkContext();
// source cluster configuration
Map<String, String> inputCfg = new HashMap<>();
// node ip:port
inputCfg.put("es.nodes", "{Source Elasticseach Host}:8200");
// 要迁移的索引和文档类型,例如test_size索引,文档类型_doc
inputCfg.put("es.resource", "test_size/_doc");
// 如果没有开启认证的话,这两个选项可以不写
// username
// inputCfg.put("es.net.http.auth.user", "superuser");
// password
// inputCfg.put("es.net.http.auth.pass", "xxxx");
// es_version,条件可以的话建议写上
inputCfg.put("es.internal.es.version", "7.4.2");
// 读取_version 字段
// inputCfg.put("es.read.metadata.version", "true");
// 读取_matedata 字段
// inputCfg.put("es.read.metadata", "true");
// dst cluster configuration
Map<String, String> outputCfg = new HashMap<>();
outputCfg.put("es.nodes", "{Dst Cluster}:8200");
outputCfg.put("es.net.http.auth.user", "root");
outputCfg.put("es.net.http.auth.pass", "root");
outputCfg.put("es.internal.es.version", "7.4.2");
// 目标索引和文档类型
outputCfg.put("es.resource", "test_size/_doc");
// 指定写入version type 为外部指定
// outputCfg.put("es.mapping.version.type", "external");
// 指定 index id
// outputCfg.put("es.mapping.id", "_metadata._id");
// 指定 routing id
// outputCfg.put("es.mapping.routing", "_metadata._id");
// 指定 doc version
// outputCfg.put("es.mapping.version", "_metadata._version");
// 关闭refresh
outputCfg.put("es.batch.write.refresh", "false");
// 单次写15mb
outputCfg.put("es.batch.size.bytes", "15mb");
//size (in entries) for batch writes using Elasticsearch bulk API
outputCfg.put("es.batch.size.entries", "5000");
// 异常不处理,打log,避免conflict 导致task failed
outputCfg.put("es.write.rest.error.handlers", "log");
// 异常日志前缀
outputCfg.put("es.write.rest.error.handler.log.logger.name", "es_error_handler");
// outputCfg.put("es.mapping.exclude", "_metadata");
// create spark context
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<Map<String, Object>> sourceRDD = esRDD(jsc, inputCfg).values();
JavaEsSpark.saveToEs(sourceRDD, outputCfg);
sc.stop();
}
}
注意事项
- 需要准备Spark环境并连通Elasticsearch集群的网络。
- 需要自行实现Spark业务逻辑。
HDFS快照迁移
使用方式
BES已预装了repository-hdfs插件。您可以通过HDFS进行快照备份并恢复,使用方式与基于BOS的快照备份与恢复相类似,只是中转媒介换成了HDFS。
创建快照仓库的命令示例:
PUT _snapshot/my_hdfs_repository
{
"type": "hdfs",
"settings": {
"uri": "hdfs://192.168.1.101:8020",
"path": "/tmp/hadoop/es_repo"
}
}
配置详情可参考ES官网介绍repository-hdfs-config。
注意事项
- 需要准备资源足够的HDFS环境并连通网络。
- BES已预装了repository-hdfs插件。
- 目的集群恢复数据时,需要关闭相应index。
- 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。