Reading Kafka in a FlinkSQL job on an EDAP resource group, joining a MySQL dimension table, and writing to an EDAP Datalake table
Last updated: 2024-08-28
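First, on the 【Metadata Management】 page, create the EDAP Datalake table shown in the figure below. It serves as the sink that the Flink job writes to.
Enter the following SQL in the FlinkSQL job. It reads events from Kafka, enriches them by joining the MySQL dimension table, and writes the results to the EDAP Datalake table. The Kafka and MySQL services must be reachable from the resource group over the network, for example by placing all of them in the same VPC.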
-- Source table: JSON event logs from Kafka, authenticated with SASL/SCRAM-SHA-512
CREATE TABLE kafka_source (
log_time STRING,
log_ts AS TO_TIMESTAMP(log_time,'yyyy-MM-dd HH:mm:ss.SSS'),
user_id INT,
behavior STRING,
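-- processing-time attribute, required by the FOR SYSTEM_TIME AS OF lookup join below
proctime AS PROCTIME(),
-- event-time watermark (10 s out-of-orderness); it drives the sink's 'partition-time' commit trigger
WATERMARK FOR log_ts AS log_ts - INTERVAL '10' SECOND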
) WITH (
'connector' = 'kafka',
'topic' = 'eventLogJson',
'properties.bootstrap.servers' = 'kafka_nodeip:9093',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="***";'
);
-- Dimension table: user profiles in MySQL, looked up per user_id with a partial lookup cache
CREATE TABLE mysql_user_profile (
user_id INT,
city STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysql_ip:3306/biz_online?useSSL=false',
'table-name' = 'user_profile',
'username' = 'xxx',
'password' = '***',
'lookup.cache' = 'PARTIAL',
'lookup.cache.max-rows' = '1000',
'lookup.partial-cache.expire-after-write' = '60 min'
);
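-- Register the EDAP catalog so the Datalake table created in 【Metadata Management】 can be referenced
CREATE CATALOG edap_catalog WITH ('type' = 'edap');
-- Enrich each event with the user's city and write it to the (day, hour) partition derived from log_ts
INSERT INTO edap_catalog.`default`.test_hive_sink_bos_datalake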
/*+OPTIONS('partition.time-extractor.timestamp-pattern'='$day $hour:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='5 min',
'sink.partition-commit.policy.kind'='metastore,success-file')*/
SELECT ods.user_id, behavior, dim.city, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_source as ods
LEFT JOIN mysql_user_profile FOR SYSTEM_TIME AS OF ods.proctime as dim ON ods.user_id = dim.user_id;
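The `mysql_user_profile` definition above only maps Flink columns onto an existing MySQL table, `biz_online.user_profile`. For reference, a compatible MySQL table might be created as follows. This is a sketch under assumptions: the column types, the `VARCHAR` length, and the primary key are not part of the original setup; they are chosen so that `user_id` matches the Flink `INT` column and `city` maps to `STRING`. Keying the table on `user_id` matters because every cache miss in the lookup join turns into a query filtered on the join key.

```sql
-- MySQL side (not Flink SQL): a possible definition of the dimension table
-- read by the 'jdbc' connector. Types, length and key are assumptions.
CREATE TABLE biz_online.user_profile (
  user_id INT NOT NULL,   -- join key, matches user_id INT in Flink
  city    VARCHAR(64),    -- maps to city STRING in Flink
  PRIMARY KEY (user_id)   -- keeps per-key lookups cheap
);
```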
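In addition, as shown in the figure below, set "state.checkpoints.dir"="bos://<bucket name>/xx/xx" in 【Engine Settings】 so that the Flink job writes persistent checkpoint files that can be used to restore its state when the job restarts.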
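If the FlinkSQL editor accepts Flink `SET` statements (an assumption; the procedure described here uses 【Engine Settings】), the same configuration could be sketched as below. `execution.checkpointing.interval` is a standard Flink option that enables periodic checkpointing; the 60-second value is only an example. Checkpointing also matters for the sink itself: in streaming mode, Flink's filesystem/Hive sink finalizes files and commits partitions only when a checkpoint completes, so without checkpoints no data becomes visible in the Datalake table.

```sql
-- Hypothetical equivalent of the 【Engine Settings】 entries.
SET 'state.checkpoints.dir' = 'bos://<bucket name>/xx/xx';
-- Example interval (assumption); checkpointing must be enabled for the
-- streaming file sink to commit files and partitions.
SET 'execution.checkpointing.interval' = '60 s';
```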
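After the following records are written to the Kafka topic:
{"log_time":"2023-01-05 13:08:23.867","user_id":"123","behavior":"matchV2"}
{"log_time":"2023-01-05 13:05:23.867","user_id":"678","behavior":"match"}
and the MySQL dimension table contains the data shown in the figure below, the FlinkSQL job enriches each record through a lookup join on the dimension table and writes it to the EDAP Datalake table, where the results shown in the figure below can be queried.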
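As a worked check of where the sample records land: the SELECT derives the partition values with `DATE_FORMAT(log_ts, 'yyyy-MM-dd')` and `DATE_FORMAT(log_ts, 'HH')`, so both records (13:08:23.867 and 13:05:23.867 on 2023-01-05) go to the partition `day='2023-01-05'`, `hour='13'`. With the `partition-time` trigger and a 5 min delay, that partition (partition time 2023-01-05 13:00:00) is committed to the metastore once the watermark passes 13:05:00 and a checkpoint completes. The 13:08:23.867 record alone moves the watermark to roughly 13:08:13, assuming no idle Kafka partition holds it back. The query below is a sketch for inspecting that partition. It assumes a separate Flink SQL session in batch mode that can use the `edap` catalog type, and that the partition columns are named `day` and `hour`, as the `$day $hour:00:00` pattern implies. Because the job uses a LEFT JOIN, a `user_id` missing from MySQL should appear with a NULL `city` rather than being dropped.

```sql
-- Sketch: inspect the partition written by the sample records.
-- Assumes a batch-mode Flink SQL session with access to the EDAP catalog.
SET 'execution.runtime-mode' = 'batch';
CREATE CATALOG edap_catalog WITH ('type' = 'edap');

SELECT user_id, behavior, city, `day`, `hour`
FROM edap_catalog.`default`.test_hive_sink_bos_datalake
WHERE `day` = '2023-01-05' AND `hour` = '13';
```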
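While the Flink job is running, the BOS console shows new checkpoint files being generated continuously under the configured BOS checkpoint path, as in the figure below. If you have changed the job's SQL logic and do not want the job to restore state from these checkpoints when it restarts, delete all files under that BOS checkpoint path in the BOS console.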