简介:本文详细介绍如何在单台服务器上通过多进程模拟RocketMQ集群部署,涵盖核心组件配置、环境准备、Broker与NameServer协作机制及故障模拟测试,适合开发测试环境和小型项目使用。
在开发测试阶段或资源受限的小型项目中,单机部署RocketMQ集群能以极低的硬件成本实现消息队列的核心功能。相比传统多节点部署方案,该方案通过多进程模拟集群环境,既能验证消息队列的分布式特性,又能显著降低运维复杂度。典型应用场景包括:
该方案的关键优势在于:
| 组件 | 版本要求 | 配置建议 |
|---|---|---|
| JDK | 1.8+ | 推荐OpenJDK 11 |
| 操作系统 | Linux/MacOS | 内存≥8GB,磁盘≥50GB |
| RocketMQ | 4.9.4+ | 包含Namesrv和Broker模块 |
安装步骤示例(CentOS 7):
# 安装OpenJDK 11sudo yum install java-11-openjdk-devel# 下载RocketMQwget https://dist.apache.rocketmq.com/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zipunzip rocketmq-all-4.9.4-bin-release.zipcd rocketmq-all-4.9.4-bin-release
修改conf/namesrv.properties:
# 监听端口配置listenPort=9876# JVM参数优化JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
创建conf/2m-2s-async/broker-a.properties:
# 集群名称brokerClusterName=DefaultCluster# Broker名称brokerName=broker-a# Broker角色brokerId=0# 存储路径storePathRootDir=/tmp/rocketmq/store-a# 监听端口listenPort=10911# 集群配置namesrvAddr=127.0.0.1:9876# JVM参数JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
创建conf/2m-2s-async/broker-b.properties:
brokerClusterName=DefaultClusterbrokerName=broker-abrokerId=1storePathRootDir=/tmp/rocketmq/store-blistenPort=10921namesrvAddr=127.0.0.1:9876JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
# 启动NameServernohup sh bin/mqnamesrv &# 启动Master Brokernohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-a.properties &# 启动Slave Brokernohup sh bin/mqbroker -n localhost:9876 -c conf/2m-2s-async/broker-b.properties &
使用RocketMQ Admin工具验证集群状态:
public class ClusterStatusChecker {public static void main(String[] args) throws Exception {DefaultMQAdminExt admin = new DefaultMQAdminExt();admin.setNamesrvAddr("localhost:9876");admin.start();ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();System.out.println("Cluster Info: " + clusterInfo);TopicList topicList = admin.fetchAllTopicList();System.out.println("Topics: " + topicList.getTopicList());admin.shutdown();}}
预期输出应包含:
public class SimpleProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TestTopic",("Hello RocketMQ " + i).getBytes());SendResult sendResult = producer.send(msg);System.out.printf("Send Result: %s%n", sendResult);}producer.shutdown();}}
public class SimpleConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TestTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("Receive Msg: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
现象:启动时报Address already in use错误
解决方案:
netstat -tulnp | grep 9876listenPort参数namesrvAddr正确现象:Broker启动失败,日志显示Permission denied
解决方案:
# 创建存储目录mkdir -p /tmp/rocketmq/store-{a,b}# 修改权限chown -R $(whoami) /tmp/rocketmq
现象:Broker频繁崩溃,日志显示OutOfMemoryError
优化建议:
JAVA_OPT参数:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
# 使用jstat监控GCjstat -gcutil <pid> 1000 5
| 参数 | 推荐值 | 作用说明 |
|---|---|---|
sendMessageThreadPoolNums |
16 | 发送消息线程数 |
pullMessageThreadPoolNums |
32 | 拉取消息线程数 |
diskMaxUsedSpaceRatio |
85 | 磁盘使用率阈值 |
推荐监控方案:
# prometheus.yml配置示例scrape_configs:- job_name: 'rocketmq'static_configs:- targets: ['localhost:10911']
# 启动时添加JMX参数JAVA_OPT="${JAVA_OPT} -Dcom.sun.management.jmxremote"
# 终止Master Brokerpkill -f "broker-a.properties"# 验证Slave自动切换sh bin/mqadmin clusterList -n localhost:9876
# 模拟网络断开iptables -A INPUT -p tcp --dport 9876 -j DROP# 恢复网络iptables -D INPUT -p tcp --dport 9876 -j DROP
该单机集群方案存在以下限制:
建议生产环境采用至少3节点的标准集群部署方案,本方案主要适用于:
通过合理配置和监控,单机RocketMQ集群方案能在资源受限环境下提供可靠的消息队列服务,为项目初期验证和开发测试提供高效解决方案。