实时Flink作业开发
实时Flink作业创建
点击新建按钮,弹出【新建作业】弹窗。选择Flink作业类型,目前Flink作业分为两大类:FlinkSQL作业 和Flink自定义作业。
FlinkSQL:用户通过写SQL的方式就能够进行实时Flink作业的开发。
Flink自定义:用户需要在线下编辑好Flink作业包,上传到平台之上。
选择【FlinkSQL】类型后,输入作业名称,点击【确定】。在【任务开发】列表中,显示创建任务。
选择【FlinkJar】类型后,点击【确定】,进行在【任务开发】列表中显示创建的任务。
实时Flink作业编辑
FlinkSQL作业
创建的FlinkSQL作业中,用户能够通过类SQL脚本进行任务开发。
SQL脚本模板如下:
--创建kafka表
drop table if EXISTS kafka_run_log;
create table kafka_run_log(
carId int,
fuel_consumption int,
traffic string,
speed int,
latitude double,
longitude double,
create_time TIMESTAMP(3)
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'vehicle_run_info',
'connector.properties.bootstrap.servers' = ' XX.XX.XX.XX:9093',
'connector.startup-mode' = 'latest-offset',
'connector.properties.enable.auto.commit' = 'false',
'format.type' = 'csv'
);
/*
创建mysql表语句
*/
drop table if EXISTS mysql_run_log;
CREATE TABLE mysql_run_log (
carId int,
fuel_consumption INT,
traffic varchar(50),
speed int,
latitude double,
longitude double,
create_time TIMESTAMP(3)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://XX.XX.XX.XX:3306/car_run_log?useSSL=false',
'connector.table' = 'mysql_run_log',
'connector.username' = 'edap_test',
'connector.password' = 'edap_test@1'
);
insert into
mysql_run_log
select
carId,
fuel_consumption,
traffic,
speed,
latitude,
longitude,
create_time
from
kafka_run_log;
select
count(carId)
from
mysql_run_log;
用户可以在脚本编辑框中,编写相应的FlinkSQL脚本。
FlinkJAR作业
用户可以在开发界面中调整jar包的存储路径。
实时Flink作业配置
在Flink作业开发完成之后,点击【基本信息】,显示可视化作业的基本信息,并能够进行描述修改。
点击【参数设置】,弹出Flink作业的参数设置。设置的参数可以在Flink作业中进行引用。
点击【引擎设置】弹出引擎设置弹出框。设置引擎相关的配置信息。
实时Flink作业的保存及测试运行
Flink作业开发完成后,点击上面【保存】按钮,进行作业保存。
点击【执行】,进行Flink作业测试运行,且在【执行信息】中弹出执行日志信息。
实时Flink作业导入导出
仅支持顶级【任务开发】目录下的文件夹/作业的导入导出(涵盖可视化、Flink、Spark)。
导入
点击【导入】按钮,页面弹窗,选择用户本地文件,支持单文件、且zip类型文件导入,如果选择文件非zip类型,会提示:「请选择zip压缩包」:
导入完成后,弹窗展示导入失败的文件名。
注意:同名脚本/作业进行覆盖。
导出
点击【导出】按钮,弹窗提示“是否确认导出该文件夹下所有作业?”
点击确定后,导出的文件会被打包为一个zip压缩包。
压缩包内包含文件夹及文件夹内所有的作业,作业格式仍为json格式,json包含内容与单作业导出内容保持一致(仅包含作业元信息,不包含引用的jar、文件)。