Flink示例
更新时间:2025-01-23
前提条件
已完成创建 BMR 集群,并且配置了 Paimon、Flink 组件,详情请参见创建集群。
注意事项
- BMR Flink 不使用 Hive Metastore 元数据,可以使用文件系统存储元数据,可通过 Hive 和 Spark 操作.
操作示例
- SSH登录集群,参考SSH连接到集群;
- 参考以下命令,启动 /opt/bmr/flink/bin/yarn-session.sh:
su - hdfs
/opt/bmr/flink/bin/yarn-session.sh
- 执行 flink-sql a. 在新的命令行窗口,重复执行上述命令 注意:如果是存算分离类型的 BMR 集群,warehouse 地址需改为 BOS 地址,参考以下命令。
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='hdfs:///warehouse/paimon/flink'
);
USE CATALOG my_catalog;
-- create a word count table
CREATE TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);
insert into word_count values('abc', 1);
-- 等一会,或者点开 yarn 界面,找到 application, 进入 application Master,点击 Running jobs 和 Completed Jobs。看到 insert 语句执行完就可以。
select * from word_count;
- 通过 Flink 创建 paimon 表,参考以下命令:
CREATE TABLE `my_catalog`.`default`.`word_count` (
`word` VARCHAR(2147483647) NOT NULL,
`cnt` BIGINT,
CONSTRAINT `PK_word` PRIMARY KEY (`word`) NOT ENFORCED
) WITH (
'path' = 'hdfs:/warehouse/paimon/flink/default.db/word_count'
)
- 在 Hive 中查询示例如下:注意需要创建外表。
CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs:/warehouse/paimon/flink/default.db/word_count';
select * from external_test_table;