ElasticsearchBES

    数据迁移

    用户在接入百度智能云Elasticsearch服务时,如需要百度智能云Elasticsearch服务间进行数据迁移或自建Elasticsearch服务数据迁移至百度智能云 Elasticsearch,可以根据自己的业务需求选择合适的迁移方案。本文介绍各迁移方案适用的场景,帮助您根据业务选择合适的场景进行迁移。

    • BOS快照迁移
    • 在线reindex
    • Logstash
    • 通过Spark迁移数据
    • HDFS快照迁移

    BOS快照迁移

    snapshot api 是Elasticsearch用于对数据进行备份和恢复的一组 api 接口,可以通过 snapshot api 进行跨集群的数据迁移,原理就是从源 Elasticsearch 集群创建数据快照,然后在目标 Elasticsearch 集群中进行恢复。

    百度智能云Elasticsearch基于百度存储BOS(Baidu Object Storage),实现了snapshot api,能够帮助您在本集群内和多集群间进行快照备份与恢复。

    具体使用方式参照基于BOS的快照备份恢复

    适用场景

    • 源端数据量较大(GB、TB、PB级别)的场景
    • 恢复数据过程中需要关闭目的集群index
    • 源Elasticsearch集群和目的集群Elasticsearch版本符合快照备份与恢复场景的匹配
    Snapshot version new Cluster version 2.x 5.x 6.x 7.x 8.x
    1.x icon-yes.png
    2.x icon-yes.png icon-yes.png
    5.x icon-yes.png icon-yes.png
    6.x icon-yes.png icon-yes.png
    7.x icon-yes.png icon-yes.png

    使用方式

    1. 两个集群分别创建基于BOS的仓库

    源Elasticsearch集群需要添加BOS信息来创建仓库。创建命令如下:

    POST /_snapshot/es_repo
    {
        "type": "bos",
        "settings": {
            "access_key": "your access_key",
            "secret_key": "your secret_key",
            "endpoint": "s3.bj.bcebos.com",
            "bucket": "es-repo",
            "base_path": "/test_base_path"
        }
    }

    相关参数意义:

    参数 作用
    type 仓库的类型,在这里您应该填bos
    access_key 百度智能云的access_key,可以在百度智能云的console中看到
    secret_key 百度智能云的secret_key,可以在百度智能云的console中看到
    endpoint BOS对应各个region的服务域名
    bucket BOS的bucket,务必保证对应的用户身份有读写bucket权限
    base_path 仓库的起始位置,默认为根目录

    BOS对应各个region的服务域名:

    区域 访问Endpoint
    北京 s3.bj.bcebos.com
    保定 s3.bd.bcebos.com
    苏州 s3.su.bcebos.com
    广州 s3.gz.bcebos.com
    香港 s3.hkg.bcebos.com
    金融云武汉专区 s3.fwh.bcebos.com

    在目的Elasticsearch集群创建仓库时,除了添加BOS信息外,还需要添加readonly参数,以保证只有一个集群可以对仓库进行操作。创建命令如下:

    POST /_snapshot/es_repo
    {
        "type": "bos",
        "settings": {
            "access_key": "your access_key",
            "secret_key": "your secret_key",
            "endpoint": "s3.bj.bcebos.com",
            "bucket": "es-repo",
            "base_path": "/test_base_path",
            "readonly": true
        }
    }
    1. 在源Elasticsearch集群创建数据的快照
    PUT /_snapshot/es_repo/snapshot_2020_01_01?wait_for_completion=true
    {
      "indices": "index_1,index_2",
      "ignore_unavailable": true,
      "include_global_state": false
    }
    1. 在目的Elasticsearch集群恢复数据快照
    POST /_snapshot/es_repo/snapshot_2020_01_01/_restore
    {
      "indices": "index_1,index_2",
      "include_global_state": false
    }

    具体参数详解参照基于BOS的快照备份恢复

    注意事项

    • 对于百度智能云Elasticsearch的集群,可以直接使用基于BOS的快照备份与恢复功能;对于用户自建Elasticsearch集群,需要安装百度智能云Elasticsearch repository-bos插件(如需安装,请在百度云控制台提交工单)。
    • 需要开通BOS服务并且创建bucket。
    • 并且源集群和目的集群能够访问BOS网络。
    • 源集群进行快照时,同时只能有一个快照运行,且有删除快照的操作时,不能进行快照。
    • 目的集群创建仓库时,需要添加readonly参数,以保证只有一个集群可以对仓库进行写操作。
    • 目的集群恢复数据时,恢复的index可以不存在,如果存在必须是closed状态。
    • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
    • 使用BOS跨region迁移数据时,可能会产生外网流量费用。
    • 对索引做快照的时候,对索引的变更操作不会在快照中出现,用户应该避免这种情况。如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。同理,其他迁移方式也要注意这点。

    在线reindex

    reindex from a remote cluster api是Elasticsearch提供的一个 api接口,可以把数据从源Elasticsearch集群导入到当前Elasticsearch集群,实现数据的迁移。原理是从源 Elasticsearch 集群中查询数据,然后写入到目标 Elasticsearch 集群中。

    适用场景

    • 在线迁移数据
    • 对迁移速度要求不高
    • 可以对源端集群数据进行查询

    使用方式

    • 在目的集群的elasticsearch.yml配置reindex白名单
    reindex.remote.whitelist:{IP}:{port}
    • 在目的集群创建index(如果不需要特殊处理分片数和mapping等,可以省略这一步)
    • 做reindex操作
    POST _reindex
    {
      "source": {
        "remote": {
          "host": "http://{oldhost}:{oldport}",
          "username": "{username}",
          "password": "{password}"
        },
        "index": "source_index",
        "query": {
          "match_all": {}
        }
      },
      "dest": {
        "index": "dest_index"
      }
    }

    详细使用方式参照reindex from a remote cluster api

    注意事项

    • 目的集群需要连通源集群的网络。
    • 需要重启目的集群配置reindex白名单(如需配置,请在百度云控制台提交工单)。
    • 迁移数据时,最好不要对索引有变更操作;如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。

    Logstash

    适用场景

    • 在线迁移数据
    • 可以对源端集群数据进行查询
    • 请注意 Logstash 版本的版本是否与Elastic相匹配,建议与 Elasticsearch 版本保持一致。版本兼容性说明请参见Elasticsearch系列产品兼容性

    使用方式

    • 安装部署Logstash与Java8以上的jdk环境。
    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
    tar xvf logstash-7.4.2.tar.gz 
    • 根据数据源类型自定义配置文件*.conf,配置文件内容如下:
    input {
        elasticsearch {
            hosts => ["http://{oldhost}:{oldport}"]
            index => "*"
            docinfo => true
        }
    }
    output {
        elasticsearch {
            hosts => ["http://{newhost}:{newport}"]
            index => "%{[@metadata][_index]}"
        }
    }
    • 执行Logstash。
     nohup ./bin/logstash -f ~/*.conf 2>&1 >/dev/null &

    上述配置文件将源Elasticsearch集群的所有索引同步到目标集群中,也可以设置只同步指定的索引,Logstash 的更多功能可查阅 Logstash 官方文档

    注意事项

    • 需要准备Logstash环境并连通Elasticsearch集群的网络。

    通过Spark迁移数据

    Spark支持从一个Elasticsearch集群中读取数据然后写入到另一个Elasticsearch集群。

    使用方式

    Spark代码参考如下:

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
    
    public class MigrateESToES {
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf();
    
            conf.setAppName("BaiDu-Cloud-Migrate-ES-Example");
            conf.set("es.index.auto.create", "true");
    
            SparkContext sc = new SparkContext();
    
            // source cluster configuration
            Map<String, String> inputCfg = new HashMap<>();
            // node ip:port
            inputCfg.put("es.nodes", "{Source Elasticseach Host}:8200");
            inputCfg.put("es.resource", "test_size/_doc");
            // 如果没有开启认证的话,这两个选项可以不写
            // username
            // inputCfg.put("es.net.http.auth.user", "superuser");
            // password
            // inputCfg.put("es.net.http.auth.pass", "xxxx");
            // es_version,条件可以的话建议写上
            inputCfg.put("es.internal.es.version", "7.4.2");
            // 读取_version 字段
            // inputCfg.put("es.read.metadata.version", "true");
            // 读取_matedata 字段
            // inputCfg.put("es.read.metadata", "true");
    
            // dst cluster configuration
            Map<String, String> outputCfg = new HashMap<>();
            outputCfg.put("es.nodes", "{Dst Cluster}:8200");
            outputCfg.put("es.net.http.auth.user", "root");
            outputCfg.put("es.net.http.auth.pass", "root");
            outputCfg.put("es.internal.es.version", "7.4.2");
            outputCfg.put("es.resource", "test_size/doc");
    
            // 指定写入version type 为外部指定
            // outputCfg.put("es.mapping.version.type", "external");
            // 指定 index id
            // outputCfg.put("es.mapping.id", "_metadata._id");
            // 指定 routing id
            // outputCfg.put("es.mapping.routing", "_metadata._id");
            // 指定 doc version
            // outputCfg.put("es.mapping.version", "_metadata._version");
            // 关闭refresh
            outputCfg.put("es.batch.write.refresh", "false");
            // 单次写15mb
            outputCfg.put("es.batch.size.bytes", "15mb");
            //size (in entries) for batch writes using Elasticsearch bulk API
            outputCfg.put("es.batch.size.entries", "5000");
            // 异常不处理,打log,避免conflict 导致task failed
            outputCfg.put("es.write.rest.error.handlers", "log");
            // 异常日志前缀
            outputCfg.put("es.write.rest.error.handler.log.logger.name", "es_error_handler");
            // outputCfg.put("es.mapping.exclude", "_metadata");
    
            // create spark context
            JavaSparkContext jsc = new JavaSparkContext(sc);
    
            JavaRDD<Map<String, Object>> sourceRDD = esRDD(jsc, inputCfg).values();
    
            JavaEsSpark.saveToEs(sourceRDD, outputCfg);
            sc.stop();
        }
    }

    注意事项

    • 需要准备Spark环境并连通Elasticsearch集群的网络。
    • 需要自行实现Spark业务逻辑。

    HDFS快照迁移

    使用方式

    安装有基于HDFS的snapshot插件的Elasticsearch集群,可以通过HDFS进行快照备份并恢复。使用方式与基于BOS的快照备份与恢复相类似,只是中转媒介换成了HDFS。

    注意事项

    • 需要准备资源足够的HDFS环境并连通网络。
    • 需要安装有基于HDFS的snapshot插件(如需安装,请在百度云控制台提交工单)。
    • 目的集群恢复数据时,需要关闭相应index。
    • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
    上一篇
    备份恢复
    下一篇
    开发指南