          Real-Time Consumption

          Overview

          Log Service supports consumption via the Kafka protocol: a logstore can be consumed as if it were a Kafka topic. This document describes how to consume log data via the Kafka protocol.

          Basic concepts

          Concept | Description
          Consumer group (ConsumerGroup) | A consumer group is a virtual collection of multiple consumers. When log data is consumed at the consumer group level, all consumers in the group subscribe to the same logstore and jointly consume its data.
          Consumer | A client that consumes data from the log service and belongs to a consumer group. Each consumer within a consumer group must have a unique name. At any given time, a shard of the logstore is assigned to exactly one consumer in the group, and a single consumer may be responsible for multiple shards.
          Checkpoint | While a consumer consumes a shard, the shard's current checkpoint (i.e., the cursor position reached) is continuously recorded and reported to the server. When the program restarts, consumption resumes from this checkpoint, so data is not consumed again.
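
          The relationship between shards and consumers described above can be sketched as follows. This is a minimal illustration only: the function name assignShards and the round-robin policy are assumptions chosen for the example, not the service's actual assignment logic.

```go
package main

import "fmt"

// assignShards distributes shard IDs across consumers round-robin, so that
// every shard has exactly one owner and a consumer may own several shards.
// It returns nil when there are no consumers.
// Hypothetical helper for illustration; not part of any SDK.
func assignShards(shards []int, consumers []string) map[string][]int {
	if len(consumers) == 0 {
		return nil
	}
	owned := make(map[string][]int, len(consumers))
	for i, shard := range shards {
		c := consumers[i%len(consumers)]
		owned[c] = append(owned[c], shard)
	}
	return owned
}

func main() {
	consumers := []string{"consumer-a", "consumer-b"}
	owned := assignShards([]int{0, 1, 2, 3, 4}, consumers)
	for _, c := range consumers {
		fmt.Printf("%s -> shards %v\n", c, owned[c])
	}
}
```

          With five shards and two consumers, consumer-a owns shards 0, 2, and 4 and consumer-b owns shards 1 and 3. No shard has two owners at the same time.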

          Fee

          Real-time consumption is free during the public beta period. The date on which billing begins will be announced via email, SMS, and in-site messages.

          Console operations

          Enable real-time consumption

          • Enable real-time consumption: Real-time consumption is enabled per logstore. For the target logstore, go to Operation - More - Enable Real-time Consumption. (Note: Enabling real-time consumption may affect log query performance, and it cannot be disabled once enabled.)

          Real-time consumption task

          • How to consume:

          Consumption uses the Kafka protocol and works much like consuming from Kafka itself.

          Configure the following parameters in the Kafka client.

          Parameter | Example | Description
          Connection type | SASL_SSL | To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is the account key.
          username | default | Kafka SASL username. Set it to the project name of the log service.
          password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL user password. Set it to the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}, where ${AccessKey} is replaced with your AccessKey and ${SecretKey} with your SecretKey.
          host | bls-log.bj.baidubce.com:8201 | Initial cluster connection address, in the format service address:port number, e.g., bls-log.bj.baidubce.com:8201. The service address is the log service entry for the current region; select the correct service entry for your region. For details, refer to the service address documentation. The port number is fixed at 8201.
          topic | log-online | Set it to the logstore name.
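
          As a small sketch of how the password and host values in the table are assembled, the helpers below build them from their parts. The names saslPassword and brokerAddress are hypothetical and exist only for this example; the key values used in main are placeholders.

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaPort is the fixed port number stated in the parameter table.
const kafkaPort = 8201

// saslPassword joins the two halves of the account key in the documented
// ${AccessKey}#${SecretKey} format. Hypothetical helper for illustration.
func saslPassword(accessKey, secretKey string) (string, error) {
	if accessKey == "" || secretKey == "" {
		return "", errors.New("access key and secret key must both be set")
	}
	return accessKey + "#" + secretKey, nil
}

// brokerAddress appends the fixed port to a regional service address.
// Hypothetical helper for illustration.
func brokerAddress(serviceAddress string) string {
	return fmt.Sprintf("%s:%d", serviceAddress, kafkaPort)
}

func main() {
	// Placeholder credentials; substitute your own AccessKey and SecretKey.
	password, err := saslPassword("myAccessKey", "mySecretKey")
	if err != nil {
		fmt.Println("invalid credentials:", err)
		return
	}
	fmt.Println("password:", password)
	fmt.Println("host:", brokerAddress("bls-log.bj.baidubce.com"))
}
```

          Validating that neither key is empty before connecting gives a clearer error than an authentication failure from the server.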

          An example of consumption code in Go:

          package main

          import (
          	"context"
          	"fmt"
          	"log"

          	"github.com/IBM/sarama"
          )

          // ConsumerHandler implements sarama.ConsumerGroupHandler.
          type ConsumerHandler struct{}

          // Setup runs at the start of a session, after partitions (shards) have been assigned.
          func (h *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error {
          	for topic, partitions := range s.Claims() {
          		fmt.Printf("Assigned topic: %s Assigned partitions: %v\n", topic, partitions)
          	}
          	log.Println("Consumer setup")
          	return nil
          }

          // Cleanup runs at the end of a session, once all ConsumeClaim calls have returned.
          func (h *ConsumerHandler) Cleanup(s sarama.ConsumerGroupSession) error {
          	log.Println("Consumer cleanup")
          	return nil
          }

          // ConsumeClaim processes the messages of one partition until its message channel closes.
          func (h *ConsumerHandler) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
          	for msg := range claim.Messages() {
          		fmt.Printf("Message: topic=%s partition=%d offset=%d key=%s value=%s\n",
          			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
          		s.MarkMessage(msg, "") // Manually mark the message as consumed
          	}
          	return nil
          }

          func main() {
          	config := sarama.NewConfig()
          	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
          	config.Metadata.Full = false
          	// SASL_SSL: SASL/PLAIN authentication over TLS
          	config.Net.SASL.Mechanism = "PLAIN"
          	config.Net.SASL.Enable = true
          	config.Net.TLS.Enable = true
          	// SASL username: the log service project name
          	config.Net.SASL.User = "${project}"
          	// SASL password: the account key in the format ${AccessKey}#${SecretKey}
          	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
          	// Address: the bls domain name plus port 8201, e.g., bls-log.bj.baidubce.com:8201 for the bj region
          	client, err := sarama.NewConsumerGroup([]string{"bls_endpoint:8201"}, "your_group_id", config)
          	if err != nil {
          		log.Fatalf("Failed to create consumer group: %v", err)
          	}
          	for {
          		// Consume returns whenever a rebalance ends the session, so it is called in a loop
          		if err := client.Consume(context.Background(), []string{"${topic}"}, &ConsumerHandler{}); err != nil {
          			fmt.Println("Error from consumer:", err)
          		}
          	}
          }

          Note: When using the Java Kafka SDK, the kafka-clients version must be in the range 1.0.0 to 2.3.1, for example:

          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.2.2</version>
          </dependency>
          • Consumption task list: The progress of real-time consumption tasks can be viewed in the real-time consumption module. The left side lists the consumer groups; clicking a consumer group shows, on the right, the consumption status of each logstore that the group consumes. (Note: Consumer groups that have been inactive for 7 days are automatically removed from the list.)
          • Reset checkpoint: You can reset the checkpoint for all shards of a consumer group or for a specific shard. Stop the consumer process before resetting. The checkpoint can be reset to the earliest position, the latest position, or a specified point in time.
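
          The three reset options can be pictured with the sketch below. It is illustrative only and rests on assumptions: the record type and resolveCheckpoint function are hypothetical, and the time-based rule (resume at the first record whose timestamp is at or after the chosen time) follows common Kafka behavior, which the service may or may not match exactly.

```go
package main

import (
	"fmt"
	"sort"
)

// record is a hypothetical stand-in for one log entry in a shard.
type record struct {
	Offset    int64
	Timestamp int64 // Unix seconds
}

// resetMode enumerates the three reset options offered by the console.
type resetMode int

const (
	resetEarliest resetMode = iota
	resetLatest
	resetToTime
)

// resolveCheckpoint returns the offset consumption would resume from after a
// reset. records must be ordered by offset with non-decreasing timestamps.
// Assumed semantics: "latest" is the offset after the last record, and a time
// reset picks the first record at or after the given time, falling back to
// "latest" when no such record exists. An empty shard resolves to 0.
func resolveCheckpoint(records []record, mode resetMode, at int64) int64 {
	if len(records) == 0 {
		return 0
	}
	next := records[len(records)-1].Offset + 1
	switch mode {
	case resetEarliest:
		return records[0].Offset
	case resetLatest:
		return next
	default:
		i := sort.Search(len(records), func(i int) bool {
			return records[i].Timestamp >= at
		})
		if i == len(records) {
			return next
		}
		return records[i].Offset
	}
}

func main() {
	shard := []record{{100, 1000}, {101, 1010}, {102, 1020}}
	fmt.Println("earliest:", resolveCheckpoint(shard, resetEarliest, 0))
	fmt.Println("latest:", resolveCheckpoint(shard, resetLatest, 0))
	fmt.Println("at t=1005:", resolveCheckpoint(shard, resetToTime, 1005))
}
```

          For this sample shard, the earliest position resolves to offset 100, the latest to 103, and a reset to time 1005 to offset 101.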

          Real-time consumption monitor

          The monitor shows the total number of unconsumed messages for a consumer group. Select a logstore to view the number of unconsumed messages in that logstore, or select both a logstore and a shard to view the number for that shard.
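
          One way to think about the three monitor views is as the same backlog figure summed at different levels. The sketch below assumes that the unconsumed count of a shard is the distance between its latest offset and the consumer group's checkpoint; the shardProgress type and unconsumedTotal function are hypothetical names for this example, and the service may compute the metric differently.

```go
package main

import "fmt"

// shardProgress is a hypothetical snapshot of one shard for one consumer group.
type shardProgress struct {
	Logstore   string
	Shard      int
	Latest     int64 // offset the next written message will receive
	Checkpoint int64 // next offset the consumer group will read
}

// unconsumed returns the shard's backlog, never less than zero.
func (p shardProgress) unconsumed() int64 {
	if p.Checkpoint >= p.Latest {
		return 0
	}
	return p.Latest - p.Checkpoint
}

// unconsumedTotal sums the backlog over the selected shards. An empty
// logstore selects every logstore; a negative shard selects every shard.
func unconsumedTotal(all []shardProgress, logstore string, shard int) int64 {
	var total int64
	for _, p := range all {
		if logstore != "" && p.Logstore != logstore {
			continue
		}
		if shard >= 0 && p.Shard != shard {
			continue
		}
		total += p.unconsumed()
	}
	return total
}

func main() {
	progress := []shardProgress{
		{"log-online", 0, 120, 100},
		{"log-online", 1, 50, 50},
		{"log-audit", 0, 30, 10},
	}
	fmt.Println("consumer group:", unconsumedTotal(progress, "", -1))
	fmt.Println("logstore log-online:", unconsumedTotal(progress, "log-online", -1))
	fmt.Println("log-online shard 1:", unconsumedTotal(progress, "log-online", 1))
}
```

          For the sample data, the consumer group has 40 unconsumed messages in total, the logstore log-online accounts for 20 of them, and shard 1 of log-online has none.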
