          Log Service

          Uploading Logs Using Kafka Protocol

          The log service supports uploading log data via the Kafka protocol, which means you can use a Kafka Producer SDK or a Kafka-compatible collection tool to collect log data and push it to the log service over the Kafka protocol. This document describes how to upload logs collected by a collection tool to the log service via the Kafka protocol.

          Background

          Kafka, a high-throughput message middleware, is commonly used as a message pipeline in self-built log collection scenarios: logs can be collected from the log source server by open-source collection tools, or written directly by a Kafka Producer. The log service supports uploading log data via the Kafka protocol. This feature requires no additional activation and no extra data collection tool on the data source side; with a simple configuration, a Kafka Producer can collect and upload log data to the log service.

          Limitations

          • The supported Kafka protocol versions are 2.1.0-2.3.1.
          • Supported compression methods include gzip, snappy, and lz4.
          • To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the project name, and the password is the log service account key.
          • When uploading logs via the Kafka protocol, each log must be valid JSON and must carry its timestamp in the @timestamp field (formatted as 2006-01-02T15:04:05Z07:00, i.e., RFC 3339); an error is returned for invalid JSON. See the example below.
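
          For example, a single log entry that satisfies these requirements could look like the following (all fields other than @timestamp are illustrative):

          {"@timestamp":"2025-09-10T04:41:12.220Z","level":"info","latency":0.008858347,"status":200,"method":"POST"}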

          Parameter description

          When uploading logs via the Kafka protocol, you need to configure the following parameters:

          • Connection type (example: SASL_SSL): To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is the account key.
          • username (example: default): Kafka SASL username. Configure it as the project name of the log service.
          • password (example: ALTAKOGSZ***#ALTAK9sZr**): Kafka SASL user password. Configure it as the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}, where ${AccessKey} is your AccessKey and ${SecretKey} is your SecretKey.
          • host (example: bls-log.bj.baidubce.com:8200): Initial cluster connection address, in the format "service address:port". The service address is the log service entry point in the current region; select the correct entry point for your region (for details, refer to the service address documentation). The port is fixed at 8200.
          • topic (example: log-online): Configure it as the logstore name.
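
          For a client configured through standard Kafka client properties (such as the Java client shown later in this document), these parameters would typically map onto a configuration like the following sketch (the placeholder values are illustrative):

          bootstrap.servers=bls-log.bj.baidubce.com:8200
          security.protocol=SASL_SSL
          sasl.mechanism=PLAIN
          sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${project}" password="${AccessKey}#${SecretKey}";
          # The topic used when producing is the logstore name, e.g., log-online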

          Example

          Uploading logs via Filebeat

          • Configuration example
            The parameter configuration used in the example can be found in the parameter description. A sample command for starting Filebeat with this configuration is shown after the limitations below.
          ## Multiple input sources, writing to different logstores via the fields parameter
          filebeat.inputs:
          - type: log
            fields:
              logstore: "filebeat_1"
            paths:
              - /root/log/*.log
          - type: log
            fields:
              logstore: "filebeat_2"
            paths:
              - /root/log/*.txt

          ## Increasing Filebeat's internal queue can improve the log upload rate (reference documentation: https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
          queue.mem:
            events: 100000
          ## Kafka output configuration
          output.kafka:
            hosts: ["${host}"]
            username: "${project}"
            password: "${AccessKey}#${SecretKey}"
            topic: '%{[fields.logstore]}' # Matches the fields.logstore value set on each input, so different inputs go to different logstores
            required_acks: 1
            sasl.mechanism: PLAIN
            ssl.enabled: true
          • Limitations

            • Filebeat's log output must be in JSON format.
            • To ensure the security of log transmission, ssl.enabled: true and sasl.mechanism: PLAIN must be set.
            • Filebeat's default queue.mem.events value is 3200. It is recommended to set it to 20000-100000 to improve log push performance.
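
          Assuming the configuration example above is saved as filebeat.yml, Filebeat can be started with it in the usual way, for example:

          filebeat -e -c filebeat.yml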

          Uploading logs via the Kafka Go SDK

          • Dependencies
          go get github.com/IBM/sarama
          • Code example
          package main

          import (
          	"fmt"
          	"time"

          	"github.com/IBM/sarama"
          )

          func main() {
          	config := sarama.NewConfig()
          	config.Metadata.Full = false
          	// The log service requires SASL over TLS (SASL_SSL) with the PLAIN mechanism
          	config.Net.SASL.Mechanism = "PLAIN"
          	config.Net.SASL.Enable = true
          	config.Net.TLS.Enable = true
          	// username is the project name
          	config.Net.SASL.User = "${project}"
          	config.Producer.Return.Errors = true
          	// Baidu AI Cloud key
          	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
          	// hosts is the service address. For detailed instructions, refer to the parameter descriptions in this document
          	producer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
          	if err != nil {
          		fmt.Println("new producer error: " + err.Error())
          		panic(err)
          	}
          	// Print any send errors reported asynchronously by the producer
          	go func() {
          		for e := range producer.Errors() {
          			fmt.Println(e)
          		}
          	}()
          	channel := producer.Input()
          	for i := 0; i < 10; i++ {
          		channel <- &sarama.ProducerMessage{
          			// ${logStoreName} is the logstore name, e.g., log-online
          			Topic: "${logStoreName}",
          			Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
          		}
          	}
          	// Give the async producer time to flush before closing
          	time.Sleep(time.Minute)
          	producer.Close()
          }
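
          The Limitations section above notes that gzip, snappy, and lz4 compression are supported. With sarama, compression can optionally be enabled on the producer configuration used in the example above; a minimal sketch (not part of the original sample):

          	// Optional: compress batches before upload; gzip, snappy, and lz4 are accepted by the log service
          	config.Producer.Compression = sarama.CompressionGZIP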

          Uploading logs via the Kafka Java SDK

          • Dependencies
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.3.1</version>
          </dependency>
          • Code example
          package org.wjr.test;

          import org.apache.kafka.clients.CommonClientConfigs;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.Producer;
          import org.apache.kafka.clients.producer.ProducerConfig;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.clients.producer.RecordMetadata;
          import org.apache.kafka.common.config.SaslConfigs;

          import java.util.Properties;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.Future;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.TimeoutException;

          public class Main {
              public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
                  Properties props = new Properties();
                  // hosts is the service address. For detailed instructions, refer to the parameter descriptions in this document
                  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
                  // Project name
                  String username = "${project}";
                  // Baidu AI Cloud key
                  String password = "${AccessKey}#${SecretKey}";
                  props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
                  props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
                  props.put(SaslConfigs.SASL_JAAS_CONFIG,
                          "org.apache.kafka.common.security.plain.PlainLoginModule " +
                                  "required username=\"" + username + "\" password=\"" + password + "\";");
                  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
                  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                  Producer<String, String> producer = new KafkaProducer<String, String>(props);
                  // Call the send method. The topic ("java_test" here) is the logstore name
                  Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
                  RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
                  // Wait for the Kafka Producer to finish sending data asynchronously
                  Thread.sleep(1000000);
                  producer.close();
              }
          }