SpringBoot2 | 第十九篇:整合RabbitMQ(生产案例)

​ 前面 4 篇笔记对RabbitMQ做了初步的描述:简单入门交换机类型 + 手动 ack生产者确认延迟队列。那么消息中间件如何处理消费失败的消息?假设有两个系统 A系统B系统,A系统是生产者,B系统则是消费者。A系统负责发送消息到 MQ,B系统负责从 MQ 拿到消息后进行消费,AB双方互不干涉互不影响;A不管也不需要知道消费者是谁,B也不需要知道生产者是谁,这样的通信方式,就是所谓的“异步”通信方式。它的好处就是可以实现系统之间的解耦,提高整套系统的容错性。

[TOC]

生产案例:购物平台

​ 以淘宝为例,从我们选择商品到下单、付款,再到快递员派送。快递到了手之前,它有多少流程呢?其它的业务就不提了,就说说 MQ 的使用场景吧。就提两种吧,其一:用户下单后,如果 30 分钟内完成付款,订单就会自动取消,这个功能可以通过 MQ 的延迟队列来实现。其二:用户下单并完成付款后,订单系统会生产一条 MQ 消息,而仓库系统负责消费这条消息,调用独立仓库系统发货和通知第三方物流系统进行配送,为了防止仓库系统出错导致消息丢失,我们需要使用 MQ 的死信队列提升系统的健壮性。

核心问题:

如果消费者服务在消费消息途中失败了,这种情况怎么处理?

解决方案:

使用死信队列处理失败的消息~~

一般生产环境中,如果你有丰富的架构设计经验,都会在使用 MQ 的时候设计两个队列:一个是核心业务队列,一个是死信队列。核心业务队列,比如上面的订单系统仓库系统;死信队列,用来处理异常情况的。

RabbitMQ 持久化

持久化可以提高 RabbitMQ的可靠性,以防在异常情况下(重启、关闭、宕机)数据丢失。RabbitMQ 的持久化:交换机的持久化(对于一个长期使用的交换机来说,建议将其设置为持久化)、队列的持久化消息的持久化。“皮之不存,毛将焉附”,只设置消息的持久化,而忽略了交换机和队列,那也起不了作用,我们必须将交换机、队列、消息三个都设置为持久化,才能保证消息的持久化。通过消息的投递模式(MessageProperties 中的 DeliveryMode)即可实现消息的持久化。SpringBoot已经将这些信息封装好了。

注意:

可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能,写入磁盘的速度比写入内存的速度慢得不是一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要消息持久化时,需要在可靠性吞吐量之间做一个权衡。

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.1.6.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9
  • spring-boot-starter-amqp: 2.1.6.RELEASE

1、pom.xml

  • fastjson:用于反序列化
  • spring-boot-starter-amqp:RabbitMQ 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2、application.yml

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
username: guest # 默认为`guest`
password: guest # 默认为`guest`
host: localhost # 默认为`localhost`
port: 5672 # 默认为 5672
virtual-host: /
# 手动ACK,不开启自动ACK,目的是为了防止报错后未正确处理消息丢失,默认为`none`,不 ACK
listener:
simple:
acknowledge-mode: manual

3、utils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.fatal.utils;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.GetResponse;

