前面 4 篇笔记对RabbitMQ
做了初步的描述:简单入门
、交换机类型 + 手动 ack
、生产者确认
、延迟队列
。那么消息中间件如何处理消费失败的消息?假设有两个系统 A系统 和 B系统,A系统是生产者,B系统则是消费者。A系统负责发送消息到 MQ
,B系统负责从 MQ
拿到消息后进行消费,AB双方互不干涉互不影响;A不管也不需要知道消费者是谁,B也不需要知道生产者是谁,这样的通信方式,就是所谓的“异步
”通信方式。它的好处就是可以实现系统之间的解耦
,提高整套系统的容错性。
[TOC]
生产案例:购物平台
以淘宝为例,从我们选择商品到下单、付款,再到快递员派送。快递到了手之前,它有多少流程呢?其它的业务就不提了,就说说 MQ
的使用场景吧。就提两种吧,其一:用户下单后,如果 30 分钟内完成付款,订单就会自动取消,这个功能可以通过 MQ
的延迟队列来实现。其二:用户下单并完成付款后,订单系统会生产一条 MQ
消息,而仓库系统负责消费这条消息,调用独立仓库系统发货和通知第三方物流系统进行配送,为了防止仓库系统出错导致消息丢失,我们需要使用 MQ
的死信队列提升系统的健壮性。
核心问题:
如果消费者服务在消费消息途中失败了,这种情况怎么处理?
解决方案:
使用死信队列处理失败的消息~~
一般生产环境中,如果你有丰富的架构设计经验,都会在使用 MQ
的时候设计两个队列:一个是核心业务队列
,一个是死信队列
。核心业务队列,比如上面的订单系统到仓库系统;死信队列,用来处理异常情况的。
RabbitMQ 持久化
持久化
可以提高 RabbitMQ
的可靠性,以防在异常情况下(重启、关闭、宕机)数据丢失。RabbitMQ
的持久化:交换机的持久化(对于一个长期使用的交换机来说,建议将其设置为持久化)、队列的持久化和消息的持久化。“皮之不存,毛将焉附”,只设置消息的持久化,而忽略了交换机和队列,那也起不了作用,我们必须将交换机、队列、消息
三个都设置为持久化,才能保证消息的持久化。通过消息的投递模式(MessageProperties
中的 DeliveryMode
)即可实现消息的持久化。SpringBoot
已经将这些信息封装好了。
注意:
可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ
的性能,写入磁盘的速度比写入内存的速度慢得不是一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
环境/版本一览:
- 开发工具:Intellij IDEA 2018.2.2
- springboot: 2.1.6.RELEASE
- jdk:1.8.0_171
- maven:3.3.9
- spring-boot-starter-amqp: 2.1.6.RELEASE
1、pom.xml
fastjson
:用于反序列化spring-boot-starter-amqp
:RabbitMQ 依赖
1 | <dependencies> |
2、application.yml
1 | spring: |
3、utils
1 | package com.fatal.utils; |
4、config
ExceptionHandlerExchange
:异常处理交换机(死信交换机),接收消息失败的消息ExceptionHandlerQueue
:异常处理队列(死信队列)TaskExchange
:任务交换机(Direct Exchange)TaskQueue
:任务队列RabbitTemplate
:自定义RabbitTemplate
模板,将序列化方式由默认的Jdk 序列化
改为Json序列化
SimpleRabbitListenerContainerFactory
:自定义RabbitMQ
监听容器工厂,在默认配置的基础上将序列化方式由默认的Jdk 序列化
改为Json序列化
1 | package com.fatal.config; |
5、entity
1 | package com.fatal.entity; |
6、consumer
6.1、TaskConsumer
1 | package com.fatal.consumer; |
6.2、ExceptionHandlingConsumer
exceptionHandlingConsumer
:定时器,用于处理 由第三方物流系统异常引起的消费端消费失败 的消息。
1 | package com.fatal.consumer; |
7、controller
1 | package com.fatal.controller; |
8、Application
1 | package com.fatal; |
9、测试
启动项目
TaskConsumer
Ack 成功
TaskConsumer
Ack 失败,Nack 成功
消息在死信队列中消费了
TaskConsumer
Ack 成功,Nack 失败
消息被压回队列重新消费
笔记
基础概念
Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchange
和queue
按照路由规则绑定起来,在绑定的时候一定会指定一个绑定键“Binding Key
”
Routing Key:路由关键字,exchange
根据这个关键字进行消息投递
vhost:虚拟主机,一个broker
里可以开设多个vhost
,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel
,每个channel
代表一个会话任务
注解
@Payload:指定被修饰的参数来接收消息的 body
方法
basicNack
deliveryTag
:指定队列要拒绝的已接收消息的标签(也叫传递标签)。新的队列默认的传递标签为0,代表接收过0条消息;队列接收消息后,传递标签会从0开始累加。(传递标签de值也可以看成该队列接收的第n条消息)multiple
:- true:用于拒绝提供的传递标签之前(包括提供的传递标签)指向的所有消息
- false:仅拒绝提供的传递标签指向的那条消息
requeue
:- true:拒绝的消息重新排队
- false:判断该队列是否有绑定死信交换机,没有则丢弃;有则转移到死信交换机做后续处理
basicReject
deliveryTag
:同上requeue
:同上
basicRecover
requeue
:- true:消息将被重新排队并可能传递给其他使用者(默认)
- false:消息将被重新排队并传递给同一使用者
参考资料
一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列
RabbitMQ 实战指南
总结
SpringBoot
的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn、方志朋的专栏、程序猿DD、纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~
以上文章是我用来学习的Demo
,都是基于 SpringBoot2.x
版本。
源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter19_5