实时消费
更新时间:2025-09-19
概述
日志服务支持通过Kafka协议消费,即可以将一个日志集,当作一个 Kafka Topic 来消费。下面介绍通过 Kafka 协议消费日志数据的相关步骤。
基本概念
概念 | 说明 |
---|---|
消费组(ConsumerGroup) | 消费组是多个消费者组成的虚拟集合。以消费组维度消费日志数据时,消费组中的所有消费者订阅同一个日志集,共同消费一个日志集中的数据。 |
消费者(Consumer) | 一个从日志服务中消费数据的客户端,是消费组的组成部分。同一个消费组中的消费者名称唯一。同一时刻,日志集的一个Shard将会分配给消费组中的某一个消费者,一个消费者可能负责多个 Shard。 |
消费位点(Checkpoint) | 一个 Shard 在被一个消费者消费的过程中,会随时记录当前 Shard 的消费位点(即游标进度)并上报服务端,以此来作为程序重启时的起始消费游标,从而保证数据不会被重复消费。 |
费用
公测期间免费,具体收费时间将通过邮件、短信、站内信等方式通知。
控制台操作
开启实时消费
- 开启实时消费:按日志集维度开启实时消费,在日志集“操作->更多->开启实时消费”处按日志集维度开启实时消费(注:实时消费开启后可能影响日志查询性能,一经开启不可关闭)
实时消费任务
- 如何消费:
支持使用kafka协议进行消费,使用上与消费kafka基本一致。
您需要在使用kafka客户端时配置以下参数
参数 | 示例 | 说明 |
---|---|---|
连接类型 | SASL_SSL | 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务日志组名,密码为账号密钥。 |
username | default | Kafka SASL 用户名。应配置为日志服务的日志组名。 |
password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL 用户密码。应配置为百度云账户密钥。格式为 ${AccessKey}#${SecretKey},其中:${AccessKey} 应替换为您的 AccessKey。${SecretKey} 应替换为您的SecretKey |
host | bls-log.bj.baidubce.com:8201 | 初始连接的集群地址,格式为服务地址:端口号,例如 bls-log.bj.baidubce.com:8201,其中:服务地址为当前地域下日志服务的服务入口。请根据地域选择正确的服务入口,详细信息请参见服务地址。端口号固定为 8201。 |
topic | log-online | 配置为日志集名称 |
go版本消费代码示例如下:
Go
1import (
2 "context"
3 "fmt"
4 "log"
5 "time"
6
7 "github.com/IBM/sarama"
8 "google.golang.org/grpc"
9)
10
11type ConsumerHandler struct {
12}
13
14func (h *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error {
15 for topic, partitions := range s.Claims() {
16 fmt.Printf("Assigned topic: %s Assigned partitions: %v\n", topic, partitions)
17 }
18 log.Println("Consumer setup")
19 return nil
20}
21
22func (h *ConsumerHandler) Cleanup(s sarama.ConsumerGroupSession) error {
23 log.Println("Consumer cleanup")
24 return nil
25}
26
27func (h *ConsumerHandler) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
28 for msg := range claim.Messages() {
29 fmt.Printf("Message: topic=%s partition=%d offset=%d key=%s value=%s\n",
30 msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
31 s.MarkMessage(msg, "") // 手动标记消息为已消费
32 }
33 return nil
34}
35
36func main() {
37 config := sarama.NewConfig()
38 config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
39 config.Metadata.Full = false
40 config.Net.SASL.Mechanism = "PLAIN"
41 config.Net.SASL.Enable = true
42 config.Net.TLS.Enable = true
43 // 填申请的用户名
44 config.Net.SASL.User = "${project}"
45 // 填用户名对应的密码
46 config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
47 // 地址填bls的域名+8201端口,例如bj地域填bls-log.bj.baidubce.com:8201
48 client, _ := sarama.NewConsumerGroup([]string{"bls_endpoint:8201"}, "your_group_id", config)
49 for {
50 if err := client.Consume(context.Background(), []string{"${topic}"}, &ConsumerHandler{
51 }); err != nil {
52 fmt.Println("Error from consumer:", err)
53 }
54 }
55}
注意:使用java kafka sdk时,版本需要为1.0.0-2.3.1版本,例如
XML
1<dependency>
2 <groupId>org.apache.kafka</groupId>
3 <artifactId>kafka-clients</artifactId>
4 <version>2.2.2</version>
5</dependency>
- 消费任务列表:实时消费任务进度在实时消费模块进行查看,左侧显示消费组列表,点击后右侧显示对应消费组消费对应日志集的消费情况。(注:7天消费组没消费,列表自动把消费组删掉)。
- 重置消费位点:可针对整个消费组或一个日志集对应的shard进行重置,重置时请先关闭消费进程后重置。重置支持选择最早位置,最新位置,或指定时间点
实时消费监控
支持查看消费组整体未消费消息条数监控,选择日志集可查看具体某个日志集未消费消息条数监控,选择日志集和Shard后查看具体某个Shard未消费消息条数监控