/**
* RabbitMQ 工具类
* @author: Fatal
* @date: 2019/7/19 0019 11:08
*/
public class RabbitMQUtil {

/**
* Json 反序列化
* @param getResponse
* @param clazz
* @param <T>
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T parseObject(GetResponse getResponse, Class<T> clazz) {
byte[] body = getResponse.getBody();
return (T) JSONObject.parseObject(body, clazz);
}

}

4、config

  • ExceptionHandlerExchange:异常处理交换机(死信交换机),接收消息失败的消息
  • ExceptionHandlerQueue:异常处理队列(死信队列)
  • TaskExchange:任务交换机(Direct Exchange)
  • TaskQueue:任务队列
  • RabbitTemplate:自定义 RabbitTemplate 模板,将序列化方式由默认的 Jdk 序列化 改为 Json序列化
  • SimpleRabbitListenerContainerFactory:自定义 RabbitMQ 监听容器工厂,在默认配置的基础上将序列化方式由默认的 Jdk 序列化 改为 Json序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package com.fatal.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* RabbitMQ 配置类
* @author: Fatal
* @date: 2019/7/12 0012 8:51
*/
@Configuration
public class RabbitMQConfig {

/**
* 任务相关配置
*/
public static final String TASK_QUEUE = "task_queue";
public static final String TASK_EXCHANGE = "task_exchange";
public static final String TASK_ROUTING_KEY = "task";

/**
* 异常处理相关配置
*/
public static final String EXCEPTION_HANDLING_QUEUE_NAME = "exception_handling_queue_name";
private static final String EXCEPTION_HANDLING_EXCHANGE_NAME = "exception_handling_exchange_name";
// 一般使用原队列的路由键
private static final String EXCEPTION_HANDLING_ROUTING_KEY = "task";

/**
* 延迟队列中用来关联`死信交换机`的交换机键名,路由键键名(这是参数值是`固定`的)
*/
private static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
private static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

// ************************ JSON 序列化与反序列化 ***********************

/**
* RabbitTemplate 的消息转换器设置为 Jackson2JsonMessageConverter(默认转换器是 SimpleMessageConverter)
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConnectionFactory(connectionFactory);
return rabbitTemplate;
}

/**
* 在 SpringBoot 整合 RabbitMQ 的组件 SimpleRabbitListenerContainerFactory
* 默认配置的基础上,将 MessageConverter(默认是 SimpleMessageConverter,该转换器基于 Jdk 序列化)
* 设置为 Jackson2JsonMessageConverter(基于 Json 序列化,性能更好)
* 下面的组件是从源码中找的,我们在这个组件的基础上进行修改的话,可以保留配置的属性。要是写个新的,很多
* 配置就没意义了,比如,你在外面设置的手动ack没有了。它用的还是默认的 no ack。
* @param configurer
* @param connectionFactory
* @return
*/
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new Jackson2JsonMessageConverter());
configurer.configure(factory, connectionFactory);
return factory;
}

// ********************** JSON 序列化与反序列化 end *********************

// ************************ 异常处理交换机相关配置 ***********************

/**
* 异常处理队列
*/
@Bean
public Queue exceptionHandlerQueue() {
return new Queue(EXCEPTION_HANDLING_QUEUE_NAME);
}

/**
* 异常处理交换机
*/
@Bean
public DirectExchange exceptionHandlerExchange() {
return new DirectExchange(EXCEPTION_HANDLING_EXCHANGE_NAME);
}

/**
* 绑定组件
* @desc: 将`死信队列`、`死信交换机(DLX)`、`DEAD_LETTER_ROUTING_KEY`路由键 三者绑定
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(exceptionHandlerQueue())
.to(exceptionHandlerExchange())
.with(EXCEPTION_HANDLING_ROUTING_KEY);
}

// ********************** 异常处理交换机相关配置(end) **********************

// ************************** Task交换机相关配置 **************************

/**
* 任务队列
* @desc: 与`死信交换机`绑定,并指定`死信`携带的路由键,不需要设置消息的过期时间。
* 消息只要是被 basicReject(requeue:true)、basicNack(requeue:true) 回来的,
* 都会被转移到死信交换机那边。
*/
@Bean
public Queue taskQueue() {
Map<String, Object> configs = new HashMap<>();
configs.put(X_DEAD_LETTER_EXCHANGE, EXCEPTION_HANDLING_EXCHANGE_NAME);
configs.put(X_DEAD_LETTER_ROUTING_KEY, EXCEPTION_HANDLING_ROUTING_KEY);
return new Queue(TASK_QUEUE, true, false, false, configs);
}

/**
* 任务交换机
*/
@Bean
public DirectExchange taskExchange() {
return new DirectExchange(TASK_EXCHANGE);
}

/**
* 绑定组件
*/
@Bean
public Binding taskBinding() {
return BindingBuilder.bind(taskQueue())
.to(taskExchange())
.with(TASK_ROUTING_KEY);
}

// ************************ Task交换机相关配置(end) ************************

}

5、entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.fatal.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.util.Date;

/**
* @author: Fatal
* @date: 2019/7/12 0012 8:54
*/
@Data
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

private Long id;
private String username;
private String password;
private Date birthday;

}

6、consumer

6.1、TaskConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.fatal.consumer;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Random;

