SpringBoot2 | 第十九篇:整合RabbitMQ(延迟队列)

​ 本篇文章写 RabbitMQ 一个强大的功能 延迟队列。 在AMQP协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能, 但是它可以通过过期时间(TTL)死信交换机(DLX)模拟出延迟队列的功能。

[TOC]

延迟队列

延迟队列存储的对象是对应的延迟消息。所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是在等待指定的时间后,消费者才能拿到这个消息进行消费。

延迟队列能做什么?

  • 订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
  • 短信通知: 下单成功后 60s 之后给用户发送短信通知。
  • 失败重试: 业务操作失败后,间隔一定的时间进行失败重试。

​ 这类业务的特点就是:非实时的,需要延迟处理,需要进行失败重试。一种比较笨的方式是采用定时任务,轮询数据库,方法简单好用,但性能底下,在高并发情况下容易弄死数据库,间隔时间不好设置,时间过大,影响精度,过小影响性能,而且做不到按超时的时间顺序处理。另一种就是用Java中的DelayQueue 位于java.util.concurrent包下,本质是由PriorityQueueBlockingQueue实现的阻塞优先级队列。这玩意最大的问题就是不支持分布式与持久化

1562890837061

Time-To-Live Extensions

RabbitMQ支持为队列或者消息设置TTL(time to live 存活时间)。TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。==如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用==(一般情况下只需要设置其中一个就行了)。

Dead Letter Exchange

死信交换机,上文中提到设置了 TTL 的消息或队列最终会成为Dead Letter。如果为队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新发送到Dead Letter Exchange中,然后通过Dead Letter Exchange路由到其他队列,即可实现延迟队列的功能。

消息变成死信一般由一下三种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并设置 requeue 参数为 false
  • 消息过期
  • 队列达到最大长度

这篇笔记用的是消息过时,下一篇用的是消息过时 + 消息被拒绝

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.0.6.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9
  • spring-boot-starter-amqp: 2.0.6.RELEASE

1、pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2、application.yml

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
username: guest # 默认为`guest`
password: guest # 默认为`guest`
host: localhost # 默认为`localhost`
port: 5672 # 默认为 5672
virtual-host: /
# 手动ACK,不开启自动ACK,目的是为了防止报错后未正确处理消息丢失,默认为`none`,不 ACK
listener:
simple:
acknowledge-mode: manual

3、config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.fatal.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* RabbitMQ 配置类
* @author: Fatal
* @date: 2018/10/23 0023 11:23
*/
@Slf4j
@Configuration
public class RabbitMQConfig {

/**
* 延迟相关名称
*/
private static final String DELAY_QUEUE_NAME = "delay_queue";
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
public static final String DELAY_ROUTING_KEY = "delay_book";

/**
* 死信相关名称
*/
public static final String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";
private static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange";
// 一般使用原队列的路由键
private static final String DEAD_LETTER_ROUTING_KEY = "delay_book";

/**
* 延迟队列中用来关联`死信交换机`的交换机键名,路由键键名和存活时间键名(这是参数值是`固定`的)
*/
private static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
private static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
private static final String X_MESSAGE_TTL = "x-message-ttl";

// ************************** 死信交换机相关配置 **************************

/**
* 死信队列
*/
@Bean
public Queue dlxQueue() {
return new Queue(DEAD_LETTER_QUEUE_NAME);
}

/**
* DLX(死信交换机):dead letter发送到的exchange
* @desc: 本质上是一个普通的交换机
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}

/**
* 绑定组件
* @desc: 将`死信队列`、`死信交换机(DLX)`、`DEAD_LETTER_ROUTING_KEY`路由键 三者绑定
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(DEAD_LETTER_ROUTING_KEY);
}

// ************************ 死信交换机相关配置(end) ************************

// ************************** 延迟交换机相关配置 **************************

/**
* 延迟队列
* @desc: 与`死信交换机`绑定,并指定`死信`携带的路由键
*/
@Bean
public Queue delayQueue() {
Map<String, Object> configs = new HashMap<>();
// `x-dead-letter-exchange` 关联`DLX(死信交换机)`
configs.put(X_DEAD_LETTER_EXCHANGE, DEAD_LETTER_EXCHANGE_NAME);
// `x-dead-letter-routing-key` 声明了死信在转发时携带的 routing-key。
configs.put(X_DEAD_LETTER_ROUTING_KEY, DEAD_LETTER_ROUTING_KEY);
// `x-message-ttl`设置该队列中消息的存活时间(队列属性设置,队列中所有消息都有相同的过期时间)
// configs.put(X_MESSAGE_TTL, 5 * 1000);
/**
* @param durable 声明持久化队列,则为true。(该队列在服务器重启之后继续存在)
* @param exclusive 如果声明独占队列,则该队列将仅由声明者的连接使用
* @param autoDelete 如果服务器不存在的时候应该将队列删除,则为true
* @param arguments 用于声明队列的参数
*/
return new Queue(DELAY_QUEUE_NAME, true, false, false, configs);
}

/**
* 延迟交换机
*/
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}

/**
* 绑定组件
* @desc: 将`延迟队列`、`延迟交换机`、`DELAY_ROUTING_KEY`路由键 三者绑定
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with(DELAY_ROUTING_KEY);
}

// ************************ 延迟交换机相关配置(end) ************************

}

4、entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.fatal.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
* Book 实体类
* @author: Fatal
* @date: 2018/10/23 0023 12:36
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Book implements Serializable {

private Long id;
private String name;

}

5、consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.fatal.consumer;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.Book;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

/**
* Book 消费者
* @author: Fatal
* @date: 2018/10/23 0023 14:16
*/
@Slf4j
@Component
public class BookConsumer {

@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
public void listenerDelayQueue(Book book, Message message, Channel channel) {
log.info("[listenerDelayQueue 监听的消息] - [监听时间] - [{}] - [{}]", LocalDateTime.now(), book);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// TODO Ack失败的后续处理
log.error("【Ack失败】 time = {}", LocalDateTime.now());
} catch (Exception e) {
// TODO 业务异常的后续处理
log.error("【消费失败,业务异常】 time = {}", LocalDateTime.now());
}
}

}

6、controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.fatal.controller;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
* Book 控制器
* @author: Fatal
* @date: 2018/10/23 0023 12:37
*/
@Slf4j
@RestController
@RequestMapping("/books")
public class BookController {

private final RabbitTemplate rabbitTemplate;

/**
* 使用构造方法注入
*/
@Autowired
public BookController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

@GetMapping
public void send() {
Book book = new Book(1L, "六月与便士");
// 添加延迟队列
this.rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,
RabbitMQConfig.DELAY_ROUTING_KEY, book, message -> {
/**
* @desc: 对消息本身进行单独设置,每条消息的TTL可以不同。
* @careful: 如果你在`延迟队列`中配置了params.put("x-message-ttl", 5 * 1000);那么下面
* 这句就可以省略,二选一即可
*/
message.getMessageProperties().setExpiration(String.valueOf(5 * 1000));
return message;
});
log.info("[发送时间] - [{}]", LocalDateTime.now());
}

}

7、显示

访问 http://localhost:8080/books

1562891211968

笔记

基础概念

Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchangequeue按照路由规则绑定起来,在绑定的时候一定会指定一个绑定键“Binding Key
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

参考资料

RabbitMQ(四)消息确认(发送确认,接收确认)

一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

RabbitMQ 实战指南

总结

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter19_4

学习 唐亚峰北京-小北 前辈的经验