Skip to content

RabbitMQ 学习指南

导航目录

一、RabbitMQ 概述

1.1 什么是 RabbitMQ

RabbitMQ 是一个开源的消息中间件,基于 AMQP 协议实现。

它主要用于:

  • 应用解耦
  • 异步处理
  • 流量削峰
  • 系统通知
  • 任务分发

重点:RabbitMQ 的核心价值不是“存消息”,而是“异步解耦和流量治理”。

1.2 RabbitMQ 能解决什么问题

例如下单成功后,通常还会触发很多后续动作:

  • 发短信
  • 发邮件
  • 扣减积分
  • 生成物流任务

如果这些逻辑都同步执行,会拖慢主流程。使用 RabbitMQ 后,可以把这些动作异步化。

1.3 RabbitMQ 常见使用场景

  • 订单异步通知
  • 秒杀削峰
  • 日志异步采集
  • 延迟取消订单
  • 任务分发

二、为什么需要消息队列

2.1 应用解耦

没有消息队列时:

  • A 服务必须直接调用 B 服务
  • B 服务不可用时,A 服务也可能失败

使用消息队列后:

  • A 只负责发消息
  • B 负责异步消费消息

2.2 异步提速

同步流程:

text
下单 -> 发短信 -> 发邮件 -> 记录积分 -> 返回成功

异步流程:

text
下单 -> 写入消息 -> 立即返回
                 -> 消费者异步处理短信 / 邮件 / 积分

2.3 削峰填谷

当流量突增时:

  • 生产者快速写入 MQ
  • 消费者按自身能力慢慢消费

这就是削峰的典型应用。

三、RabbitMQ 核心概念

3.1 Broker

  • RabbitMQ 服务实例本身

3.2 Virtual Host

  • 虚拟主机
  • 用于逻辑隔离不同业务环境

3.3 Connection

  • 生产者或消费者与 RabbitMQ 的 TCP 连接

3.4 Channel

  • 信道
  • 绝大多数操作都在 Channel 中完成

3.5 Queue

  • 队列
  • 存放待消费消息

3.6 Exchange

  • 交换机
  • 负责路由消息

3.7 Routing Key

  • 路由键
  • 用于匹配交换机和队列之间的绑定规则

3.8 Binding

  • 绑定关系
  • 定义交换机如何把消息路由到队列

3.9 Message

  • 真正传输的业务数据

3.10 RabbitMQ 核心流程

text
生产者 -> Exchange -> Queue -> 消费者

重点:生产者不会直接把消息发给队列,而是先发给 Exchange

四、RabbitMQ 安装与管理

4.1 安装方式概览

常见安装方式:

  • Docker 安装
  • Linux 安装
  • Windows 安装

学习阶段最推荐 Docker,成本最低。

4.2 Docker 启动 RabbitMQ

bash
docker run -d \
  --name rabbitmq \
  # 映射 AMQP 协议端口
  -p 5672:5672 \
  # 映射管理控制台端口
  -p 15672:15672 \
  rabbitmq:3-management

4.3 Linux 安装思路

RabbitMQ 依赖 Erlang 运行时。

bash
# 1. 安装 Erlang
yum install -y erlang

# 2. 安装 RabbitMQ Server
yum install -y rabbitmq-server

# 3. 启动 RabbitMQ
systemctl start rabbitmq-server

# 4. 设置开机自启
systemctl enable rabbitmq-server

# 5. 开启管理插件
rabbitmq-plugins enable rabbitmq_management

4.4 Windows 安装思路

Windows 下通常流程:

  1. 安装 Erlang
  2. 安装 RabbitMQ Server
  3. 启动 RabbitMQ 服务
  4. 开启 rabbitmq_management 插件

4.5 默认访问地址

  • 管理台地址:http://127.0.0.1:15672
  • 默认账号密码:guest / guest

4.6 常见端口

  • 5672:消息通信端口
  • 15672:管理控制台端口

4.7 常见命令

bash
# 查看 RabbitMQ 状态
rabbitmqctl status

# 创建用户
rabbitmqctl add_user admin 123456

# 设置管理员角色
rabbitmqctl set_user_tags admin administrator

# 给指定 vhost 授权
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

4.8 Virtual Host 使用建议

不同业务或环境建议使用不同 Virtual Host

  • dev
  • test
  • prod

4.9 管理台常用操作

  • 创建交换机
  • 创建队列
  • 绑定路由
  • 查看消息积压
  • 查看消费者状态

五、五种常见消息模型

5.1 简单模式

一个生产者,一个消费者,一个队列。

