Filebeat接入Kafka专享版
更新时间:2024-01-03
Filebeat
Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。
Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。
工作流程图如下:
前提条件
- 下载并安装Filebeat(参见Download Filebeat)
- 下载并安装1.8或以上版本 JDK
- 已创建消息服务 for kafka集群
Filebeat接入
步骤一:获取接入点
具体请参考查看集群接入点
SSL/SASL方式下载证书参考:如何下载证书?。
步骤二:创建/获取主题
步骤三:Filebeat发送消息
准备配置文件
进入 Filebeat 的安装目录,创建配置监控文件 filebeat.yml,参数详情可参考Filebeat官网。
filebeat.inputs:
- input_type: log
# 此处为监听文件路径
paths:
- /var/log/*.log
#------------------------------- Kafka output ----------------------------------
output.kafka:
# 是否启用
enabled: true
# The list of Kafka broker addresses from which to fetch the cluster metadata.
# The cluster metadata contain the actual Kafka brokers events are published
# to
# 获取集群元数据的 Kafka 代理地址列表(接入点)
hosts: ["x.x.x.x:9095","x.x.x.x:9095","x.x.x.x:9095"]
# 用于生成事件的 Kafka 主题, 可以使用任何事件 field 的格式字符串, 若要从文档类型设置主题,请使用 `%{[type]}`.
topic: test
# 设置sasl协议字段
sasl.mechanism: "SCRAM-SHA-512"
# 设置 SSL协议字段
#SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
#ssl.certificate_authorities:[/***/client_ssl.properties]
partition.random:
# If enabled, events will only be published to partitions with reachable leaders. Default is false.
# reachable_only 设置为true,则事件将仅发布到可用的分区
# 必须是 random, round_robin, hash 三种的一种
# 默认为 false
reachable_only: false
# Authentication details. Password is required if username is set.
# 身份验证的细节。如果设置了用户名,则需要密码。
# 没有设置可保持空值
username: ''
password: ''
# Sets the output compression codec. Must be one of none, snappy and gzip. The default is gzip.
# 设置输出压缩编解码器
# 必须是 none, snappy 和 gzip 中的一个
# 默认是 gzip
compression: none
# Set the compression level. Currently only gzip provides a compression level between 0 and 9. The default value is chosen by the compression algorithm.
# 设置压缩级别
# 目前只有 gzip 提供 0 到 9 之间的压缩级别
# 默认值由压缩算法选择
compression_level: 4
# The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). This value should be equal to r less than the broker's message.max.bytes.
# json编码消息的最大允许大小, 更大的消息将被删除。默认值是 1000000 ( 字节 ) 。这个值应该等于 r,小于 broker 的 message.max.bytes
max_message_bytes: 1000000
# The ACK reliability level required from broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1. Note: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error.
# 代理要求的ACK可靠性级别
# 0=无响应,1=等待本地提交,-1=等待所有副本提交
# 默认值是1
# 注意:如果设置为0,Kafka不会返回任何ack。出错时,消息可能会悄无声息地丢失。
required_acks: 1
# The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats".
# 可配置的ClientID,用于日志记录、调试和审计。默认是“beats”。
client_id: beats
Filebeat 发送消息
在filebeat安装路径执行如下命令
./filebeat -e -c filebeat.yml
步骤四:Filebeat消费消息
准备配置文件
filebeat.inputs:
- type: kafka
hosts:
- ip:端口
- ip:端口
- ip:端口
# 身份验证的细节。如果设置了用户名,则需要密码。
# 没有设置可保持空值
username: ""
password: ""
#Topic的名称。
topics: ["filebeat_test"]
#Group的名称。
group_id: "filebeat_group"
#证书所在位置
#SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
#ssl.certificate_authorities:[/***/client_ssl.properties]
ssl.verification_mode: none
output.console:
pretty: true
Filebeat消费消息
在filebeat安装路径执行如下命令
./filebeat -c ./input.yml
步骤五:查看集群监控
查看消息是否发送成功或消费成功有两种方式:
- 在服务器端查看jar包运行日志。
- 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。
推荐使用第二种方式,下面介绍如何查看集群监控。
(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。
(2)页面跳转后,进入左侧边中的集群详情页面。
(3)点击左侧边栏中的集群监控,进入集群监控页面。
(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。
集群监控的具体使用请参考:集群监控