Flink接入Kafka专享版
更新时间:2023-07-05
接入准备
1. 购买专享版消息服务for Kafka集群
开通消息服务 for Kafka服务后,在控制台页面点击创建集群,即可进行购买,详情可参考创建集群。
2. 为购买的集群创建主题
在控制台页面点击集群名称,进入集群详情页面。
在左侧的边栏中点击主题管理,进入主题管理页面。
在主题管理页面点击创建主题,进行主题的创建,详情参考创建主题。
接入步骤
步骤一:获取集群接入点
具体请参考:查看集群接入点。
步骤二:添加Maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink_sdk</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.KafkaConsumerDemo</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
步骤三:编写测试代码
- 需要关注并自行修改的参数
参数名 | 含义 |
---|---|
access_point | 接入点信息 |
topic | 主题名称 |
value | 消息的具体内容 |
group_id | 消费组id |
生产者示例代码
创建KafkaProducerDemo.java文件,具体代码示例如下:
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String args[]) throws Exception {
//接入点设置
String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
//填写需要发送消息的主题名称
String topic = "topic_name";
//填写需要发送消息
String value = "test flink message";
// 创建配置类
Properties props = new Properties();;
// 指定kafka服务端所在位置
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, access_point);
// Kafka消息的序列化方式。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 请求的最长等待时间。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
// 设置客户端内部重试次数。
props.put(ProducerConfig.RETRIES_CONFIG, 5);
// 设置客户端内部重试间隔。
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
// 构造 Producer 对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
try {
// 测试发送 100 条消息
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value + ": " + i);
kafkaProducer.send(kafkaMessage, (RecordMetadata recordMetadata, Exception e) -> {
if (e == null) {
System.out.println("send success:" + recordMetadata.toString());
} else {
e.printStackTrace();
System.err.println("send fail");
}
});
}
} catch (Exception e) {
System.out.println("Something error has happened");
e.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
消费者实例
创建KafkaConsumerDemo.java文件,具体代码示例如下
package org.example;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String args[]) throws Exception {
//接入点设置
String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
String group_id = "flink_group";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建配置类
Properties properties = new Properties();
// 设置接入点
properties.setProperty("bootstrap.servers", access_point);
// 设置消费组id
properties.setProperty("group.id", group_id);
// FlinkKafkaConsumer的第一个参数填创建的topic名称
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties));
// 打印输出
stream.print();
env.execute();
}
}
步骤四:编译并运行
通过maven工具将上述代码打包后,上传至指定的服务器,并执行以下命令:
java -jar flink_sdk-1.0-SNAPSHOT.jar
步骤五:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。
(2)页面跳转后,进入左侧边中的集群详情页面。
(3)点击左侧边栏中的集群监控,进入集群监控页面。
(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控