Spring Boot集成RocketMQ:真实项目应用场景

news/2025/2/26 8:24:16

第一部分:基础配置与简单示例

1. 项目初始化

使用Spring Boot创建一个项目,添加RocketMQ依赖。

  • POM依赖(Maven):

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>3.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.3.0</version>
    </dependency>
    
  • application.yml 配置:

    rocketmq:
      name-server: localhost:9876
      producer:
        group: default-producer-group
      consumer:
        group: default-consumer-group
    
2. 简单生产者与消费者
  • 生产者

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SimpleProducerController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send")
        public String sendMessage() {
            rocketMQTemplate.convertAndSend("SimpleTopic", "Hello, RocketMQ with Spring Boot!");
            return "Message sent!";
        }
    }
    
  • 消费者

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RocketMQMessageListener(topic = "SimpleTopic", consumerGroup = "simple-consumer-group")
    public class SimpleConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message: " + message);
        }
    }
    
  • 启动类

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class RocketMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(RocketMQApplication.class, args);
        }
    }
    

启动项目后,访问 http://localhost:8080/send,消费者会打印消息。这是最基础的用法,面试中常被问到如何快速集成。


第二部分:真实项目应用场景

以下是RocketMQ在Spring Boot中的典型应用场景,涵盖面试常见问题。

1. 电商订单系统(异步消息)

场景:用户下单后,异步通知库存扣减和物流系统。

  • 生产者(订单服务):

    @RestController
    public class OrderController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/place-order")
        public String placeOrder() {
            String orderJson = "{\"orderId\":\"12345\",\"item\":\"Laptop\",\"quantity\":1}";
            // 异步发送消息
            rocketMQTemplate.asyncSend("OrderTopic", orderJson, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Order sent successfully: " + sendResult.getMsgId());
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.err.println("Order send failed: " + throwable.getMessage());
                }
            });
            return "Order placed!";
        }
    }
    
  • 消费者(库存服务):

    @Service
    @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "inventory-consumer-group", selectorExpression = "Inventory")
    public class InventoryConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String orderJson) {
            System.out.println("Processing inventory for: " + orderJson);
            // 假设这里调用库存扣减逻辑
        }
    }
    
  • 配置Tag(application.yml):

    rocketmq:
      name-server: localhost:9876
      producer:
        group: order-producer-group
    

面试问题:如何确保消息不丢失?

  • 回答:使用异步发送时,结合 SendCallback 检查发送结果;在生产者端开启 retries(默认3次重试);Broker端开启持久化。
2. 事务消息(支付系统)

场景:用户支付后,确保订单状态更新和消息发送一致。

  • 生产者(事务消息):

    @RestController
    public class PaymentController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @Autowired
        private OrderService orderService;
    
        @GetMapping("/pay")
        public String payOrder() {
            String orderId = "12345";
            rocketMQTemplate.sendMessageInTransaction("TransactionTopic", MessageBuilder.withPayload("Payment for " + orderId).build(), orderId);
            return "Payment processed!";
        }
    }
    
    @Service
    @RocketMQTransactionListener
    public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Autowired
        private OrderService orderService;
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String orderId = (String) arg;
            try {
                orderService.updateOrderStatus(orderId, "PAID"); // 本地事务
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String orderId = new String(msg.getPayload()).split(" ")[2];
            String status = orderService.getOrderStatus(orderId);
            return "PAID".equals(status) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Service
    public class OrderService {
        // 模拟数据库操作
        private Map<String, String> orderStatus = new HashMap<>();
    
        public void updateOrderStatus(String orderId, String status) {
            orderStatus.put(orderId, status);
        }
    
        public String getOrderStatus(String orderId) {
            return orderStatus.getOrDefault(orderId, "UNPAID");
        }
    }
    

面试问题:事务消息的实现原理?

  • 回答:分为两阶段提交。Producer先发送Half消息,执行本地事务后提交或回滚;Broker定时检查未决事务,调用 checkLocalTransaction 确认状态。
3. 日志收集(顺序消息)

场景:收集应用日志,确保按时间顺序处理。

  • 生产者

    @RestController
    public class LogController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/log")
        public String sendLog() {
            String log = "{\"timestamp\":\"2025-02-25 10:00:00\",\"message\":\"User login\"}";
            rocketMQTemplate.syncSendOrderly("LogTopic", log, "user123"); // 使用userId作为hashKey保证顺序
            return "Log sent!";
        }
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "LogTopic", consumerGroup = "log-consumer-group", messageModel = MessageModel.CLUSTERING)
    public class LogConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String log) {
            System.out.println("Processing log: " + log);
        }
    }
    

面试问题:如何保证消息顺序?

  • 回答:使用 syncSendOrderly,通过hashKey(如用户ID)将消息路由到同一队列,消费者单线程消费该队列。
4. 延迟消息(促销提醒)

场景:订单未支付30分钟后发送提醒。

  • 生产者

    @RestController
    public class ReminderController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/remind")
        public String sendReminder() {
            String reminder = "Order 12345 unpaid";
            rocketMQTemplate.syncSend("ReminderTopic", MessageBuilder.withPayload(reminder).build(), 1000, 18); // 18代表30分钟延迟
            return "Reminder scheduled!";
        }
    }
    
  • 消费者

    @Service
    @RocketMQMessageListener(topic = "ReminderTopic", consumerGroup = "reminder-consumer-group")
    public class ReminderConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String reminder) {
            System.out.println("Sending reminder: " + reminder);
        }
    }
    

