Uploading Logs Using Kafka Protocol
The log service supports uploading log data over the Kafka protocol, which means you can use a Kafka Producer SDK or a Kafka-compatible collection tool to collect log data and upload it to the log service. This topic describes how to upload logs to the log service over the Kafka protocol after collecting them with a collection tool.
Background
As high-throughput message middleware, Kafka is commonly used as a message pipeline in self-built log collection scenarios: logs are gathered from the source servers by open-source collection tools, or log data is written directly by a Producer. The log service supports uploading log data via the Kafka protocol. This capability requires neither enabling an additional feature nor installing a dedicated data collection agent on the data source side; with a few configuration items, a Kafka Producer can collect log data and upload it to the log service.
Limitations
- The supported Kafka protocol versions are 2.1.0-2.3.1.
- Supported compression methods include gzip, snappy, and lz4.
- To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The username is the log service project name, and the password is the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}.
- When uploading logs via the Kafka protocol, each log entry must be valid JSON and must contain an @timestamp field formatted as 2006-01-02T15:04:05Z07:00. An error is returned for invalid JSON.
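For reference, the payload used in the SDK examples later in this topic is a log entry that meets these requirements: it is valid JSON and carries the @timestamp field in the required format, while the remaining fields are arbitrary example values.

```json
{
  "@timestamp": "2025-09-10T04:41:12.220Z",
  "level": "info",
  "latency": 0.008858347,
  "status": 200,
  "method": "POST"
}
```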
Parameter description
When uploading logs via the Kafka protocol, you need to configure the following parameters:
| Parameter | Example | Description |
|---|---|---|
| Connection type | SASL_SSL | To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is the account key. |
| username | default | Kafka SASL username. Set it to the log service project name. |
| password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL password. Set it to the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}, where ${AccessKey} is your AccessKey and ${SecretKey} is your SecretKey. |
| host | bls-log.bj.baidubce.com:8200 | Initial cluster connection address, in the format service address:port, e.g., bls-log.bj.baidubce.com:8200. The service address is the log service endpoint for the current region; select the correct endpoint for your region (for details, refer to the service address documentation). The port is fixed at 8200. |
| topic | log-online | Set it to the logstore name. |
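For a quick sanity check of these parameters before setting up a collection tool or an SDK, you can send a single test log from the command line. The following is only an illustrative sketch: it assumes the kcat (kafkacat) client is installed, uses the example host and logstore from the table above, and requires replacing the ${project}, ${AccessKey}, and ${SecretKey} placeholders with your own values.

```bash
# Replace ${project}, ${AccessKey}, and ${SecretKey} with your own values first.
# Produces one JSON log line to the logstore "log-online" over SASL_SSL.
echo '{"@timestamp":"2025-09-10T04:41:12.220Z","level":"info","status":200,"method":"POST"}' | \
  kcat -P \
    -b bls-log.bj.baidubce.com:8200 \
    -t log-online \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X "sasl.username=${project}" \
    -X "sasl.password=${AccessKey}#${SecretKey}"
```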
Example
Uploading logs via Filebeat
- Configuration example
The parameter configuration used in the example can be found in the parameter description.
```yaml
## Multiple input sources, writing to different logstores via the fields parameter
filebeat.inputs:
  - type: log
    fields:
      logstore: "filebeat_1"
    paths:
      - /root/log/*.log
  - type: log
    fields:
      logstore: "filebeat_2"
    paths:
      - /root/log/*.txt

## Increasing Filebeat's internal queue can improve the log upload rate
## (reference: https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
queue.mem:
  events: 100000

## Kafka output configuration
output.kafka:
  hosts: ["${host}"]
  username: "${project}"
  password: "${AccessKey}#${SecretKey}"
  topic: '%{[fields.logstore]}'   # Linked with the input fields to write to different logstores
  required_acks: 1
  sasl.mechanism: PLAIN
  ssl.enabled: true
```
- Limitations
- Filebeat's log output must use the JSON format.
- To ensure the security of log transmission, ssl.enabled: true and sasl.mechanism: PLAIN must be set.
- Filebeat's default queue.mem.events value is 3200. Setting it to 20000-100000 is recommended to improve log push performance.
Uploading logs via Kafka Go SDK
- Dependencies
```bash
go get github.com/IBM/sarama
```
- Code example
```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Metadata.Full = false
	config.Net.SASL.Mechanism = "PLAIN"
	config.Net.SASL.Enable = true
	config.Net.TLS.Enable = true
	// username is the project name
	config.Net.SASL.User = "${project}"
	config.Producer.Return.Errors = true
	// password is the Baidu AI Cloud account key
	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
	// hosts is the service address. For detailed instructions, refer to the parameter descriptions in this document
	producer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
	if err != nil {
		fmt.Println("new producer error:" + err.Error())
		panic(err)
	}
	// Print any asynchronous send errors
	go func() {
		for e := range producer.Errors() {
			fmt.Println(e)
		}
	}()
	channel := producer.Input()
	for i := 0; i < 10; i++ {
		channel <- &sarama.ProducerMessage{
			// ${logStoreName} is the logstore name, e.g., log-online
			Topic: "${logStoreName}",
			Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
		}
	}
	// Wait for asynchronous sends to complete before closing the producer
	time.Sleep(time.Minute)
	producer.Close()
}
```

Uploading logs via Kafka Java SDK
- Dependencies
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
```
- Code example
```java
package org.wjr.test;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Properties props = new Properties();
        // hosts is the service address. For detailed instructions, refer to the parameter descriptions in this document
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
        // Project name
        String username = "${project}";
        // Baidu AI Cloud account key
        String password = "${AccessKey}#${SecretKey}";
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"" + username + "\" password=\"" + password + "\";");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        // Call the send method; the topic is the logstore name, e.g., java_test
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
        RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
        // Wait for the Kafka Producer to flush data sent asynchronously
        Thread.sleep(1000000);
        producer.close();
    }
}
```