text
Producer -> Queue -> Consumer

5.1.1 Spring Boot 代码示例

java
@Service
public class SimpleProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        // 直接向队列发送消息,适合简单模式
        rabbitTemplate.convertAndSend("", "simple.queue", "hello rabbitmq");
    }
}
java
@Component
public class SimpleConsumer {

    @RabbitListener(queues = "simple.queue")
    public void receive(String msg) {
        System.out.println("简单模式收到消息: " + msg);
    }
}

5.2 工作队列模式

一个生产者,多个消费者共同消费一个队列。

text
Producer -> Queue -> Consumer1 / Consumer2 / Consumer3

适合:

  • 任务并行处理
  • 提高消费能力

5.2.1 代码示例

java
@Service
public class WorkProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        for (int i = 1; i <= 10; i++) {
            rabbitTemplate.convertAndSend("", "work.queue", "task-" + i);
        }
    }
}
java
@Component
public class WorkConsumer1 {

    @RabbitListener(queues = "work.queue")
    public void receive(String msg) {
        System.out.println("消费者1处理: " + msg);
    }
}
java
@Component
public class WorkConsumer2 {

    @RabbitListener(queues = "work.queue")
    public void receive(String msg) {
        System.out.println("消费者2处理: " + msg);
    }
}

5.3 发布订阅模式

  • 使用 fanout 交换机
  • 消息广播给所有绑定队列

5.3.1 配置示例

java
@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange logExchange() {
        return new FanoutExchange("log.fanout");
    }

    @Bean
    public Queue logQueue1() {
        return new Queue("log.queue.1");
    }

    @Bean
    public Queue logQueue2() {
        return new Queue("log.queue.2");
    }

    @Bean
    public Binding logBinding1() {
        return BindingBuilder.bind(logQueue1()).to(logExchange());
    }

    @Bean
    public Binding logBinding2() {
        return BindingBuilder.bind(logQueue2()).to(logExchange());
    }
}
java
@Service
public class FanoutProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        // fanout 模式下 routingKey 会被忽略
        rabbitTemplate.convertAndSend("log.fanout", "", "broadcast message");
    }
}

5.4 路由模式

  • 使用 direct 交换机
  • 根据精确的 routingKey 路由

5.4.1 配置示例

java
@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange orderDirectExchange() {
        return new DirectExchange("order.direct");
    }

    @Bean
    public Queue createQueue() {
        return new Queue("order.create.queue");
    }

    @Bean
    public Queue cancelQueue() {
        return new Queue("order.cancel.queue");
    }

    @Bean
    public Binding createBinding() {
        return BindingBuilder.bind(createQueue()).to(orderDirectExchange()).with("order.create");
    }

    @Bean
    public Binding cancelBinding() {
        return BindingBuilder.bind(cancelQueue()).to(orderDirectExchange()).with("order.cancel");
    }
}
java
@Service
public class DirectProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendCreate() {
        rabbitTemplate.convertAndSend("order.direct", "order.create", "create order");
    }

    public void sendCancel() {
        rabbitTemplate.convertAndSend("order.direct", "order.cancel", "cancel order");
    }
}

5.5 Topic 模式

  • 使用 topic 交换机
  • 支持通配符路由

5.5.1 配置示例

java
@Configuration
public class TopicConfig {

    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange("order.topic");
    }

    @Bean
    public Queue orderAllQueue() {
        return new Queue("order.all.queue");
    }

    @Bean
    public Queue orderCreateQueue() {
        return new Queue("order.create.only.queue");
    }

    @Bean
    public Binding orderAllBinding() {
        // 匹配所有 order 开头的消息
        return BindingBuilder.bind(orderAllQueue()).to(orderTopicExchange()).with("order.#");
    }

    @Bean
    public Binding orderCreateBinding() {
        // 只匹配 order.create
        return BindingBuilder.bind(orderCreateQueue()).to(orderTopicExchange()).with("order.create");
    }
}
java
@Service
public class TopicProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("order.topic", "order.create", "topic create message");
        rabbitTemplate.convertAndSend("order.topic", "order.pay.success", "topic pay success");
    }
}

5.6 Headers 模式

虽然业务中使用较少,但 RabbitMQ 还支持 headers 交换机。

特点:

  • 按消息头匹配
  • 不依赖 routingKey

5.6.1 配置示例

java
@Configuration
public class HeadersConfig {

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("report.headers");
    }

    @Bean
    public Queue headersQueue() {
        return new Queue("report.queue");
    }

    @Bean
    public Binding headersBinding() {
        return BindingBuilder.bind(headersQueue())
                .to(headersExchange())
                .where("type").matches("pdf");
    }
}

