EDAP资源组SparkSQL作业读取BOS写入RDS
更新时间:2024-08-28
首先,在【元数据管理】中选择【EDAPDataLake】中的某个database新建表。 假设BOS文件内容如下,每行为一个json字符串,每行之间以换行符分隔。
{"key1":"value1","key2":"value2","properties":{"temperature":15}}
{"key1":"value1+","key2":"value2+","properties":{"temperature":18}}
{"key1":"value1++","key2":"value2++","properties":{"temperature":27}}
表类型选择【物理表】【外部表】,存储路径指定为BOS路径。 【数据存储格式】选择【TEXTFILE】,【分隔符】输入"\n",添加value字段,date分区。 注意:要保证表bos路径下的子目录以"date="为前缀,即刚刚创建表时指定的分区字段名称。 同时,新建test_dwd中间表用于存储从原始json字符串中提取的字段。 接下来需要在【我的项目】界面创建项目,【资源管理】界面创建资源组,并用该资源组绑定刚刚创建的项目。 需要保证RDS服务与资源组属于相同VPC,且资源组与RDS服务的安全组配置允许资源组访问RDS服务,资源组的VPC、安全组配置如下图。 RDS服务【安全管理】的【白名单】配置如下图。 从【我的项目】界面进入刚刚创建的项目,在【脚本作业开发】中创建SparkSQL作业,【数据源类型】选择EDAPDatalake,【计算资源】选择刚刚创建的资源组,使用如下SQL可读取BOS文件数据写入中间表。
-- 在分布式存储中直接添加新分区目录时,表的元数据存储不会自动感知到这些新分区,此时需要运行 MSCK REPAIR TABLE 命令来同步元数据存储
MSCK REPAIR TABLE test_json_tb;
INSERT OVERWRITE TABLE test_dwd PARTITION(date='20240402')
SELECT
get_json_object(value, "$.key1") AS key1,
get_json_object(value, "$.key2") AS key2,
cast(get_json_object(value, "$.properties.temperature") AS int) AS temperature
FROM test_json_tb
where date='20240402';
select * from test_dwd;
接下来将BOS数据写入RDS服务。在RDS服务中执行如下语句准备好database、table。
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE test_ads (
min_temperature int,
max_temperature int,
avg_temperature double,
date varchar(255)
);
执行如下SQL可将BOS数据经过聚合计算后写入RDS服务。注意:url字段填写RDS服务的内网IP。
CREATE TEMPORARY VIEW jdbcTable
USING jdbc
OPTIONS (
url 'jdbc:mysql://192.168.64.20',
dbtable 'edap_test.test_ads',
user 'xx',
password 'xx'
);
INSERT INTO jdbcTable
SELECT
min(temperature),
max(temperature),
avg(temperature),
date
FROM test_dwd
WHERE date='20240402'
group by date;