共享订阅
更新时间:2024-02-26
MQTT 基于 PUB/SUB 消息模式,同一消息可以分发到多个订阅了此消息topic的客户端。如果单个客户端处理大量消息出现瓶颈时,可以采用共享 订阅的方式实现负载均衡。
共享订阅属于MQTT 5.0中的特性,IOT Core 在3.x版本中提前支持了此功能。 共享订阅模式的TopicFilter格式:
$share/{ShareName}/{filter}
其中:
$share
为共享订阅模式的固定前缀(所有非共享订阅不可以此作为topic的前缀){ShareName}
为共享组,所有拥有相同{ShareName}
的属于同一个共享组{ShareName}
最少一个字符,最大40个字符,且不可包含/
、+
以及#
filter
为实际的 topic,格式限制跟非共享订阅的 topic 一致
多个客户端可以使用共享订阅的方式订阅同一个topic,此topic的消息将以共享组的方式分发消息。以单条消息为粒度,每条消息只会被发送同一 共享组内的一个客户端。
如下图所示:
上图中,subscriber1使用非共享订阅,将会收到所有的消息。subscriber2 和 subscriber3 使用了共享订阅,且属于同一个共享组 'group1',则一 条消息只会发送给其中一个客户端,每次随机选择一个客户端发送。
同样,subscriber4 和 subscriber5 使用共享订阅且属于同一个共享组 'group2',每条消息将随机发送给其中一个客户端。 非共享订阅以及共享订阅的不同组之间则互不影响。
Paho客户端代码示例:
MqttClient sampleClient1 = ...
// topic
sampleClient1.subscribe($share/group1/testTopic, 0);
// Callback
sampleClient1.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
}
}
// topic 'testTopic' '$share/group1/testTopic'
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
参考 MQTT 协议原文
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Shared_Subscriptions