/**
* @author: Fatal
* @date: 2019/7/12 0012 9:05
*/
@Slf4j
@Component
public class TaskConsumer {

private Random random = new Random();

@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE)
@Transactional(rollbackFor = Exception.class)
public void taskConsumer(@Payload User user, Message message, Channel channel) throws Exception {
log.info("【taskConsumer 消费消息】 - [消费时间] - [{}] - [{}]", LocalDateTime.now(), user);
try {
// 模拟业务异常
// int i = 1/0;
// 模拟 Ack 失败
int number = random.nextInt(100);
if (number % 2 == 0) {
throw new IOException("taskConsumer Ack失败");
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// TODO Ack失败的后续处理
log.error("【taskConsumer Ack失败】 time = {}", LocalDateTime.now());
// 消息转移
basicNack(message, channel);
// 抛出异常,让消费端事务回滚
throw new IOException(e);
} catch (Exception e) {
// TODO 业务异常的后续处理
log.error("【taskConsumer 消费失败,业务异常】 time = {}", LocalDateTime.now());
// 消息转移
basicNack(message, channel);
// 抛出异常,让消费端事务回滚
throw new RuntimeException(e);
}
}


/**
* 将消费失败的信息转移到死信队列
* @param message
* @param channel
* @throws IOException
*/
private void basicNack(Message message, Channel channel) throws IOException {
try {
// 模拟 Nack 失败
int number = random.nextInt(100);
if (number % 2 == 0) {
throw new IOException();
}
/**
* 拒绝消息。multiple 设置为 true,如果调用此方法过程中报错了,消息就会变为 unacked 状态,那么下次调用就需要
* 拒绝传递标签之前(包括提供的传递标签)的所有消息(当然,如果只有一条消费失败的消息运气不好,因为网络问题调用不了
* basicNack 方法,那么这条消息就会变成 unacked 状态,那怎么办呢?目前我的想法是给消息设置个过期时间)
* @method void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException
* @deliveryTag 指定队列要拒绝的已接收消息的标签(也叫传递标签)。新的队列默认的传递标签为0,代表接收过0条消息;
* 队列接收消息后,传递标签会从0开始累加。(传递标签de值也可以看成该队列接收的第n条消息)
* @multiple true: 用于拒绝提供的传递标签之前(包括提供的传递标签)指向的所有消息;
* false: 仅拒绝提供的传递标签指向的那条消息
* @requeue true: 拒绝的消息重新排队
* false: 判断该队列是否有绑定死信交换机,没有则丢弃;有则转移到死信交换机做后续处理
* @desc basicNack 与 basicReject 的唯一区别:basicNack 可以选择拒绝传递标签之前(包括提供的传递标签)的所有消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("【taskConsumer Nack失败】 time = {}", LocalDateTime.now());
/**
* basicRecover 在这里解决消费端ack和nack失败和断网问题。怎么说呢?
* 1. ack和nack失败 通过重新排队,直到这两个方法正常调用为止
* 2. 断网问题 消费失败的消息会变成 unacked,那么当网络正常了,系统重新启动会再次消费 这个 unacked 的消息。
* @method Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
* @requeue true: 消息将被重新排队并可能传递给其他使用者(默认)
* false: 消息将被重新排队并传递给同一使用者
*/
channel.basicRecover();
}
}
}

6.2、ExceptionHandlingConsumer

exceptionHandlingConsumer:定时器,用于处理 由第三方物流系统异常引起的消费端消费失败 的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.fatal.consumer;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.User;
import com.fatal.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.Random;