面试问题:延迟消息的实现机制?

  • 回答:RocketMQ内置18个延迟级别(1s到2h),消息先存储到延迟队列,到期后投递到目标队列。

第三部分:优化与高可用

1. 高可用性配置
  • 多NameServer

    rocketmq:
      name-server: localhost:9876;localhost:9877
    
  • 多消费者实例:ConsumerGroup内多实例负载均衡。

2. 性能优化
  • 批量发送

    List<Message> messages = Arrays.asList(
        MessageBuilder.withPayload("msg1").build(),
        MessageBuilder.withPayload("msg2").build()
    );
    rocketMQTemplate.syncSend("BatchTopic", messages);
    
  • 调整线程池

    rocketmq:
      consumer:
        pull-batch-size: 32
        consume-thread-max: 64
    
3. 异常处理
  • 消费者重试

    @Service
    @RocketMQMessageListener(topic = "RetryTopic", consumerGroup = "retry-consumer-group")
    public class RetryConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            if (true) { // 模拟失败
                throw new RuntimeException("Processing failed");
            }
        }
    }
    

    默认重试16次,可通过 maxReconsumeTimes 调整。


第四部分:面试常见问题与回答

  1. RocketMQ与Kafka的区别?

    • RocketMQ支持事务消息和延迟消息,Kafka不支持。
    • RocketMQ拉模式和推模式都支持,Kafka主要拉模式。
    • RocketMQ适合业务场景,Kafka更偏大数据处理。
  2. 如何处理消息重复消费?

    • 在消费者端实现幂等性(如数据库唯一约束或Redis去重)。
  3. 如何监控RocketMQ?

    • 使用RocketMQ Dashboard查看Topic、消费进度;集成Prometheus+Grafana监控性能。

http://www.niftyadmin.cn/n/5868379.html

相关文章

DeepSeek回答:AI时代Go语言学习路线

最近有小伙伴经常会问&#xff1a;**该如何学习入门Go语言&#xff1f;怎样提升Go语言Coding水平&#xff1f;**这篇文章我们就使用DeepSeek来梳理下Go语言在AI时代的学习路线。 向DeepSeek提问的问题原文&#xff1a; 你现在是一名资深的Go语言工程师&#xff0c;精通Go语言并…

MySQL索引失效

MySQL索引失效会导致查询性能下降&#xff0c;常见原因及解决方案如下&#xff1a; 一、使用OR条件 原因&#xff1a;当OR条件中有一个列没有索引时&#xff0c;索引可能失效 解决方法&#xff1a;确保OR条件中的所有列都有索引&#xff0c;或使用UNION替代OR -- 不推荐 SE…

进程状态(R|S|D|t|T|X|Z)、僵尸进程及孤儿进程

文章目录 一.进程状态进程排队状态&#xff1a;运行、阻塞、挂起 二.Linux下的进程状态R 运行状态&#xff08;running&#xff09;S 睡眠状态&#xff08;sleeping)D 磁盘休眠状态&#xff08;Disk sleep&#xff09;t 停止、暂停状态(tracing stopped)T 停止、暂停状态(stopp…

【UCB CS 61B SP24】Lecture 14 - Data Structures 1: Disjoint Sets学习笔记

本文内容为数据结构并查集&#xff08;DSU&#xff09;的介绍与实现&#xff0c;详细讲解了并查集这一数据结构所能实现的各种操作&#xff0c;以及如何通过路径压缩与按秩合并大幅优化并查集的效率。 1. 并查集 1.1 介绍及其基础操作 并查集&#xff08;Disjoint Set Union…

冯诺依曼体系结构 ──── linux第8课

目录 冯诺依曼体系结构 关于冯诺依曼&#xff0c;必须强调几点&#xff1a; 冯诺依曼体系结构 我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系 输入单元&#xff1a;包括键盘, 鼠标&#xff0c;网卡,扫…

C#语音识别与播报开发指南

C# 语音识别离线开发推荐库 以下是一些适用于 C# 离线语音识别的库&#xff0c;支持本地处理&#xff0c;无需网络连接&#xff1a; 1. System.Speech.Recognition (Windows) 简介&#xff1a;.NET Framework 自带的库&#xff0c;适合简单的离线命令词识别。适用场景&#x…

Python--内置函数与推导式(上)

1. 匿名函数 Lambda表达式基础 语法&#xff1a;lambda 参数: 表达式​ 特点&#xff1a; 没有函数名&#xff0c;适合简单逻辑函数体只能是单行表达式自动返回表达式结果 # 示例1&#xff1a;加法 add lambda a, b: a b print(add(3, 5)) # 输出 8# 示例2&#xff1a;字…

selenium如何实现,开启浏览器的开发者工具模式,并且开启 toggle移动设备模拟模式

核心实现代码 pythonCopy Code from selenium import webdriver from selenium.webdriver.chrome.options import Options def enable_devtools_with_toggle(): options Options() # 强制开启开发者工具 options.add_argument("--auto-open-devtools-for-tabs&quo…