Spark访问Es常见问题
说明
本文档主要介绍了通过elasticsearch-hadoop
中的Spark
访问ES时常见配置项意义。本文中的es-spark
是elasticsearch-hadoop
中和Spark
相关联的包,用户通过自己的Spark
集群读写ES集群,elasticsearch-hadoop
基本上兼容了目前ES所有的版本
版本号检测异常
es-spark 运行时通常会自动检测ES集群的版本号,获取的版本号主要是用来对不同集群的版本做API的兼容处理
一般情况下用户不用关注ES版本号,但是在云上有时候自动检测集群的版本号会发生一些莫名其妙的检测不到的错误,可以通过配置解决:
配置项:
es.internal.es.version:"6.5.3"
在一些较新版本的es-spark
包中同样需要配置:
es.internal.es.cluster.name:"Your Cluter Name"
实现原理:
配置完以后,es-spark不会请求 / 目录,解析version,会直接使用用户配置的version:
INTERNAL_ES_VERSION = "es.internal.es.version"
INTERNAL_ES_CLUSTER_NAME = "es.internal.es.cluster.name"
public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
return discoverClusterInfo(settings, log).getMajorVersion();
}
// 不同版本的elasticsearch-hadoop可能会有差异
public static ClusterInfo discoverClusterInfo(Settings settings, Log log) {
ClusterName remoteClusterName = null;
EsMajorVersion remoteVersion = null;
// 尝试从配置中获取集群名字
String clusterName = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_NAME);
// 尝试从配置中获取集群UUID
String clusterUUID = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_UUID);
// 尝试从配置中获取ES version
String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
// 如果集群名字和版本号没有从配置文件中拿到,则发起网络请求(请求根目录)
if (StringUtils.hasText(clusterName) && StringUtils.hasText(version)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Elasticsearch cluster [NAME:%s][UUID:%s][VERSION:%s] already present in configuration; skipping discovery",
clusterName, clusterUUID, version));
}
remoteClusterName = new ClusterName(clusterName, clusterUUID);
remoteVersion = EsMajorVersion.parse(version);
return new ClusterInfo(remoteClusterName, remoteVersion);
}
....
}
如果启用集群名、版本号自动检测功能,需要保证分配给es-spark的用户有访问根目录/
的GET
权限
GET /
数据节点发现
配置项:
es.nodes.wan.only: false 默认为false
es.nodes.discovery: true 默认为true
百度云ES集群前面有一个BLB负载均衡,配置es.nodes
的时候写这个BLB的地址即可,需要保证es-spark
可以访问BLB的地址
es.nodes.wan.only: false
,es.nodes.discovery: true
: Spark会通过访问es.nodes
中指定的host
(可以为多个) 得到ES集群所有开启HTTP
服务节点的ip
和port
,后续对数据的访问会直接访问分片数据所在的节点上(需要保证ES集群所有节点都能够被Spark
集群访问到)es.nodes.wan.only: true
,es.nodes.discovery: false
或不设置:Spark发送给ES的所有请求都需要通过这个节点进行转发,效率相对比较低
具体代码逻辑:
ES_NODES_DISCOVERY = "es.nodes.discovery"
ES_NODES_WAN_ONLY = "es.nodes.wan.only"
ES_NODES_WAN_ONLY_DEFAULT = "false"
InitializationUtils#discoverNodesIfNeeded
public static List<NodeInfo> discoverNodesIfNeeded(Settings settings, Log log) {
if (settings.getNodesDiscovery()) { // 需要读取配置项
RestClient bootstrap = new RestClient(settings);
try {
List<NodeInfo> discoveredNodes = bootstrap.getHttpNodes(false);
if (log.isDebugEnabled()) {
log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
}
SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
return discoveredNodes;
} finally {
bootstrap.close();
}
}
return null;
}
public boolean getNodesDiscovery() {
// by default, if not set, return a value compatible with the WAN setting
// otherwise return the user value.
// this helps validate the configuration
return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY), !getNodesWANOnly()); //默认值是!getNodesWANOnly()
}
public boolean getNodesWANOnly() {
return Booleans.parseBoolean(getProperty(ES_NODES_WAN_ONLY, ES_NODES_WAN_ONLY_DEFAULT));
}
用户权限问题
如果启动节点发现功能,需要保证es-spark使用的用户有访问
GET /_nodes/http
GET /{index}/_search_shards
的权限,否则会导致整个Job失败
配置Bulk导入有错误的处理方式
Spark 写ES的时候发生错误且经过几次尝试后会中断job,默认的情况下会直接中断当前Job,导致整个任务失败,如果只是想把导入失败的文档打印到日志中,可以通过如下配置解决:
配置错误发生错误的处理机制
es.write.rest.error.handlers = log
es.write.rest.error.handler.log.logger.name: es_error_handler
设置后当出现bulk写错误的时候,Job不会中断,会以log的形式输出到日志中,日志前缀为es_error_handler
如何拿到除_source外的其他文档元数据
正常情况下,我们通过调用ES的_search API返回的每条数据如下:
{
"_index": "d_index",
"_type": "doc",
"_id": "51rrB2sBaX4YjyPY-2EG",
"_score": 1,
"_source": {
"A": "field A value",
"B": "field B value"
}
}
但是用Spark去读ES的时候,默认是不读除_source
字段意外的其他字段,如_id
_version
, 在一些场景下,业务可能需要拿到_id
,可以通过如下配置:
es.read.metadata:true //这个配置默认是false
es.read.metadata.field: "_id" // 配置需要读取的元数据字段
es.read.metadata.version: 默认false 读取es的版本号
文档的元数据字段信息会放在一个_metadata
的字段里面
导入的时候指定id、version的方法
做数据迁移的时候,比如从一个低版本的ES集群迁移到高版的ES集群,我们可以用es-spark边度边写,如果需要指定_id
, _rouring
, _version
这些信息,可以设置:
es.mapping.id:”_meta._id” 指定json中id的路径
es.mapping.routing:”_meta._rouring” 指定json中id的路径
es.mapping.version:”_meta._version” 指定json中id的路径
_meta.xxx
是所需的字段在Json文档中的路径
spark默认写ES的时候refresh的会在每次bulk结束的时候调用
我们建议设置为false,有ES内部index
的 refresh_interval
来控制refresh
,否则ES集群会有大量的线程在refesh
会带来很大的CPU和磁盘压力
es.batch.write.refresh: false 默认是true
控制每次Bulk写入的量
es.batch.size.bytes:1mb 每次bulk写入文档的大小,默认1mb
es.batch.size.entries:1000 每次bulk写入的文档数,默认是1000
用户可根据ES集群套餐合理设置
读相关设置
es.scroll.size: 50, 默认是50 这个值相对来说比较小,可以适当增大至1000-10000
es.input.use.sliced.partitions: true 为了提高并发,es会进行scroll-slice进行切分
es.input.max.docs.per.partition: 100000 根据这个值进行切分slice
es-spark 读取 ES集群数据的时候,会按照每个分片的总数进行切分做scroll-slice
处理:
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
numDocs为单个分片的文档总数,如果文档有5千万,这时候会切分成500个sclice,会对后端的线上ES集群造成巨大的CPU压力,所以一般建议关闭scroll-slice
,以避免影响在线业务
建议的参数:
es.scroll.size: 2000 //尽量根据文档的大小来选择
es.input.use.sliced.partitions: false
控制需要写入的文档字段
有时候业务导入数据的时候,希望有一些字段不被写如ES,可以设置:
es.mapping.exclude:默认是none
// 多个字段用逗号进行分割,直接 . * 表达
es.mapping.exclude = *.description
es.mapping.include = u*, foo.*
执行upsert操作
示例:
String update_params = "parmas:update_time";
String update_script = "ctx._source.update_time = params.update_time";
// 设置sparkConfig
SparkConf sparkConf = new SparkConf()
.setAppName("YourAppName”)
.set("es.net.http.auth.user", user)
.set("es.net.http.auth.pass", pwd)
.set("es.nodes", nodes)
.set("es.port", port)
.set("es.batch.size.entries", "50")
.set("es.http.timeout","5m")
.set("es.read.metadata.field", "_id")
.set("es.write.operation","upsert")
.set("es.update.script.params", update_params)
.set("es.update.script.inline", update_script)
.set("es.nodes.wan.only", "true");
注:
es.update.script.params 为执行更新需要的参数列表
es.update.script.inline 为执行update使用script脚本