5.7 五种模式总览

模式特点适用场景
简单模式最基础入门学习
工作队列多消费者竞争消费任务处理
发布订阅广播通知、日志
路由模式精确路由分类处理
Topic 模式模糊匹配复杂业务路由

5.8 消息模型学习重点

  1. 简单模式
  2. 工作队列模式
  3. 发布订阅模式
  4. 路由模式
  5. Topic 模式
  6. Headers 模式

六、交换机与路由模式

6.1 四种常见交换机

6.1.1 direct

  • 精确匹配 routingKey

6.1.2 topic

  • 模糊匹配 routingKey
  • * 匹配一个单词
  • # 匹配多个单词

6.1.3 fanout

  • 广播模式
  • 忽略 routingKey

6.1.4 headers

  • 根据消息头匹配
  • 实际项目中用得较少

6.2 Topic 示例理解

例如绑定规则:

  • order.*
  • order.#

消息路由键:

  • order.create
  • order.pay.success

匹配规则:

  • order.* 只能匹配 order.create
  • order.# 两个都能匹配

6.3 交换机模式学习重点

  1. direct
  2. topic
  3. fanout
  4. headers

6.4 各种路由模式适用场景

6.4.1 direct 场景

  • 订单创建
  • 订单取消
  • 精确业务分类

6.4.2 topic 场景

  • 订单相关所有事件
  • 日志分级分类
  • 多层级业务路由

6.4.3 fanout 场景

  • 广播通知
  • 配置刷新通知
  • 操作日志多系统同步

6.4.4 headers 场景

  • 基于消息属性做精细化分类
  • 特殊规则系统

七、消息可靠性与确认机制

7.1 为什么需要消息可靠性

消息链路可能在任意环节丢失:

  • 生产者发送失败
  • Exchange 没收到
  • Queue 没成功持久化
  • 消费者处理失败

7.2 RabbitMQ 可靠性链路

消息可靠性可以拆成三个阶段:

  1. 生产者把消息成功发到交换机
  2. 交换机把消息成功路由到队列
  3. 消费者把消息成功处理完成

7.3 生产者消息确认

生产者可以通过 publisher-confirm-type 确认消息是否到达 Broker。

yaml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true

7.4 ConfirmCallback 示例

java
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息成功到达交换机");
            } else {
                System.out.println("消息没有到达交换机: " + cause);
            }
        });

        rabbitTemplate.setReturnsCallback(returned -> {
            System.out.println("消息没有路由到队列: " + returned.getMessage());
        });

        return rabbitTemplate;
    }
}

7.5 mandatory / return 机制说明

  • confirm:确认消息是否到达交换机
  • return:确认消息是否从交换机成功路由到队列

二者配合才能更完整地确认生产端链路。

7.6 消费者手动确认

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
java
@Component
public class OrderConsumer {

    @RabbitListener(queues = "order.queue")
    public void receive(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 处理业务逻辑
            System.out.println("收到消息: " + new String(message.getBody(), StandardCharsets.UTF_8));

            // 处理成功后手动 ack
            channel.basicAck(deliveryTag, false);
        } catch (Exception ex) {
            // 处理失败后可以拒绝并决定是否重回队列
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

7.7 basicAck、basicNack、basicReject 区别

  • basicAck:确认成功消费
  • basicNack:确认消费失败,可批量,可选择是否重回队列
  • basicReject:确认消费失败,不支持批量

7.8 消息持久化

可靠性增强常见做法:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

重点:持久化不是百分百不丢消息,但它是可靠性的基础配置。

7.9 生产端可靠性建议

  • 开启 confirm
  • 开启 return
  • 发送失败要记录日志或落库补偿
  • 重要消息可配合本地消息表

7.10 消费端可靠性建议

  • 使用手动 ack
  • 消费失败明确决定是否重回队列
  • 业务逻辑一定要幂等
  • 做好异常日志和告警

八、死信队列、延迟队列与幂等性

8.1 什么是死信队列

消息变成死信的常见原因:

  • 被拒绝且不重回队列
  • 消息过期
  • 队列达到最大长度

8.2 死信队列配置示例

java
@Configuration
public class DeadLetterConfig {

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
                // 指定死信交换机
                .deadLetterExchange("order.dlx")
                // 指定死信路由键
                .deadLetterRoutingKey("order.dead")
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("order.dlx");
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dead.queue").build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("order.dead");
    }
}

8.3 死信消息常见场景