/**
* RabbitMQ 异常处理消费者
* @desc: 处理消费失败的消息
* @author: Fatal
* @date: 2019/7/12 0012 17:18
*/
@Slf4j
@Component
public class ExceptionHandlingConsumer {

private Random random = new Random();

private Channel channel;

public ExceptionHandlingConsumer(CachingConnectionFactory cachingConnectionFactory) {
Connection connection = cachingConnectionFactory.createConnection();
this.channel = connection.createChannel(false);
}

/**
* 定时器判断第三方物流系统是否正常
* 若正常,则从死信队列拿到消息进行消费
* @throws Exception
*/
@Scheduled(fixedDelay = 2000)
@Transactional(rollbackFor = Exception.class)
public void exceptionHandlingConsumer() throws Exception {
// 判断第三方系统是否正常了。
if (!listener()) {
log.warn("【exceptionHandlingConsumer 监听第三方物流系统】 - [监听时间] - [{}] 第三方系统异常,重试", LocalDateTime.now());
return;
}
// 主动取队列去消息,并获得 GetResponse
GetResponse getResponse = channel.basicGet(RabbitMQConfig.EXCEPTION_HANDLING_QUEUE_NAME, false);
if (!Optional.ofNullable(getResponse).isPresent()) {
log.info("【exceptionHandlingConsumer 监听队列 {}】 - [监听时间] - [{}] 暂无消息",
RabbitMQConfig.EXCEPTION_HANDLING_QUEUE_NAME, LocalDateTime.now());
return;
}
try {
// 获得消息
User user = RabbitMQUtil.parseObject(getResponse, User.class);
log.info("【exceptionHandlingConsumer 监听到消息】 - [操作时间] - [{}] -[{}]", LocalDateTime.now(), user);
// 模拟业务异常
// int i = 1/0;
// 模拟 Ack 失败
/*int number = random.nextInt(100);
if (number % 2 == 0) {
throw new IOException("Ack失败");
}*/
channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
} catch (IOException e) {
log.error("【exceptionHandlingConsumer Ack失败】 time = {}", LocalDateTime.now());
e.printStackTrace();
// 消息将被重新排队
channel.basicRecover();
// 抛出异常,让消费端事务回滚
throw new IOException(e);
} catch (Exception e) {
log.error("【exceptionHandlingConsumer 消费失败,业务异常】 time = {}", LocalDateTime.now());
// 业务出现异常,保存到数据库后并手动 ack
channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
// 抛出异常,让消费端事务回滚
throw new RuntimeException(e);
}
}

/**
* 模拟监听第三方是否正常
* @return
*/
private boolean listener() {
int number = random.nextInt(100);
if (number % 2 == 0) {
return true;
}
return false;
}

}

7、controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.fatal.controller;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.util.Date;

/**
* @author: Fatal
* @date: 2019/7/12 0012 8:56
*/
@Slf4j
@RestController
@RequestMapping("/task")
public class TaskController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping
public void task() {
User user = new User()
.setUsername("fatal")
.setPassword("123456")
.setBirthday(new Date())
.setId(1L);
log.info("[Task 发送时间] - [{}]", LocalDateTime.now());
rabbitTemplate.convertAndSend(RabbitMQConfig.TASK_EXCHANGE, RabbitMQConfig.TASK_ROUTING_KEY, user, message -> {
/**
* 将发送的消息设置为持久化
*/
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
}

}

8、Application

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.fatal;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

// 开启定时任务
@EnableScheduling
@SpringBootApplication
public class Chapter195Application {

public static void main(String[] args) {
SpringApplication.run(Chapter195Application.class, args);
}

}

9、测试

启动项目

访问 http://localhost:8080/task

TaskConsumer Ack 成功

1563515422790

TaskConsumer Ack 失败,Nack 成功

消息在死信队列中消费了

1563515512291

TaskConsumer Ack 成功,Nack 失败

消息被压回队列重新消费

1563515265014

笔记

基础概念

Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchangequeue按照路由规则绑定起来,在绑定的时候一定会指定一个绑定键“Binding Key
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

注解

@Payload:指定被修饰的参数来接收消息的 body

方法

basicNack

  • deliveryTag:指定队列要拒绝的已接收消息的标签(也叫传递标签)。新的队列默认的传递标签为0,代表接收过0条消息;队列接收消息后,传递标签会从0开始累加。(传递标签de值也可以看成该队列接收的第n条消息)
  • multiple
    • true:用于拒绝提供的传递标签之前(包括提供的传递标签)指向的所有消息
    • false:仅拒绝提供的传递标签指向的那条消息
  • requeue
    • true:拒绝的消息重新排队
    • false:判断该队列是否有绑定死信交换机,没有则丢弃;有则转移到死信交换机做后续处理

basicReject

  • deliveryTag:同上
  • requeue:同上

basicRecover

  • requeue
    • true:消息将被重新排队并可能传递给其他使用者(默认)
    • false:消息将被重新排队并传递给同一使用者

参考资料

【真实生产案例】消息中间件如何处理消费失败的消息?

一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

RabbitMQ 实战指南

总结

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter19_5

学习 唐亚峰 前辈的经验