Flink接入Kafka专享版
所有文档
menu

消息服务 for Kafka

Flink接入Kafka专享版

产品详情立即开通

接入准备

1. 购买专享版消息服务for Kafka集群

开通消息服务 for Kafka服务后,在控制台页面点击创建集群,即可进行购买,详情可参考创建集群

Flink接入1.png

2. 为购买的集群创建主题

在控制台页面点击集群名称,进入集群详情页面。

在左侧的边栏中点击主题管理,进入主题管理页面。

Flink接入2.png

在主题管理页面点击创建主题,进行主题的创建,详情参考创建主题

接入步骤

步骤一:获取集群接入点

具体请参考:查看集群接入点

步骤二:添加Maven配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_sdk</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.KafkaConsumerDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

步骤三:编写测试代码

  • 需要关注并自行修改的参数
参数名 含义
access_point 接入点信息
topic 主题名称
value 消息的具体内容
group_id 消费组id

生产者示例代码

创建KafkaProducerDemo.java文件,具体代码示例如下:

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String args[]) throws Exception {

        //接入点设置
        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
        //填写需要发送消息的主题名称
        String topic = "topic_name";
        //填写需要发送消息
        String value = "test flink message";
        // 创建配置类
        Properties props = new Properties();;
        // 指定kafka服务端所在位置
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, access_point);
        // Kafka消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // 设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // 设置客户端内部重试间隔。
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        // 构造 Producer 对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        try {
            // 测试发送 100 条消息
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<>(topic, value + ": " + i);
                kafkaProducer.send(kafkaMessage, (RecordMetadata recordMetadata, Exception e) -> {
                    if (e == null) {
                        System.out.println("send success:" + recordMetadata.toString());
                    } else {
                        e.printStackTrace();
                        System.err.println("send fail");
                    }
                });
            }
        } catch (Exception e) {
            System.out.println("Something error has happened");
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }
}

消费者实例

创建KafkaConsumerDemo.java文件,具体代码示例如下

package org.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;


public class KafkaConsumerDemo {
    public static void main(String args[]) throws Exception {

        //接入点设置
        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
        String group_id = "flink_group";
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建配置类
        Properties properties = new Properties();
        // 设置接入点
        properties.setProperty("bootstrap.servers", access_point);
        // 设置消费组id
        properties.setProperty("group.id", group_id);
        // FlinkKafkaConsumer的第一个参数填创建的topic名称
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties));
        // 打印输出
        stream.print();
        env.execute();
    }
}

步骤四:编译并运行

通过maven工具将上述代码打包后,上传至指定的服务器,并执行以下命令:

java -jar flink_sdk-1.0-SNAPSHOT.jar

步骤五:查看集群监控

查看消息是否发送成功或消费成功有两种方式:

  1. 在服务器端查看jar包运行日志。
  2. 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。

推荐使用第二种方式,下面介绍如何查看集群监控。

(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。

监控信息1.png

(2)页面跳转后,进入左侧边中的集群详情页面。

监控信息2.png

(3)点击左侧边栏中的集群监控,进入集群监控页面。

监控信息3.png

(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。

集群监控的具体使用请参考:集群监控

监控信息4.png

上一篇
Filebeat接入Kafka专享版
下一篇
Logstash接入Kafka专享版