RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

作者:沙与沫2024.01.17 11:35浏览量:26

简介:本文将介绍如何在 SpringBoot 项目中集成 RocketMQ,实现消息的发送与接收。我们将分步骤详细解析配置和代码编写过程,以帮助您快速掌握 RocketMQ 在 SpringBoot 项目中的集成方法。

在上一篇教程中,我们简要介绍了 RocketMQ 的概念、特点和基本使用。今天,我们将重点讲解如何在 SpringBoot 项目中集成 RocketMQ,实现消息的发送与接收。以下是详细的步骤和代码示例。
一、添加依赖
在 SpringBoot 项目中集成 RocketMQ,首先需要在项目的 pom.xml 文件中添加 RocketMQ 的相关依赖。具体如下:

  1. <dependencies>
  2. <!-- 其他依赖项... -->
  3. <dependency>
  4. <groupId>org.apache.rocketmq</groupId>
  5. <artifactId>rocketmq-spring-boot-starter</artifactId>
  6. <version>最新版本</version>
  7. </dependency>
  8. </dependencies>

请确保替换 <version>最新版本</version> 为当前 RocketMQ 的最新版本号。
二、配置 RocketMQ
application.propertiesapplication.yml 文件中配置 RocketMQ 的相关属性,如生产者和消费者的名称、地址等。以下是一个简单的配置示例:
application.properties 文件内容:

  1. rocketmq.name=your-producer-name
  2. rocketmq.broker-name=your-broker-name
  3. rocketmq.producer.group=your-producer-group
  4. rocketmq.consumer.group=your-consumer-group
  5. rocketmq.consumer.name=your-consumer-name
  6. rocketmq.producer.namesrv-addr=127.0.0.1:9876
  7. rocketmq.consumer.namesrv-addr=127.0.0.1:9876

其中,namesrv-addr 是 RocketMQ 集群的地址,需要根据你的实际环境进行修改。多个地址之间用逗号分隔。
三、编写消息发送代码
在 SpringBoot 项目中,可以通过注入 RocketMQProducer 类来发送消息。以下是一个简单的消息发送示例:

  1. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class MessageService {
  6. @Autowired
  7. private RocketMQTemplate rocketMQTemplate;
  8. public void sendMessage(String topic, String message) {
  9. rocketMQTemplate.convertAndSend(topic, message);
  10. }
  11. }

在上述代码中,我们通过 @Autowired 注解将 RocketMQTemplate 注入到 MessageService 类中,然后使用 convertAndSend 方法发送消息。其中,topic 是消息的主题,message 是要发送的消息内容。
四、编写消息接收代码
在 SpringBoot 项目中,可以通过实现 MessageListener 接口来接收消息。以下是一个简单的消息接收示例:
java import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Service; @Service public class MessageReceiver implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {\ // 处理接收到的消息...}\n}\n在上述代码中,我们实现了 MessageListenerConcurrently 接口,并重写了 consumeMessage 方法来处理接收到的消息。你可以根据实际需求在 consumeMessage 方法中编写处理逻辑。注意,你还需要在配置文件中指定消费者组名和名称服务器地址等属性。
五、启动和测试
启动 SpringBoot 项目后,你可以通过调用 MessageService 类的 sendMessage 方法发送消息,并通过查看日志或控制台输出等来检查消息是否被