简介:百度智能云数据仓库 Palo 是基于业内领先的 OLAP 数据库 Apache Doris 构建的
在数据分析的实际场景中,冷热数据往往面临着不同的查询频次及响应速度要求。例如在日志分析场景中,历史数据的访问频次很低,但需长时间备份以保证后续的审计和回溯的工作;在行为分析场景中,需支持近期流量数据的高频查询且时效性要求高,但为了保证历史数据随时可查,往往要求数据保存周期更为久远。
通常来说,历史数据的应用价值会随着时间推移而降低,且需要应对的查询需求也会随之锐减。而随着历史数据的不断增多,如果我们将所有数据存储在本地,将造成大量的资源浪费。
为了解决以上问题,冷热数据分层技术应运而生。顾名思义,冷热分离是将冷热数据分别存储在成本不同的存储介质上,这项技术目前被广泛用于各个数仓产品。百度智能云数据仓库 Palo 2.0 for Apache Doris 版本提供了冷热分层的功能,把部分冷数据放到对象存储中,以此实现成本效益的最大化。
百度智能云数据仓库 Palo 是基于业内领先的 OLAP 数据库 Apache Doris 构建的 MPP 架构云数据仓库,本文也将围绕「冷热分离功能的使用及实现原理」重点介绍。
添加远端存储:创建转冷策略表和绑定转冷策略。
#添加远端存储,使用对象存储的 Bucket 以及 AK/SK 创建 Resource。
CREATE EXTERNAL RESOURCE "baidu_bos_s3"
PROPERTIES
(
"type" = "s3",
"AWS_ENDPOINT" = "s3.bj.bcebos.com",
"AWS_REGION" = "bj",
"AWS_BUCKET" = "${BUCKET}",
"AWS_ROOT_PATH" = "/palo/storage",
"AWS_ACCESS_KEY" = "${AWS_ACCESS_KEY}",
"AWS_SECRET_KEY" = "${AWS_SECRET_KEY}",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "3000"
);
#创建转冷策略
方式1:设置ttl时间 (推荐采用)
CREATE STORAGE POLICY testPolicy
PROPERTIES(
"storage_resource" = "baidu_bos_s3",
"cooldown_ttl" = "5"
);
方式2:设置固定的转冷时间
CREATE STORAGE POLICY testPolicy
PROPERTIES(
"storage_resource" = "baidu_bos_s3",
"cooldown_datetime" = "2023-06-07 21:00:00"
);
#策略绑定
方式1:整表绑定
CREATE TABLE TestTbl
(
aa BIGINT
)
ENGINE=olap
DISTRIBUTED BY HASH (aa) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"storage_policy" = "testPolicy"
);
方式2:指定分区绑定(推荐采用,每个分区可以采用不同的策略,控制更为灵活)
ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="test_policy");
插入数据:
insert into TestTbl values(1);
insert into TestTbl values(2);
insert into TestTbl values(3);
insert into TestTbl values(4);
insert into TestTbl values(5);
插入数据后在 BE 中产生 6 个数据文件:
超过 ttl 时间数据会进行转冷,转冷时的关键日志:
执行:grep “Upload rowset” http://be.INFO
通过 show tablets from xxx 查看每个 tablet 的转冷大小:
转冷后可以看到本地文件的数据被删除。
在对象存储 BOS 中的数据文件:
我们可以看到,当前增加了新的 meta 文件,这部分我们将在下文 4.3 小节详细介绍。
为了更直观地观测冷热的有关行为、便于运维监控。Doris 提供了冷热 4 个监控指标,监控指标支持在 grafana 中配置。
添加参数到 BE 节点的配置文件 conf/be.conf 中,并重启 BE 节点让配置生效。
查看 BE 上所有的配置项: http://Host:HttpPort/varz
开启 File Cache 后,在 query profile 可以查看缓存的命中情况:
冷热分离功能在实现上主要涉及到 FE 和 BE 元数据的操作。为了更深入地理解冷热分离的原理,首先需要了解 FE 元数据和 BE 元数据之间的组织关系及相关概念。
说明:
为便于大家理解,下面将以简单表举例:
CREATE TABLE `TestTbl` (
`aa` BIGINT NULL,
`b` int
) ENGINE=OLAP
DUPLICATE KEY(`aa`)
PARTITION BY RANGE(`aa`)
(PARTITION p1 VALUES [("-9223372036854775808"), ("10")),
PARTITION p2 VALUES [("10"), ("20")),
DISTRIBUTED BY HASH(`aa`) BUCKETS 1
//创建1个上卷表,就会生一个MaterializedIndex
create materialized view mv_max as select aa, max(bb) from TestTbl group by aa;
MySQL [test_db]> show tables;
+-------------------+
| Tables_in_test_db |
+-------------------+
| TestTbl |
+-------------------+
2 rows in set (0.001 sec)
#层级关系
TestTbl
|--P1
|-TestTbl
|-Tablet1
|-Replica1
|-Replica2
|-Replica3
|-mv_max
|-Tablet2
|-Replica1
|-Replica2
|-Replica3
|--P2
|-TestTbl
|-Tablet2
|-Replica1
|-Replica2
|-Replica2
|-mv_max
|-Tablet3
|-Replica1
通过 show proc 命令查看:
下面为大家介绍 show proc 的灵感来源。
Linux 中的 proc 系统类似,Doris 中的 proc 系统也被组织成一个类似目录的结构,根据用户指定的「目录路径(proc 路径)」,用来查看不同的系统信息。
proc 系统主要面向系统管理人员,方便系统管理人员查看系统内部的运行状态。如表 tablet 状态、集群均衡状态、各种作业的状态等。
一般查 FE 的元数据,只要记住上述这一个命令就够了。
比如用 stream load 导入 10G 的文件,会生成 1 个 rowset、40 个 segment 文件。
show tablet 命令:
直接在浏览器中打开 MetaUrl ,即可获得 tablet 在 BE 的层级信息。
在了解 FE 和 BE 的元数据组织方法后,我们将详细为大家介绍如何实现冷热分离。Doris 的每个表的每个分片默认有 3 个副本,转冷本质就是把冷数据挪到远端存储上,需要考虑下面 3 件事:
Doris 会副本中选择 1 个作为 leader 来负责转冷相关操作(包括数据上传/元数据上传/冷数据 Compaction /无效数据删除),其他的副本作为作为 follower,负责同步 leader 的转冷信息,包括数据和元数据。
问题来了,如何选择到合适的 replica 作为 leader ,目前有 2 种方式:
Doris 采用的是方案 2,随机选一个。因为方式 1 需要 FE 保存每个 replica 的 compaction 的进度。版本、复杂度提高,并且合并进度时刻动态变化。如当前时刻某个节点比较快,下一刻速度变慢,总体达不到少上传数据的目的,所以采用随机方案。
选择出 leader 后,FE 会把 leader 信息同步给每个 replica,并保存在 BE 的meta 信息里,leader 由 Cooldownreplicaid 和 term 来表示。term 是个自增的,表示第几个任期,BE 拿到 Cooldownreplicaid 后与自己的 replicaid 比较,如果一致说明是 leader ,从而进行下一步的相关数据操作。
控制 leader 对数据进行转冷粒度,有两种方案:
leader 进行转冷过程本身并不复杂,到期后把 rowset 上传、并更新 meta 即可。复杂一点是 follower 节点对数据进行同步,因为每个 follower 本身有自己的 compaction 逻辑,进度与同步的过程会有差异。整体主要分为 2 种情况:
接下来举例子来说明,假设有 1 个 TestTbl 表、3 个副本
CREATE TABLE
TestTbl(
bint ) ENGINE=OLAP
先后插入 5 条数据:
R1、R2、R3 的 compaction 的进度上左图所示,其中 R1 为 leader 未进行 compaction,版本最多有 5 个,R2 版本 [1-1][2-2] 合并为了 [1-2],只有 4 个 R3 合并的最快只剩 2 个 rowset,R1 中的 rowset 1 到期后,转冷过程就 2 个步骤:
注:leader 每次只会对 1 个 rowset 进行转冷,默认间隔 20s。
Follower 节点同步过程:
注:虚框表示数据在远端存储上。
R2、R3 从 meta 读取 rowset 后,发现转冷的 rowset 只有 1-1 版本,本地最低的版本是1-2、1-4、1-1,无重叠,无法覆盖本地。所以本次同步不更新 rowset 的信息,只更新 cooldown_meta_id 。转冷整个过程持续进行,所以leader 只要有转冷操作,cooldown_meta_id 就会变,follower 需要同步 leader 的变动。这时如果用命令查看副本的冷数据时会看到某些副本的 remotedatasize 为 0,有些不为 0,也是符合预期的。
假设先后插入上方 5 条数据后各副本的进度如下图所示,R1 没有进行过 merge,R2 版本 [2-2][3-3] merge 为了 [2-3]、R3[3-3][4-4] merge 为了 [3-4]。
Leader 的 rowset [1-1] 到期转冷到对象存储。
R2、R3 从 meta 读取 rowset 后,发现转冷的 rowset 1-1 与自己本地的 rowset 有重叠,则删除本地版本 1-1,拷贝远端 [1-1] 的 meta 信息, 并更新cooldown_meta_id,完成转冷同步。在这个过程中,您可以查看 rowset 中 meta 信息的 resource_id 字段是否为空来判断是本地 rowset /远端 rowset。
ttl 到期后, 最终 leader 节点上的所有 rowset 都会被转冷,同时 follower 会同步 leader 上的所有转冷的 rowset。
Doris 2.0 版本中支持了对冷却到对象存储的冷数据进行 Compaction(ColdDataCompaction)。通过冷数据 Compaction,将冷数据重新组织并压缩成更紧凑的格式,从而减少存储空间的占用,提高存储效率,整个过程如下图所示:
注:只有 leader 节点才会进行冷数据 Compaction。当前冷数据 Compaction 功能默认不打开。
经过冷数据的 Compaction 或用户删除了某些分区后,对象存储会存在出现一些不再使用的无效数据。假设 Compaction 后, [1-1][2-2][3-3][4-4][5-5] 合并为了 [1-5],需要进行清理原来不再使用的数据。
远端存储的文件清理流程较为复杂,不能直接进行删除。因为需要保证所有的 follower 都同步 leader 最新的转冷结果,所有删除的时候需要有两次确认的过程。清理流程如下:
上图 [1-1][2-2][3-3][4-4][5-5] 属于不再使用的数据,leader 跟 FE 确认所有 replica 同步完成后,进行清理。
因为 leader 是随机选择的,所以有概率存在 leader 的版本落后于 follower 的问题。因为对于 Doris 的写入来说,超过半数节点写成功就能提交。比如我们再插入 3 条数据:
R1 由于某些原因以上 3 条数据插入失败了, R2、R3 插入成功,R1 是 leader,落后 R2、R3 3 个版本, 在经过一段时间 R1 上的数据全部转冷,缺失的 3 个版本如何转冷呢?
Doris 支持 clone 修复,当 FE 发现某个副本版本缺失后,会自动进行版本补齐。所以 leader 缺失的版本补齐后再进行转冷,从而保证随机选择的 leader 在版本缺失的场景下也能正常完成转冷。下图展示 leader 落后的转冷过程:
为了优化冷数据为了查询的性能,Doris 2.0 引入 Cache 的概念。在冷却后首次命中,Doris 会将已经冷却的数据又重新加载到 BE 的本地磁盘,Cache 有以下特性:
为了解数据转冷后对性能的影响, 我们对 SSB-sf500 测试集进行了冷热测试,全部转冷后,相比本地性能下降 10 倍。
注:测试集群 3Fe+3BE, 8c*32G。
测试集数据:
https://github.com/apache/doris/tree/master/tools/ssb-tools/ssb-queries
为了提升冷数据的查询性能, 引入数据缓存 (File Cache) 通过缓存最近访问的远端存储系统 ( HDFS 或对象存储) 的数据文件,加速后续访问相同数据的查询速度。在频繁访问相同数据的查询场景中,File Cache 可以避免重复的远端数据访问开销,提升热点数据的查询分析性能和稳定性
其实现原理是:File Cache 将访问的远程数据缓存到本地的 BE 节点。原始的数据文件会根据访问的 I/O 大小切分为 Block。Block 被存储到本地文件cache_path/hash(filepath).substr(0, 3)/hash(filepath)/offset 中,并在 BE 节点中保存 Block 的元信息。Block 的默认大小为 1M。
访问相同的远程文件时,Doris 会检查本地缓存中是否存在该文件的缓存数据,并根据 block 的 offset 和 size,确认哪些数据从本地 block 读取,哪些数据从远程拉起,缓存远程拉取的新数据。BE 节点重启的时候,扫描 cache_path 目录,恢复 block 的元信息。当缓存大小达到阈值上限的时候,按照 LRU 原则清理长久未访问的 block。
有了 File Cache 后,下面为大家进一步介绍读取过程。比如,我们需要读取远端存储中 0-64KB 范围的内容。
磁盘上 blockcache 文件列表样例:
我们在配置 10G 大小的 File Cache 后,再测试下 ssb-sf500 的性能表现:
可以看到,相较没有 Cache 时,整体性能提升 3 倍。原因是通过 Cache 减少了网络的 I/O 同时由于 Doris 是列存预读效果好,比如 select sum(A) from xx 对 A 读取了 0-64KB 数据后,大概还会读取 64-128KB,而由于 64-128KB 的数据已经在 block 中了,缓存命中率高。
通过分析我们发现 Cache 的能发挥作用主要在于两点: 预读与减少网络 I/O。Cache 的大小似乎不太重要,因为只要能满足当次查询缓存的大小就能加速,为了验证这个结论,我们再做两组测试,
调整后,我们发现 Cache 加大后对性能的提升效果并不明显,这也符合我们的预期,同时 Cache 缩小 Cache 对性能也不会显著恶化,因此我们可以得出 1 个结论:有 Cache 很重要,但需要把握 Cache 调整的维度。比如: