SpringBoot2 | 第十九篇:整合RabbitMQ(生产者确认)

SpringBoot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,SpringBoot 提供了 spring-boot-starter-amqp 项目对消息各种支持。

[TOC]

RabbitMQ介绍

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是 面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全

RabbitMQ 是实现 AMQP高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在 易用性、扩展性、高可用性 等方面表现不俗。RabbitMQ 服务器端用 Erlang 语言 编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。支持延迟队列(这是一个非常有用的功能)….

生产者确认

​ 当消息的生产者将消息发送出去之后,消息到底有没有正确的到达服务器呢?如果不进行特殊配置,默认情况下发送消息这个操作是不会返回任何信息给生产者的,也就是默认情况下生产者不知道消息有没有正确地到达服务器。

RabbitMQ 针对这个问题,提供了两种解决方案:

  • 通过事务机制实现;
  • 通过发送方确认机制实现。

事务机制

RabbitMQ 客户端提供了与事务机制的三个方法:channel.txSelect()、channel.txCommit() 和 channel.txRollback()。 事务机制 跟数据库事务不一样,这种事务是为了保证消息被RabbitMQ接收,如果事务提交成功,那么消息一定被RabbitMQ接收。提交之前如果报错,我们便可以将它捕获,进而通过执行 channel.txRollback() 方法来实现当前 channel 事务的回滚。

1
2
3
4
5
6
7
8
9
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1/0;
channel.txCommit();
} catch(Exception e) {
e.printStackTrace();
channel.txRollback();
}

缺点:严重降低了 RabbitMQ 的消息吞吐量。

发送方确认机制

RabbitMQ的事务机制在性能上得不到保证,所以引入了一种轻量级的方式 —— 发送方确认(publisher confirm)机制。该机制可以弥补事务机制的缺陷,提高了整体的吞吐量。使用 SrpingBoot 整合 RabbitMQ,如何实现发送方确认机制呢?

  1. application.yml 文件需要启用 publisher-confirmspublisher-returns
  2. RabbitMQ 配置类中,自定义 RabbitTempalte,设置setMandatory(true),否则消息路由失败不会回调 ReturnCallback;通过调用 setConfirmCallback()setReturnCallback()传入它们的实现。

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.0.6.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9
  • spring-boot-starter-amqp: 2.0.6.RELEASE

1、pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

2、application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
username: guest # 默认为`guest`
password: guest # 默认为`guest`
host: localhost # 默认为`localhost`
port: 5672 # 默认为 5672
virtual-host: /
# 手动ACK,不开启自动ACK,目的是为了防止报错后未正确处理消息丢失,默认为`none`,不 ACK
listener:
simple:
acknowledge-mode: manual
# 启用发布服务器确认
publisher-confirms: true
# 启动发布服务器返回
publisher-returns: true

3、config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.fatal.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDateTime;

