数据迁移
所有文档
menu

Elasticsearch BES

数据迁移

产品详情自助选购

用户在接入百度智能云Elasticsearch服务时,如需要百度智能云Elasticsearch服务间进行数据迁移或自建Elasticsearch服务数据迁移至百度智能云 Elasticsearch,可以根据自己的业务需求选择合适的迁移方案。本文介绍各迁移方案适用的场景,帮助您根据业务选择合适的场景进行迁移。

  • BOS快照迁移
  • 在线reindex
  • Logstash
  • 通过Spark迁移数据
  • HDFS快照迁移

BOS快照迁移

snapshot api 是Elasticsearch用于对数据进行备份和恢复的一组 api 接口,可以通过 snapshot api 进行跨集群的数据迁移,原理就是从源 Elasticsearch 集群创建数据快照,然后在目标 Elasticsearch 集群中进行恢复。

百度智能云Elasticsearch基于百度存储BOS(Baidu Object Storage),实现了snapshot api,能够帮助您在本集群内和多集群间进行快照备份与恢复。

具体使用方式参照基于BOS的快照与恢复

适用场景

  • 源端数据量较大(GB、TB、PB级别)的场景
  • 恢复数据过程中需要关闭目的集群index
  • 源Elasticsearch集群和目的集群Elasticsearch版本符合快照备份与恢复场景的匹配
Snapshot version new Cluster version 2.x 5.x 6.x 7.x 8.x
1.x icon-yes.png
2.x icon-yes.png icon-yes.png
5.x icon-yes.png icon-yes.png
6.x icon-yes.png icon-yes.png
7.x icon-yes.png icon-yes.png

使用方式

  1. 两个集群分别创建基于BOS的仓库

源Elasticsearch集群需要添加BOS信息来创建仓库。创建命令如下:

POST /_snapshot/es_repo
{
    "type": "bos",
    "settings": {
        "access_key": "your access_key",
        "secret_key": "your secret_key",
        "endpoint": "s3.bj.bcebos.com",
        "bucket": "es-repo",
        "base_path": "test_base_path"
    }
}

相关参数意义:

参数 作用
type 仓库的类型,在这里您应该填bos
access_key 百度智能云的access_key,可以在百度智能云的console中看到
secret_key 百度智能云的secret_key,可以在百度智能云的console中看到
endpoint BOS对应各个region的服务域名
bucket BOS的bucket,务必保证对应的用户身份有读写bucket权限
base_path 仓库的起始位置,默认为根目录

BOS对应各个region的服务域名:

区域 访问Endpoint
北京 s3.bj.bcebos.com
保定 s3.bd.bcebos.com
苏州 s3.su.bcebos.com
广州 s3.gz.bcebos.com
香港 s3.hkg.bcebos.com
金融云武汉专区 s3.fwh.bcebos.com

在目的Elasticsearch集群创建仓库时,除了添加BOS信息外,还需要添加readonly参数,以保证只有一个集群可以对仓库进行操作。创建命令如下:

POST /_snapshot/es_repo
{
    "type": "bos",
    "settings": {
        "access_key": "your access_key",
        "secret_key": "your secret_key",
        "endpoint": "s3.bj.bcebos.com",
        "bucket": "es-repo",
        "base_path": "test_base_path",
        "readonly": true
    }
}
  1. 在源Elasticsearch集群创建数据的快照
PUT /_snapshot/es_repo/snapshot_2020_01_01?wait_for_completion=true
{
  "indices": "index_1,index_2",
  "ignore_unavailable": true,
  "include_global_state": false
}
  1. 在目的Elasticsearch集群恢复数据快照
POST /_snapshot/es_repo/snapshot_2020_01_01/_restore
{
  "indices": "index_1,index_2",
  "include_global_state": false
}

具体参数详解参照基于BOS的快照备份恢复

注意事项

  • 需要开通BOS服务并且创建bucket。
  • 并且源集群和目的集群能够访问BOS网络。
  • 源集群进行快照时,同时只能有一个快照运行,且有删除快照的操作时,不能进行快照。
  • 目的集群创建仓库时,需要添加readonly参数,以保证只有一个集群可以对仓库进行写操作。
  • 目的集群恢复数据时,恢复的index可以不存在,如果存在必须是closed状态。
  • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
  • 使用BOS跨region迁移数据时,可能会产生外网流量费用。
  • 对索引做快照的时候,对索引的变更操作不会在快照中出现,用户应该避免这种情况。如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。同理,其他迁移方式也要注意这点。

在线reindex

reindex from a remote cluster api是Elasticsearch提供的一个 api接口,可以把数据从源Elasticsearch集群导入到当前Elasticsearch集群,实现数据的迁移。原理是从源 Elasticsearch 集群中查询数据,然后写入到目标 Elasticsearch 集群中。

适用场景

  • 在线迁移数据
  • 对迁移速度要求不高
  • 可以对源端集群数据进行查询

使用方式

  • 在目的集群的elasticsearch.yml配置reindex白名单
reindex.remote.whitelist: {IP}:{port}
  • 在目的集群创建index(如果不需要特殊处理分片数和mapping等,可以省略这一步)
  • 做reindex操作
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://{oldhost}:{oldport}",
      "username": "{username}",
      "password": "{password}"
    },
    "index": "source_index",
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "dest_index"
  }
}

详细使用方式参照reindex from a remote cluster api

