Appearance
RabbitMQ 学习指南
导航目录
- 一、RabbitMQ 概述
- 二、为什么需要消息队列
- 三、RabbitMQ 核心概念
- 四、RabbitMQ 安装与管理
- 五、五种常见消息模型
- 六、交换机与路由模式
- 七、消息可靠性与确认机制
- 八、死信队列、延迟队列与幂等性
- 九、Spring Boot 整合 RabbitMQ
- 十、面试重点与最佳实践
一、RabbitMQ 概述
1.1 什么是 RabbitMQ
RabbitMQ 是一个开源的消息中间件,基于 AMQP 协议实现。
它主要用于:
- 应用解耦
- 异步处理
- 流量削峰
- 系统通知
- 任务分发
重点:RabbitMQ 的核心价值不是“存消息”,而是“异步解耦和流量治理”。
1.2 RabbitMQ 能解决什么问题
例如下单成功后,通常还会触发很多后续动作:
- 发短信
- 发邮件
- 扣减积分
- 生成物流任务
如果这些逻辑都同步执行,会拖慢主流程。使用 RabbitMQ 后,可以把这些动作异步化。
1.3 RabbitMQ 常见使用场景
- 订单异步通知
- 秒杀削峰
- 日志异步采集
- 延迟取消订单
- 任务分发
二、为什么需要消息队列
2.1 应用解耦
没有消息队列时:
- A 服务必须直接调用 B 服务
- B 服务不可用时,A 服务也可能失败
使用消息队列后:
- A 只负责发消息
- B 负责异步消费消息
2.2 异步提速
同步流程:
text
下单 -> 发短信 -> 发邮件 -> 记录积分 -> 返回成功异步流程:
text
下单 -> 写入消息 -> 立即返回
-> 消费者异步处理短信 / 邮件 / 积分2.3 削峰填谷
当流量突增时:
- 生产者快速写入 MQ
- 消费者按自身能力慢慢消费
这就是削峰的典型应用。
三、RabbitMQ 核心概念
3.1 Broker
- RabbitMQ 服务实例本身
3.2 Virtual Host
- 虚拟主机
- 用于逻辑隔离不同业务环境
3.3 Connection
- 生产者或消费者与 RabbitMQ 的 TCP 连接
3.4 Channel
- 信道
- 绝大多数操作都在 Channel 中完成
3.5 Queue
- 队列
- 存放待消费消息
3.6 Exchange
- 交换机
- 负责路由消息
3.7 Routing Key
- 路由键
- 用于匹配交换机和队列之间的绑定规则
3.8 Binding
- 绑定关系
- 定义交换机如何把消息路由到队列
3.9 Message
- 真正传输的业务数据
3.10 RabbitMQ 核心流程
text
生产者 -> Exchange -> Queue -> 消费者重点:生产者不会直接把消息发给队列,而是先发给 Exchange。
四、RabbitMQ 安装与管理
4.1 安装方式概览
常见安装方式:
- Docker 安装
- Linux 安装
- Windows 安装
学习阶段最推荐 Docker,成本最低。
4.2 Docker 启动 RabbitMQ
bash
docker run -d \
--name rabbitmq \
# 映射 AMQP 协议端口
-p 5672:5672 \
# 映射管理控制台端口
-p 15672:15672 \
rabbitmq:3-management4.3 Linux 安装思路
RabbitMQ 依赖 Erlang 运行时。
bash
# 1. 安装 Erlang
yum install -y erlang
# 2. 安装 RabbitMQ Server
yum install -y rabbitmq-server
# 3. 启动 RabbitMQ
systemctl start rabbitmq-server
# 4. 设置开机自启
systemctl enable rabbitmq-server
# 5. 开启管理插件
rabbitmq-plugins enable rabbitmq_management4.4 Windows 安装思路
Windows 下通常流程:
- 安装 Erlang
- 安装 RabbitMQ Server
- 启动 RabbitMQ 服务
- 开启
rabbitmq_management插件
4.5 默认访问地址
- 管理台地址:
http://127.0.0.1:15672 - 默认账号密码:
guest / guest
4.6 常见端口
5672:消息通信端口15672:管理控制台端口
4.7 常见命令
bash
# 查看 RabbitMQ 状态
rabbitmqctl status
# 创建用户
rabbitmqctl add_user admin 123456
# 设置管理员角色
rabbitmqctl set_user_tags admin administrator
# 给指定 vhost 授权
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"4.8 Virtual Host 使用建议
不同业务或环境建议使用不同 Virtual Host:
devtestprod
4.9 管理台常用操作
- 创建交换机
- 创建队列
- 绑定路由
- 查看消息积压
- 查看消费者状态
五、五种常见消息模型
5.1 简单模式
一个生产者,一个消费者,一个队列。
text
Producer -> Queue -> Consumer5.1.1 Spring Boot 代码示例
java
@Service
public class SimpleProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
// 直接向队列发送消息,适合简单模式
rabbitTemplate.convertAndSend("", "simple.queue", "hello rabbitmq");
}
}java
@Component
public class SimpleConsumer {
@RabbitListener(queues = "simple.queue")
public void receive(String msg) {
System.out.println("简单模式收到消息: " + msg);
}
}5.2 工作队列模式
一个生产者,多个消费者共同消费一个队列。
text
Producer -> Queue -> Consumer1 / Consumer2 / Consumer3适合:
- 任务并行处理
- 提高消费能力
5.2.1 代码示例
java
@Service
public class WorkProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("", "work.queue", "task-" + i);
}
}
}java
@Component
public class WorkConsumer1 {
@RabbitListener(queues = "work.queue")
public void receive(String msg) {
System.out.println("消费者1处理: " + msg);
}
}java
@Component
public class WorkConsumer2 {
@RabbitListener(queues = "work.queue")
public void receive(String msg) {
System.out.println("消费者2处理: " + msg);
}
}5.3 发布订阅模式
- 使用
fanout交换机 - 消息广播给所有绑定队列
5.3.1 配置示例
java
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange logExchange() {
return new FanoutExchange("log.fanout");
}
@Bean
public Queue logQueue1() {
return new Queue("log.queue.1");
}
@Bean
public Queue logQueue2() {
return new Queue("log.queue.2");
}
@Bean
public Binding logBinding1() {
return BindingBuilder.bind(logQueue1()).to(logExchange());
}
@Bean
public Binding logBinding2() {
return BindingBuilder.bind(logQueue2()).to(logExchange());
}
}java
@Service
public class FanoutProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
// fanout 模式下 routingKey 会被忽略
rabbitTemplate.convertAndSend("log.fanout", "", "broadcast message");
}
}5.4 路由模式
- 使用
direct交换机 - 根据精确的
routingKey路由
5.4.1 配置示例
java
@Configuration
public class DirectConfig {
@Bean
public DirectExchange orderDirectExchange() {
return new DirectExchange("order.direct");
}
@Bean
public Queue createQueue() {
return new Queue("order.create.queue");
}
@Bean
public Queue cancelQueue() {
return new Queue("order.cancel.queue");
}
@Bean
public Binding createBinding() {
return BindingBuilder.bind(createQueue()).to(orderDirectExchange()).with("order.create");
}
@Bean
public Binding cancelBinding() {
return BindingBuilder.bind(cancelQueue()).to(orderDirectExchange()).with("order.cancel");
}
}java
@Service
public class DirectProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendCreate() {
rabbitTemplate.convertAndSend("order.direct", "order.create", "create order");
}
public void sendCancel() {
rabbitTemplate.convertAndSend("order.direct", "order.cancel", "cancel order");
}
}5.5 Topic 模式
- 使用
topic交换机 - 支持通配符路由
5.5.1 配置示例
java
@Configuration
public class TopicConfig {
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange("order.topic");
}
@Bean
public Queue orderAllQueue() {
return new Queue("order.all.queue");
}
@Bean
public Queue orderCreateQueue() {
return new Queue("order.create.only.queue");
}
@Bean
public Binding orderAllBinding() {
// 匹配所有 order 开头的消息
return BindingBuilder.bind(orderAllQueue()).to(orderTopicExchange()).with("order.#");
}
@Bean
public Binding orderCreateBinding() {
// 只匹配 order.create
return BindingBuilder.bind(orderCreateQueue()).to(orderTopicExchange()).with("order.create");
}
}java
@Service
public class TopicProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
rabbitTemplate.convertAndSend("order.topic", "order.create", "topic create message");
rabbitTemplate.convertAndSend("order.topic", "order.pay.success", "topic pay success");
}
}5.6 Headers 模式
虽然业务中使用较少,但 RabbitMQ 还支持 headers 交换机。
特点:
- 按消息头匹配
- 不依赖
routingKey
5.6.1 配置示例
java
@Configuration
public class HeadersConfig {
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("report.headers");
}
@Bean
public Queue headersQueue() {
return new Queue("report.queue");
}
@Bean
public Binding headersBinding() {
return BindingBuilder.bind(headersQueue())
.to(headersExchange())
.where("type").matches("pdf");
}
}5.7 五种模式总览
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 简单模式 | 最基础 | 入门学习 |
| 工作队列 | 多消费者竞争消费 | 任务处理 |
| 发布订阅 | 广播 | 通知、日志 |
| 路由模式 | 精确路由 | 分类处理 |
| Topic 模式 | 模糊匹配 | 复杂业务路由 |
5.8 消息模型学习重点
- 简单模式
- 工作队列模式
- 发布订阅模式
- 路由模式
- Topic 模式
- Headers 模式
六、交换机与路由模式
6.1 四种常见交换机
6.1.1 direct
- 精确匹配
routingKey
6.1.2 topic
- 模糊匹配
routingKey *匹配一个单词#匹配多个单词
6.1.3 fanout
- 广播模式
- 忽略
routingKey
6.1.4 headers
- 根据消息头匹配
- 实际项目中用得较少
6.2 Topic 示例理解
例如绑定规则:
order.*order.#
消息路由键:
order.createorder.pay.success
匹配规则:
order.*只能匹配order.createorder.#两个都能匹配
6.3 交换机模式学习重点
- direct
- topic
- fanout
- headers
6.4 各种路由模式适用场景
6.4.1 direct 场景
- 订单创建
- 订单取消
- 精确业务分类
6.4.2 topic 场景
- 订单相关所有事件
- 日志分级分类
- 多层级业务路由
6.4.3 fanout 场景
- 广播通知
- 配置刷新通知
- 操作日志多系统同步
6.4.4 headers 场景
- 基于消息属性做精细化分类
- 特殊规则系统
七、消息可靠性与确认机制
7.1 为什么需要消息可靠性
消息链路可能在任意环节丢失:
- 生产者发送失败
- Exchange 没收到
- Queue 没成功持久化
- 消费者处理失败
7.2 RabbitMQ 可靠性链路
消息可靠性可以拆成三个阶段:
- 生产者把消息成功发到交换机
- 交换机把消息成功路由到队列
- 消费者把消息成功处理完成
7.3 生产者消息确认
生产者可以通过 publisher-confirm-type 确认消息是否到达 Broker。
yaml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true7.4 ConfirmCallback 示例
java
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息成功到达交换机");
} else {
System.out.println("消息没有到达交换机: " + cause);
}
});
rabbitTemplate.setReturnsCallback(returned -> {
System.out.println("消息没有路由到队列: " + returned.getMessage());
});
return rabbitTemplate;
}
}7.5 mandatory / return 机制说明
confirm:确认消息是否到达交换机return:确认消息是否从交换机成功路由到队列
二者配合才能更完整地确认生产端链路。
7.6 消费者手动确认
yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manualjava
@Component
public class OrderConsumer {
@RabbitListener(queues = "order.queue")
public void receive(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 处理业务逻辑
System.out.println("收到消息: " + new String(message.getBody(), StandardCharsets.UTF_8));
// 处理成功后手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception ex) {
// 处理失败后可以拒绝并决定是否重回队列
channel.basicNack(deliveryTag, false, true);
}
}
}7.7 basicAck、basicNack、basicReject 区别
basicAck:确认成功消费basicNack:确认消费失败,可批量,可选择是否重回队列basicReject:确认消费失败,不支持批量
7.8 消息持久化
可靠性增强常见做法:
- 交换机持久化
- 队列持久化
- 消息持久化
重点:持久化不是百分百不丢消息,但它是可靠性的基础配置。
7.9 生产端可靠性建议
- 开启
confirm - 开启
return - 发送失败要记录日志或落库补偿
- 重要消息可配合本地消息表
7.10 消费端可靠性建议
- 使用手动
ack - 消费失败明确决定是否重回队列
- 业务逻辑一定要幂等
- 做好异常日志和告警
八、死信队列、延迟队列与幂等性
8.1 什么是死信队列
消息变成死信的常见原因:
- 被拒绝且不重回队列
- 消息过期
- 队列达到最大长度
8.2 死信队列配置示例
java
@Configuration
public class DeadLetterConfig {
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
// 指定死信交换机
.deadLetterExchange("order.dlx")
// 指定死信路由键
.deadLetterRoutingKey("order.dead")
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx");
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.dead");
}
}8.3 死信消息常见场景
- 订单超时未支付
- 消费失败进入异常队列
- 需要人工介入的补偿消息
8.4 延迟队列思路
RabbitMQ 原生没有真正的延迟队列概念,常见做法有:
- TTL + 死信队列
- 安装 delayed-message 插件
8.5 TTL + 死信实现延迟取消订单
text
下单成功 -> 发送一条 30 分钟过期消息
-> 消息先进入普通队列
-> 过期后进入死信队列
-> 消费死信队列执行取消订单8.6 TTL 延迟队列配置示例
java
@Configuration
public class DelayConfig {
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("order.ttl.queue")
// 统一设置消息过期时间,单位毫秒
.ttl(30 * 60 * 1000)
.deadLetterExchange("order.dlx")
.deadLetterRoutingKey("order.dead")
.build();
}
}8.7 delayed-message 插件方案
如果安装了 rabbitmq_delayed_message_exchange 插件,可以更灵活地设置消息级别延迟。
java
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("order.delayed.exchange", "x-delayed-message", true, false, args);
}8.8 幂等性问题
消费者可能会出现重复消费。
常见原因:
- 消费成功但 ack 丢失
- 网络抖动
- 消费失败重试
8.9 幂等性处理建议
- 数据库唯一索引
- 业务状态机判断
- Redis 防重
- 去重表
8.10 死信、延迟、幂等性学习重点
- 死信消息产生原因
- 死信队列配置
- TTL 延迟队列
- 插件延迟消息
- 消费者幂等性
九、Spring Boot 整合 RabbitMQ
9.1 引入依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<!-- RabbitMQ Spring Boot 集成依赖 -->
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>9.2 基础配置
yaml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true9.3 队列、交换机、绑定声明
java
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange orderExchange() {
// 订单直连交换机
return new DirectExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
// 订单队列
return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
// 用 routingKey=order.create 绑定队列和交换机
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.create");
}
}9.4 生产者发送消息
java
@Service
public class OrderProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(Long orderId) {
// 发送订单创建消息
rabbitTemplate.convertAndSend("order.exchange", "order.create", "orderId=" + orderId);
}
}9.5 消费者接收消息
java
@Component
public class OrderMessageConsumer {
@RabbitListener(queues = "order.queue")
public void receive(String msg) {
System.out.println("消费者收到消息: " + msg);
}
}9.6 JSON 消息示例
java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
private Long orderId;
private String type;
}java
@Service
public class JsonOrderProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(OrderMessage orderMessage) {
// 发送 JSON 结构消息
rabbitTemplate.convertAndSend("order.exchange", "order.create", orderMessage);
}
}9.7 Spring AMQP 学习重点
- 队列声明
- 交换机声明
- 绑定关系
- 生产者发送
- 消费者监听
- 确认机制
9.8 典型业务场景示例
9.8.1 下单后异步通知
- 订单服务发送下单消息
- 短信服务、积分服务异步消费
9.8.2 秒杀削峰
- 请求先写入 MQ
- 后端消费者按能力慢慢处理
9.8.3 延迟取消订单
- 下单成功发送延迟消息
- 超时未支付则自动取消
9.8.4 日志异步处理
- 系统日志先写 MQ
- 日志服务异步消费入库
十、面试重点与最佳实践
10.1 高频面试题
- RabbitMQ 为什么能解耦
- Exchange 和 Queue 的关系
- 五种消息模型有哪些
- direct、topic、fanout 的区别
- 消息为什么会丢失
- 如何保证消息可靠性
- 如何解决重复消费
- 死信队列和延迟队列怎么实现
10.2 最佳实践
- 生产者开启 confirm 和 return
- 消费者尽量使用手动 ack
- 重要消息做好持久化
- 消费者业务必须考虑幂等性
- 合理监控消息积压
- 延迟任务优先考虑 TTL + 死信方案
10.3 总结
RabbitMQ 是 Java 后端里非常常见的消息中间件。
- 入门阶段要掌握队列、交换机、路由
- 进阶阶段要掌握五种模型和确认机制
- 高阶阶段要掌握死信队列、延迟队列、幂等性和可靠性