/**
* RabbitMQ 配置类
* @author: Fatal
* @date: 2018/10/20 0020 9:32
*/
@Slf4j
@Configuration
public class RabbitMQConfig {

public static final String CONFIRM_QUEUE = "confirm_queue";
public static final String CONFIRM_EXCHANGE = "confirm_exchange";
public static final String ROUTING_KEY = "book";

/** 日志模板 */
private final String EXIST = "RabbitMQ Block 的 Exchange({})接收到消息";
private final String NOT_EXIST = "RabbitMQ Block 不存在此交换机 ({})";

/**
* 自定义 RabbitTemplate,CachingConnectionFactory 是必须的
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置 mandatory 为 true,否则消息路由失败不会回调 ReturnCallback
rabbitTemplate.setMandatory(true);
// 设置确认回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
log.info((ack ? EXIST : NOT_EXIST) + ":time({}), ack({}),cause({})",
correlationData.getId(), LocalDateTime.now(), ack, cause));
// 设置返回回调函数
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
log.info("消息路由失败:time({}),message({}), replyCode({}), replyText({}), exchange({}), routingKey({})",
LocalDateTime.now(), message, replyCode, replyText, exchange, routingKey));
return rabbitTemplate;
}

@Bean
public Queue confirmBookQueue() {
return new Queue(CONFIRM_QUEUE);
}

@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE);
}

@Bean
public Binding binding(Queue queue, DirectExchange confirmExchange) {
return BindingBuilder.bind(queue)
.to(confirmExchange)
.with(ROUTING_KEY);
}

}

4、entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.fatal.entity;

import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
* Book 实体
* @author: Fatal
* @date: 2018/10/20 0020 9:39
*/
@Data
@Accessors(chain = true)
public class Book implements Serializable {

private String id;
private String name;

}

注意:因为要通过网络传输发送给 RabbitMQ 服务器,所以需要实现序列化

5、receiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.fatal.consumer;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.Book;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

/**
* Book 消费者
* 同时监听两个队列
* @author: Fatal
* @date: 2018/10/20 0020 10:04
*/
@Slf4j
@Component
public class BookReceiver {

@RabbitListener(queues = {RabbitMQConfig.CONFIRM_QUEUE})
public void listenerReceiver(Book book, Message message, Channel channel) {
try {
log.info("【listenerReceiver 监听的消息】 - [监听时间] - [{}] - [{}]", LocalDateTime.now(), book);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// TODO Ack失败的后续处理
log.error("【Ack失败】 time = {}", LocalDateTime.now());
} catch (Exception e) {
// TODO 业务异常的后续处理
log.error("【消费失败,业务异常】 time = {}", LocalDateTime.now());
}
}
}

6、controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.fatal.controller;

import com.fatal.config.RabbitMQConfig;
import com.fatal.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
* Book 控制器
* @author: Fatal
* @date: 2018/10/20 0020 9:46
*/
@Slf4j
@RestController
@RequestMapping("/books")
public class BookController {

private RabbitTemplate rabbitTemplate;

/**
* 回调的相关数据(我拿来放交换机名称用了。。。)
*/
private CorrelationData correlationData = new CorrelationData();

@Autowired
public BookController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
correlationData.setId(RabbitMQConfig.CONFIRM_EXCHANGE);
}

private static String NO = "no";

/**
* 交换机和路由键都正确
*/
@GetMapping("/1")
public void send1() {
Book book = new Book().setId("1").setName("米彩");
log.info("[发送时间] - [{}]", LocalDateTime.now());
// 发送到confirmExchange交换机
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE, RabbitMQConfig.ROUTING_KEY, book, correlationData);
// 发送到默认交换机(默认交换机隐式的绑定到每个队列,路由键等于队列的名称)
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_QUEUE, book, correlationData);
}

/**
* 交换机正确,路由键错误
*/
@GetMapping("/2")
public void send2() {
Book book = new Book().setId("1").setName("米彩");
log.info("[发送时间] - [{}]", LocalDateTime.now());
this.rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE,RabbitMQConfig.ROUTING_KEY + NO, book, correlationData);
}

/**
* 交换机错误(不管路由键是否正确)
*/
@GetMapping("/3")
public void send3() {
Book book = new Book().setId("1").setName("米彩");
log.info("[发送时间] - [{}]", LocalDateTime.now());
this.rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE + NO, RabbitMQConfig.ROUTING_KEY, book, correlationData);
// this.rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE + NO, RabbitMQConfig.ROUTING_KEY + NO, book, correlationData);
}

}

7、显示

启动项目

7.1、交换机和路由键都正确

访问 http://localhost:8080/books/1

控制台如下:

1562928687568

你可以在BookReceiver的31行打个断点。再次访问,然后去看 RabbitMQ客户端

1562823185092

你会发现,这个队列确实接收到两条消息,其中一条是通过默认交换机接收的。

析:消息正确地到达了交换机,触发了 ConfirmCallback,ack 为 true;路由键正确所以路由成功,不触发 ReturnCallback

7.2、交换机正确、路由键错误

访问 http://localhost:8080/books/2

控制台如下:

1562823493590

析:消息正确地到达了交换机,触发了 ConfirmCallback,ack 为 true;路由键错误所以路由失败,触发了 ReturnCallback

7.3、交换机错误(不管路由键是否正确)

访问 http://localhost:8080/books/3

控制台如下:

1562825937096

析:消息正确地没有到达了交换机,触发了 ConfirmCallback ,ack 为 false;交换机都错了,所以没有路由操作,自然也不存在路由失败与成功,所以 ReturnRollbak 不被触发。

笔记

基础概念

Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchangequeue按照路由规则绑定起来,在绑定的时候一定会指定一个绑定键“Binding Key
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

ConfirmCallback 和 ReturnCallback

  • 消息到达 exchange,则 ConfirmCallback 回调,返回 true
  • 消息没有到达 exchange,则 ConfirmCallback 回调,返回 false
  • 消息路由成功,则不回调 ReturnCallback
  • 消息路由失败,则回调 ReturnCallback

参考资料

springboot(八):RabbitMQ详解

一起来学SpringBoot | 第十二篇:初探RabbitMQ消息队列

RabbitMQ 实战指南

总结

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter19_3

学习 唐亚峰纯洁的微笑 前辈的经验