Logstash接入Kafka专享版
更新时间:2024-01-03
前提条件
- 已创建消息服务 for kafka集群
- 下载并安装Logstash。具体操作,请参见Download Logstash。
- 下载并安装JDK 8。具体操作,请参见Download JDK 8。
Logstash接入
步骤一:获取接入点
具体请参考查看集群接入点
步骤二:创建/获取主题
步骤三:下载证书文件
SSL/SASL方式下载证书参考:如何下载证书?。
Logstash 发送消息
准备配置文件
创建jaas.conf配置文件
输入以下内容。
C++
1KafkaClient {
2 org.apache.kafka.common.security.plain.PlainLoginModule required
3 # 用户管理中创建用户的用户名
4 username="XXX"
5 # 用户管理中创建用户的密码
6 password="XXX";
7};
创建 output.conf 配置文件
输入以下内容。
C++
1input {
2 stdin{}
3}
4
5output {
6 kafka {
7 bootstrap_servers => "接入点"
8 topic_id => "topic名称"
9 # 固定为 SSL
10 security_protocol => "SSL"
11 # jaas.conf 文件地址
12 jaas_path => "/home/kafka/logstash-7.10.2/jaas.conf"
13 # 配置为 client.truststore.jks 文件路径
14 ssl.truststore.location => "/home/kafka/logstash-7.10.2/client.truststore.jks"
15 # 固定为 bms@kafka
16 ssl.truststore.password => "bms@kafka"
17 # 配置为 kafka_client_jaas.conf 文件路径
18 ssl.keystore.location => "/home/kafka/logstash-7.10.2/client.keystore.jks"
19 # 设置为client_ssl.properties文件内ssl.keystore.password
20 ssl.keystore.password => "qKiHmaov8GNR"
21 # SSL接入点辨识算法。6.x及以上版本Logstash需要加上该参数。空值
22 ssl_endpoint_identification_algorithm => ""
23 }
24}
Logstash发送消息
在安装目录下bin执行以下命令
Plain
1./logstash -f output.conf
Logstash 消费消息
准备配置文件
创建jaas.conf配置文件
输入以下内容。
C++
1KafkaClient {
2 org.apache.kafka.common.security.plain.PlainLoginModule required
3 username="XXX"
4 password="XXX";
5};
创建 input.conf 配置文件
输入以下内容。
C++
1input {
2 kafka {
3 bootstrap_servers => "接入点"
4 # topic名称
5 topics => ["logstash_test"]
6 # 固定为 SSL
7 security_protocol => "SSL"
8 # jaas.conf 文件地址
9 jaas_path => "/home/kafka/logstash-7.10.2/jaas.conf"
10 # 配置为 client.truststore.jks 文件路径
11 ssl.truststore.location => "/home/kafka/logstash-7.10.2/client.truststore.jks"
12 # 固定为 bms@kafka
13 ssl.truststore.password => "bms@kafka"
14 # 配置为 kafka_client_jaas.conf 文件路径
15 ssl.keystore.location => "/home/kafka/logstash-7.10.2/client.keystore.jks"
16 # 设置为client_ssl.properties文件内ssl.keystore.password
17 ssl.keystore.password => "qKiHmaov8GNR"
18
19 group_id => "logstash_group"
20 consumer_threads => 3
21 auto_offset_reset => "earliest"
22 }
23}
24
25output {
26 stdout {
27 codec => rubydebug
28 }
29}
Logstash消费消息
C++
1./logstash -f input.conf
查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。
(2)页面跳转后,进入左侧边中的集群详情页面。
(3)点击左侧边栏中的集群监控,进入集群监控页面。
(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控