  • 订单超时未支付
  • 消费失败进入异常队列
  • 需要人工介入的补偿消息

8.4 延迟队列思路

RabbitMQ 原生没有真正的延迟队列概念,常见做法有:

  • TTL + 死信队列
  • 安装 delayed-message 插件

8.5 TTL + 死信实现延迟取消订单

text
下单成功 -> 发送一条 30 分钟过期消息
         -> 消息先进入普通队列
         -> 过期后进入死信队列
         -> 消费死信队列执行取消订单

8.6 TTL 延迟队列配置示例

java
@Configuration
public class DelayConfig {

    @Bean
    public Queue ttlQueue() {
        return QueueBuilder.durable("order.ttl.queue")
                // 统一设置消息过期时间,单位毫秒
                .ttl(30 * 60 * 1000)
                .deadLetterExchange("order.dlx")
                .deadLetterRoutingKey("order.dead")
                .build();
    }
}

8.7 delayed-message 插件方案

如果安装了 rabbitmq_delayed_message_exchange 插件,可以更灵活地设置消息级别延迟。

java
@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("order.delayed.exchange", "x-delayed-message", true, false, args);
}

8.8 幂等性问题

消费者可能会出现重复消费。

常见原因:

  • 消费成功但 ack 丢失
  • 网络抖动
  • 消费失败重试

8.9 幂等性处理建议

  • 数据库唯一索引
  • 业务状态机判断
  • Redis 防重
  • 去重表

8.10 死信、延迟、幂等性学习重点

  1. 死信消息产生原因
  2. 死信队列配置
  3. TTL 延迟队列
  4. 插件延迟消息
  5. 消费者幂等性

九、Spring Boot 整合 RabbitMQ

9.1 引入依赖

xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <!-- RabbitMQ Spring Boot 集成依赖 -->
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

9.2 基础配置

yaml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

9.3 队列、交换机、绑定声明

java
@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange orderExchange() {
        // 订单直连交换机
        return new DirectExchange("order.exchange", true, false);
    }

    @Bean
    public Queue orderQueue() {
        // 订单队列
        return new Queue("order.queue", true);
    }

    @Bean
    public Binding orderBinding() {
        // 用 routingKey=order.create 绑定队列和交换机
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with("order.create");
    }
}

9.4 生产者发送消息

java
@Service
public class OrderProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(Long orderId) {
        // 发送订单创建消息
        rabbitTemplate.convertAndSend("order.exchange", "order.create", "orderId=" + orderId);
    }
}

9.5 消费者接收消息

java
@Component
public class OrderMessageConsumer {

    @RabbitListener(queues = "order.queue")
    public void receive(String msg) {
        System.out.println("消费者收到消息: " + msg);
    }
}

9.6 JSON 消息示例

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
    private Long orderId;
    private String type;
}
java
@Service
public class JsonOrderProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(OrderMessage orderMessage) {
        // 发送 JSON 结构消息
        rabbitTemplate.convertAndSend("order.exchange", "order.create", orderMessage);
    }
}

9.7 Spring AMQP 学习重点

  1. 队列声明
  2. 交换机声明
  3. 绑定关系
  4. 生产者发送
  5. 消费者监听
  6. 确认机制

9.8 典型业务场景示例

9.8.1 下单后异步通知

  • 订单服务发送下单消息
  • 短信服务、积分服务异步消费

9.8.2 秒杀削峰

  • 请求先写入 MQ
  • 后端消费者按能力慢慢处理

9.8.3 延迟取消订单

  • 下单成功发送延迟消息
  • 超时未支付则自动取消

9.8.4 日志异步处理

  • 系统日志先写 MQ
  • 日志服务异步消费入库

十、面试重点与最佳实践

10.1 高频面试题

  1. RabbitMQ 为什么能解耦
  2. Exchange 和 Queue 的关系
  3. 五种消息模型有哪些
  4. direct、topic、fanout 的区别
  5. 消息为什么会丢失
  6. 如何保证消息可靠性
  7. 如何解决重复消费
  8. 死信队列和延迟队列怎么实现

10.2 最佳实践

  • 生产者开启 confirm 和 return
  • 消费者尽量使用手动 ack
  • 重要消息做好持久化
  • 消费者业务必须考虑幂等性
  • 合理监控消息积压
  • 延迟任务优先考虑 TTL + 死信方案

10.3 总结

RabbitMQ 是 Java 后端里非常常见的消息中间件。

  • 入门阶段要掌握队列、交换机、路由
  • 进阶阶段要掌握五种模型和确认机制
  • 高阶阶段要掌握死信队列、延迟队列、幂等性和可靠性