数据迁移
所有文档

          Elasticsearch BES

          数据迁移

          用户在接入百度智能云Elasticsearch服务时,如需要百度智能云Elasticsearch服务间进行数据迁移或自建Elasticsearch服务数据迁移至百度智能云 Elasticsearch,可以根据自己的业务需求选择合适的迁移方案。本文介绍各迁移方案适用的场景,帮助您根据业务选择合适的场景进行迁移。

          • BOS快照迁移
          • 在线reindex
          • Logstash
          • 通过Spark迁移数据
          • HDFS快照迁移

          BOS快照迁移

          snapshot api 是Elasticsearch用于对数据进行备份和恢复的一组 api 接口,可以通过 snapshot api 进行跨集群的数据迁移,原理就是从源 Elasticsearch 集群创建数据快照,然后在目标 Elasticsearch 集群中进行恢复。

          百度智能云Elasticsearch基于百度存储BOS(Baidu Object Storage),实现了snapshot api,能够帮助您在本集群内和多集群间进行快照备份与恢复。

          具体使用方式参照基于BOS的快照与恢复

          适用场景

          • 源端数据量较大(GB、TB、PB级别)的场景
          • 恢复数据过程中需要关闭目的集群index
          • 源Elasticsearch集群和目的集群Elasticsearch版本符合快照备份与恢复场景的匹配
          Snapshot version new Cluster version 2.x 5.x 6.x 7.x 8.x
          1.x icon-yes.png
          2.x icon-yes.png icon-yes.png
          5.x icon-yes.png icon-yes.png
          6.x icon-yes.png icon-yes.png
          7.x icon-yes.png icon-yes.png

          使用方式

          1. 两个集群分别创建基于BOS的仓库

          源Elasticsearch集群需要添加BOS信息来创建仓库。创建命令如下:

          POST /_snapshot/es_repo
          {
              "type": "bos",
              "settings": {
                  "access_key": "your access_key",
                  "secret_key": "your secret_key",
                  "endpoint": "s3.bj.bcebos.com",
                  "bucket": "es-repo",
                  "base_path": "/test_base_path"
              }
          }

          相关参数意义:

          参数 作用
          type 仓库的类型,在这里您应该填bos
          access_key 百度智能云的access_key,可以在百度智能云的console中看到
          secret_key 百度智能云的secret_key,可以在百度智能云的console中看到
          endpoint BOS对应各个region的服务域名
          bucket BOS的bucket,务必保证对应的用户身份有读写bucket权限
          base_path 仓库的起始位置,默认为根目录

          BOS对应各个region的服务域名:

          区域 访问Endpoint
          北京 s3.bj.bcebos.com
          保定 s3.bd.bcebos.com
          苏州 s3.su.bcebos.com
          广州 s3.gz.bcebos.com
          香港 s3.hkg.bcebos.com
          金融云武汉专区 s3.fwh.bcebos.com

          在目的Elasticsearch集群创建仓库时,除了添加BOS信息外,还需要添加readonly参数,以保证只有一个集群可以对仓库进行操作。创建命令如下:

          POST /_snapshot/es_repo
          {
              "type": "bos",
              "settings": {
                  "access_key": "your access_key",
                  "secret_key": "your secret_key",
                  "endpoint": "s3.bj.bcebos.com",
                  "bucket": "es-repo",
                  "base_path": "/test_base_path",
                  "readonly": true
              }
          }
          1. 在源Elasticsearch集群创建数据的快照
          PUT /_snapshot/es_repo/snapshot_2020_01_01?wait_for_completion=true
          {
            "indices": "index_1,index_2",
            "ignore_unavailable": true,
            "include_global_state": false
          }
          1. 在目的Elasticsearch集群恢复数据快照
          POST /_snapshot/es_repo/snapshot_2020_01_01/_restore
          {
            "indices": "index_1,index_2",
            "include_global_state": false
          }

          具体参数详解参照基于BOS的快照备份恢复

          注意事项

          • 对于百度智能云Elasticsearch的集群,可以直接使用基于BOS的快照备份与恢复功能;对于用户自建Elasticsearch集群,需要安装百度智能云Elasticsearch repository-bos插件(如需安装,请在百度云控制台提交工单)。
          • 需要开通BOS服务并且创建bucket。
          • 并且源集群和目的集群能够访问BOS网络。
          • 源集群进行快照时,同时只能有一个快照运行,且有删除快照的操作时,不能进行快照。
          • 目的集群创建仓库时,需要添加readonly参数,以保证只有一个集群可以对仓库进行写操作。
          • 目的集群恢复数据时,恢复的index可以不存在,如果存在必须是closed状态。
          • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
          • 使用BOS跨region迁移数据时,可能会产生外网流量费用。
          • 对索引做快照的时候,对索引的变更操作不会在快照中出现,用户应该避免这种情况。如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。同理,其他迁移方式也要注意这点。

          在线reindex

          reindex from a remote cluster api是Elasticsearch提供的一个 api接口,可以把数据从源Elasticsearch集群导入到当前Elasticsearch集群,实现数据的迁移。原理是从源 Elasticsearch 集群中查询数据,然后写入到目标 Elasticsearch 集群中。

          适用场景

          • 在线迁移数据
          • 对迁移速度要求不高
          • 可以对源端集群数据进行查询

          使用方式

          • 在目的集群的elasticsearch.yml配置reindex白名单
          reindex.remote.whitelist:{IP}:{port}
          • 在目的集群创建index(如果不需要特殊处理分片数和mapping等,可以省略这一步)
          • 做reindex操作
          POST _reindex
          {
            "source": {
              "remote": {
                "host": "http://{oldhost}:{oldport}",
                "username": "{username}",
                "password": "{password}"
              },
              "index": "source_index",
              "query": {
                "match_all": {}
              }
            },
            "dest": {
              "index": "dest_index"
            }
          }

          详细使用方式参照reindex from a remote cluster api

          注意事项

          • 目的集群需要连通源集群的网络。
          • 需要重启目的集群配置reindex白名单(如需配置,请在百度云控制台提交工单)。
          • 迁移数据时,最好不要对索引有变更操作;如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。

          Logstash

          适用场景

          • 在线迁移数据
          • 可以对源端集群数据进行查询
          • 请注意 Logstash 版本的版本是否与Elastic相匹配,建议与 Elasticsearch 版本保持一致。版本兼容性说明请参见Elasticsearch系列产品兼容性

          使用方式

          • 安装部署Logstash与Java8以上的jdk环境。
          wget https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
          tar xvf logstash-7.4.2.tar.gz 
          • 根据数据源类型自定义配置文件*.conf,配置文件内容如下:
          input {
              elasticsearch {
                  hosts => ["http://{oldhost}:{oldport}"]
                  index => "*"
                  docinfo => true
              }
          }
          output {
              elasticsearch {
                  hosts => ["http://{newhost}:{newport}"]
                  index => "%{[@metadata][_index]}"
              }
          }
          • 执行Logstash。
           nohup ./bin/logstash -f ~/*.conf 2>&1 >/dev/null &

          上述配置文件将源Elasticsearch集群的所有索引同步到目标集群中,也可以设置只同步指定的索引,Logstash 的更多功能可查阅 Logstash 官方文档

          注意事项

          • 需要准备Logstash环境并连通Elasticsearch集群的网络。

          通过Spark迁移数据

          Spark支持从一个Elasticsearch集群中读取数据然后写入到另一个Elasticsearch集群。

          使用方式

          Spark代码参考如下:

          import org.apache.spark.SparkConf;
          import org.apache.spark.SparkContext;
          import org.apache.spark.api.java.JavaRDD;
          import org.apache.spark.api.java.JavaSparkContext;
          import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
          
          import java.util.HashMap;
          import java.util.Map;
          
          import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
          
          public class MigrateESToES {
          
              public static void main(String[] args) {
          
                  SparkConf conf = new SparkConf();
          
                  conf.setAppName("BaiDu-Cloud-Migrate-ES-Example");
                  conf.set("es.index.auto.create", "true");
          
                  SparkContext sc = new SparkContext();
          
                  // source cluster configuration
                  Map<String, String> inputCfg = new HashMap<>();
                  // node ip:port
                  inputCfg.put("es.nodes", "{Source Elasticseach Host}:8200");
                  inputCfg.put("es.resource", "test_size/_doc");
                  // 如果没有开启认证的话,这两个选项可以不写
                  // username
                  // inputCfg.put("es.net.http.auth.user", "superuser");
                  // password
                  // inputCfg.put("es.net.http.auth.pass", "xxxx");
                  // es_version,条件可以的话建议写上
                  inputCfg.put("es.internal.es.version", "7.4.2");
                  // 读取_version 字段
                  // inputCfg.put("es.read.metadata.version", "true");
                  // 读取_matedata 字段
                  // inputCfg.put("es.read.metadata", "true");
          
                  // dst cluster configuration
                  Map<String, String> outputCfg = new HashMap<>();
                  outputCfg.put("es.nodes", "{Dst Cluster}:8200");
                  outputCfg.put("es.net.http.auth.user", "root");
                  outputCfg.put("es.net.http.auth.pass", "root");
                  outputCfg.put("es.internal.es.version", "7.4.2");
                  outputCfg.put("es.resource", "test_size/doc");
          
                  // 指定写入version type 为外部指定
                  // outputCfg.put("es.mapping.version.type", "external");
                  // 指定 index id
                  // outputCfg.put("es.mapping.id", "_metadata._id");
                  // 指定 routing id
                  // outputCfg.put("es.mapping.routing", "_metadata._id");
                  // 指定 doc version
                  // outputCfg.put("es.mapping.version", "_metadata._version");
                  // 关闭refresh
                  outputCfg.put("es.batch.write.refresh", "false");
                  // 单次写15mb
                  outputCfg.put("es.batch.size.bytes", "15mb");
                  //size (in entries) for batch writes using Elasticsearch bulk API
                  outputCfg.put("es.batch.size.entries", "5000");
                  // 异常不处理,打log,避免conflict 导致task failed
                  outputCfg.put("es.write.rest.error.handlers", "log");
                  // 异常日志前缀
                  outputCfg.put("es.write.rest.error.handler.log.logger.name", "es_error_handler");
                  // outputCfg.put("es.mapping.exclude", "_metadata");
          
                  // create spark context
                  JavaSparkContext jsc = new JavaSparkContext(sc);
          
                  JavaRDD<Map<String, Object>> sourceRDD = esRDD(jsc, inputCfg).values();
          
                  JavaEsSpark.saveToEs(sourceRDD, outputCfg);
                  sc.stop();
              }
          }

          注意事项

          • 需要准备Spark环境并连通Elasticsearch集群的网络。
          • 需要自行实现Spark业务逻辑。

          HDFS快照迁移

          使用方式

          安装有基于HDFS的snapshot插件的Elasticsearch集群,可以通过HDFS进行快照备份并恢复。使用方式与基于BOS的快照备份与恢复相类似,只是中转媒介换成了HDFS。

          注意事项

          • 需要准备资源足够的HDFS环境并连通网络。
          • 需要安装有基于HDFS的snapshot插件(如需安装,请在百度云控制台提交工单)。
          • 目的集群恢复数据时,需要关闭相应index。
          • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
          上一篇
          监控报警
          下一篇
          账户管理