使用Kafka协议上传日志
更新时间:2025-09-24
日志服务支持通过Kafka协议上传日志数据到服务端,即可以使用Kafka Producer SDK来采集日志数据,并通过Kafka协议上传到日志服务。文本介绍通过采集工具采集到日志后,使用Kafka协议将日志上传到日志服务的操作步骤
背景信息
Kafka 作为高吞吐量的消息中间件,常用于自建日志采集场景中的消息管道。例如,可在日志源服务器通过开源采集工具采集日志,或由 Producer 直接写入日志数据。日志服务支持通过 Kafka 协议上传日志数据。 使用 Kafka 协议上传日志功能,无需额外开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。
限制说明
- 支持的Kafka协议版本为2.1.0-2.3.1。
- 支持压缩方式包括 gzip、snappy、lz4。
- 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志组名称,密码为日志服务账号密钥。
- 通过Kafka协议上传日志时,要求日志格式必须为合法的JSON格式,且日志时间戳应包含
@timestamp
字段(格式为2006-01-02T15:04:05Z07:00);对于不合法的JSON格式,会返回错误。
参数说明
使用 Kafka 协议上传日志时,您需要配置以下参数。
参数 | 示例 | 说明 |
---|---|---|
连接类型 | SASL_SSL | 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务日志组名,密码为账号密钥。 |
username | default | Kafka SASL 用户名。应配置为日志服务的日志组名。 |
password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL 用户密码。应配置为百度云账户密钥。格式为 ${AccessKey}#${SecretKey},其中:${AccessKey} 应替换为您的 AccessKey。${SecretKey} 应替换为您的SecretKey |
host | bls-log.bj.baidubce.com:8200 | 初始连接的集群地址,格式为服务地址:端口号,例如 bls-log.bj.baidubce.com:8200,其中:服务地址为当前地域下日志服务的服务入口。请根据地域选择正确的服务入口,详细信息请参见服务地址。端口号固定为 8200。 |
topic | log-online | 配置为日志集名称 |
示例
通过Filebeat上传日志
- 配置示例
示例中用的的参数配置请参见参数说明
YAML
1## 多输入源,通过fields参数实现写不同的日志集
2- type: log
3 fields:
4 logstore : "filebeat_1"
5 paths:
6 - /root/log/*.log
7- type: log
8 fields:
9 logstore : "filebeat_2"
10 paths:
11 - /root/log/*.txt
12
13## 增大filebeat内部队列,可提高日志上传速率 (参考文档:https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
14queue.mem:
15 events: 100000
16
17## kafka输出配置
18output.kafka:
19 hosts: "[${host}]"
20 username: "${project}"
21 password: "${AccessKey}#${SecretKey}"
22 topic : '%{[fields.logstore]}' #联动Input参数实现输出到多个不同的日志集
23 required_acks: 1
24 sasl.mechanism: PLAIN
25 ssl.enabled: true
-
限制说明
- filebeat的日志输出必须使用JSON格式
- 为保证日志传输的安全性,必须设置sasl.enabled:true和sasl.mechanism:PLAIN
- filebeat默认配置的queue.mem.events为3200,推荐设置20000-100000,以提高日志推送性能
通过Kafka Go SDK上传日志
- 依赖
Palin
1go get github.com/IBM/sarama
- 代码示例
Go
1package main
2
3import (
4 "crypto/tls"
5 "fmt"
6 "log"
7 "time"
8
9 "github.com/IBM/sarama"
10)
11
12func main() {
13 config := sarama.NewConfig()
14 config.Metadata.Full = false
15 config.Net.SASL.Mechanism = "PLAIN"
16 config.Net.SASL.Enable = true
17 config.Net.TLS.Enable = true
18 // username为日志组名称
19 config.Net.SASL.User = "${project}"
20 config.Producer.Return.Errors = true
21 // 百度云的密钥
22 config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
23 // hosts为服务地址。具体说明,请参考本文中的参数说明
24 producer, err := sarama.NewAsyncProducer([]string{"{$hosts}"}, config)
25 if err != nil {
26 fmt.Println("new producer error:" + err.Error())
27 panic(err)
28 }
29 go func() {
30 for e := range producer.Errors() {
31 fmt.Println(e)
32 }
33 }()
34 channel := producer.Input()
35 for i := 0; i < 10; i++ {
36 channel <- &sarama.ProducerMessage{
37 // ${logStoreName}为日志集名称,例如log-online
38 Topic: "${logStoreName}",
39 Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
40 }
41 }
42 time.Sleep(time.Minute)
43 producer.Close()
44}
通过Kafka Java SDK上传日志
- 依赖
XML
1<dependency>
2 <groupId>org.apache.kafka</groupId>
3 <artifactId>kafka-clients</artifactId>
4 <version>2.3.1</version>
5</dependency>
- 代码示例
Java
1package org.wjr.test;
2
3
4import org.apache.kafka.clients.CommonClientConfigs;
5import org.apache.kafka.clients.consumer.ConsumerConfig;
6import org.apache.kafka.clients.producer.Producer;
7import org.apache.kafka.clients.producer.ProducerConfig;
8import org.apache.kafka.clients.producer.ProducerRecord;
9import org.apache.kafka.clients.producer.RecordMetadata;
10import org.apache.kafka.common.config.SaslConfigs;
11
12import java.util.Properties;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.Future;
15import java.util.concurrent.TimeUnit;
16import java.util.concurrent.TimeoutException;
17
18public class Main {
19 public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
20 Properties props = new Properties();
21 // hosts为服务地址。具体说明,请参考本文中的参数说明
22 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
23 // 日志组名称
24 String username = "${project}"
25 // 百度云密钥
26 String password = "${AccessKey}#${SecretKey}"
27 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
28 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
29 props.put(SaslConfigs.SASL_JAAS_CONFIG,
30 "org.apache.kafka.common.security.plain.PlainLoginModule " +
31 "required username=\"" + username + "\" password=\"" + password + "\";");
32 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
33 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
34 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
35 Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
36 // 调用 send 方法。
37 Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
38 RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
39 // 等待Kafka Producer异步发送数据
40 Thread.sleep(1000000);
41 producer.close();
42 }
43}