注意事项

  • 目的集群需要连通源集群的网络。
  • 需要重启目的集群配置reindex白名单。
白名单修改可以在百度云页面【配置修改】进行修改,
注意:
1、修改配置后,需要重启集群才生效。
2、配置格式为yml格式。注意,在【冒号】后【配置值】前要有一个“空格”。

image.png

  • 迁移数据时,最好不要对索引有变更操作;如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。

Logstash

适用场景

  • 在线迁移数据
  • 可以对源端集群数据进行查询
  • 请注意 Logstash 版本的版本是否与Elastic相匹配,建议与 Elasticsearch 版本保持一致。版本兼容性说明请参见Elasticsearch系列产品兼容性

使用方式

  • 安装部署Logstash与Java8以上的jdk环境。
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
tar xvf logstash-7.4.2.tar.gz 
  • 根据数据源类型自定义配置文件*.conf,配置文件内容如下:
input {
    elasticsearch {
        hosts => ["http://{oldhost}:{oldport}"]
        index => "*"
        docinfo => true
    }
}
output {
    elasticsearch {
        hosts => ["http://{newhost}:{newport}"]
        index => "%{[@metadata][_index]}"
    }
}
  • 执行Logstash。
 nohup ./bin/logstash -f ~/*.conf 2>&1 >/dev/null &

上述配置文件将源Elasticsearch集群的所有索引同步到目标集群中,也可以设置只同步指定的索引,Logstash 的更多功能可查阅 Logstash 官方文档

注意事项

  • 需要准备Logstash环境并连通Elasticsearch集群的网络。

通过Spark迁移数据

Spark支持从一个Elasticsearch集群中读取数据然后写入到另一个Elasticsearch集群。

使用方式

Spark代码参考如下:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;

public class MigrateESToES {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf();

        conf.setAppName("Baidu-Cloud-Migrate-ES-Example");
        conf.set("es.index.auto.create", "true");

        SparkContext sc = new SparkContext();

        // source cluster configuration
        Map<String, String> inputCfg = new HashMap<>();
        // node ip:port
        inputCfg.put("es.nodes", "{Source Elasticseach Host}:8200");
        // 要迁移的索引和文档类型,例如test_size索引,文档类型_doc
        inputCfg.put("es.resource", "test_size/_doc");
        // 如果没有开启认证的话,这两个选项可以不写
        // username
        // inputCfg.put("es.net.http.auth.user", "superuser");
        // password
        // inputCfg.put("es.net.http.auth.pass", "xxxx");
        // es_version,条件可以的话建议写上
        inputCfg.put("es.internal.es.version", "7.4.2");
        // 读取_version 字段
        // inputCfg.put("es.read.metadata.version", "true");
        // 读取_matedata 字段
        // inputCfg.put("es.read.metadata", "true");

        // dst cluster configuration
        Map<String, String> outputCfg = new HashMap<>();
        outputCfg.put("es.nodes", "{Dst Cluster}:8200");
        outputCfg.put("es.net.http.auth.user", "root");
        outputCfg.put("es.net.http.auth.pass", "root");
        outputCfg.put("es.internal.es.version", "7.4.2");
        // 目标索引和文档类型
        outputCfg.put("es.resource", "test_size/_doc");

        // 指定写入version type 为外部指定
        // outputCfg.put("es.mapping.version.type", "external");
        // 指定 index id
        // outputCfg.put("es.mapping.id", "_metadata._id");
        // 指定 routing id
        // outputCfg.put("es.mapping.routing", "_metadata._id");
        // 指定 doc version
        // outputCfg.put("es.mapping.version", "_metadata._version");
        // 关闭refresh
        outputCfg.put("es.batch.write.refresh", "false");
        // 单次写15mb
        outputCfg.put("es.batch.size.bytes", "15mb");
        //size (in entries) for batch writes using Elasticsearch bulk API
        outputCfg.put("es.batch.size.entries", "5000");
        // 异常不处理,打log,避免conflict 导致task failed
        outputCfg.put("es.write.rest.error.handlers", "log");
        // 异常日志前缀
        outputCfg.put("es.write.rest.error.handler.log.logger.name", "es_error_handler");
        // outputCfg.put("es.mapping.exclude", "_metadata");

        // create spark context
        JavaSparkContext jsc = new JavaSparkContext(sc);

        JavaRDD<Map<String, Object>> sourceRDD = esRDD(jsc, inputCfg).values();

        JavaEsSpark.saveToEs(sourceRDD, outputCfg);
        sc.stop();
    }
}

注意事项

  • 需要准备Spark环境并连通Elasticsearch集群的网络。
  • 需要自行实现Spark业务逻辑。

HDFS快照迁移

使用方式

BES已预装了repository-hdfs插件。您可以通过HDFS进行快照备份并恢复,使用方式与基于BOS的快照备份与恢复相类似,只是中转媒介换成了HDFS。

创建快照仓库的命令示例:

PUT _snapshot/my_hdfs_repository
{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://192.168.1.101:8020",
    "path": "/tmp/hadoop/es_repo"
  }
}

配置详情可参考ES官网介绍repository-hdfs-config

注意事项

  • 需要准备资源足够的HDFS环境并连通网络。
  • BES已预装了repository-hdfs插件。
  • 目的集群恢复数据时,需要关闭相应index。
  • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
上一篇
插件管理
下一篇
账户管理