SpringBoot2 | 第十八篇:使用 Redis 实现消息队列

—— 基于 Lettuce 的发布与订阅

​ 使用 Spring Data Redis 作为发布消息的方式听起来可能很奇怪,但是您将发现,Redis不仅提供NoSQL数据存储,而且还提供消息传递系统

[TOC]

JMS

​ JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

​ JMS消息通常有两种类型:

  • 点对点(Point-to-Point):在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。被概括为:只有一个消费者将获得消息
  • 发布/订阅(Publish/Subscribe):发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。被概括为:多个消费者可以获得消息

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.0.5.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9
  • spring-boot-starter-data-redis:2.0.5.RELEASE
  • commons-pool2:2.5.0

1、pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 整合Lettuce Redis需要commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

2、application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
redis:
host: localhost
password: 123456
# Redis默认情况下有16个分片,这里配置具体使用的分片,默认是0
database: 1
lettuce:
pool:
# 当池耗尽时,在引发异常之前连接分配可以阻塞的最长时间(使用负值表示没有限制) 默认 -1
max-wait: -1ms
# 连接池最大连接数(使用负值表示没有限制) 默认 8
max-active: 8
# 连接池中的最大空闲连接 默认 8
max-idle: 8
# 连接池中的最小空闲连接 默认 0
min-idle: 0
# 连接超时时间
timeout: 10000ms

3、message(消费者)

ReceiverOne.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.fatal.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CountDownLatch;

/**
* 消费者One
* @author: Fatal
* @date: 2018/10/18 0018 15:34
*/
@Slf4j
public class ReceiverOne {

private CountDownLatch latch;

/**
* 使用`@Autowired`的Setter方法注入方式
*/
@Autowired
public ReceiverOne(CountDownLatch latch) {
this.latch = latch;
}

/** 方法名随你起,不过必须和配置类中的消息监听器组件内容保持一致 */
public void receiveMessage(String message) {
log.info("【消费者 ReceiverOne】:我消费了<" + message + ">");
// Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
latch.countDown();
}

}

ReceiverTwo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.fatal.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.CountDownLatch;

/**
* 消费者Two
* @author: Fatal
* @date: 2018/10/18 0018 15:34
*/
@Slf4j
public class ReceiverTwo {

private CountDownLatch latch;

/**
* 使用`@Autowired`的Setter方法注入方式
*/
@Autowired
public ReceiverTwo(CountDownLatch latch) {
this.latch = latch;
}

public void receiveMessage(String message) {
log.info("【消费者 ReceiverTwo】:我消费了<" + message + ">");
// Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
latch.countDown();
}

}

4、config

Spring Data Redis 提供了所有与 Redis 发送和接收消息所需的组件。具体来说,您需要配置:

  • A connection factory(连接工厂)
  • A message listener container(消息监听器容器)
  • A Redis template(RedisTemplate)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.fatal.config;

import com.fatal.message.ReceiverOne;
import com.fatal.message.ReceiverTwo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.concurrent.CountDownLatch;

/**
* `消息服务`配置类
* `@Bean`修饰含参方法时,参数从Spring容器中取
* @author: Fatal
* @date: 2018/10/18 0018 15:40
*/
@Configuration
public class MessageConfig {

/**
* 往Spring容器注入`消费者One`和`消费者Two`
*/
@Bean
ReceiverOne receiverOne(CountDownLatch latch) {
return new ReceiverOne(latch);
}

@Bean
ReceiverTwo receiverTwo(CountDownLatch latch) {
return new ReceiverTwo(latch);
}

@Bean
CountDownLatch countDownLatch() {
// 设置闩锁的计数为 2
return new CountDownLatch(2);
}


@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
MessageListenerAdapter adapterOne,
MessageListenerAdapter adapterTwo) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
/** 设置连接工厂 */
container.setConnectionFactory(factory);
/** 添加两个消息监听器,设置监听的主题为`topic` */
container.addMessageListener(adapterOne, new PatternTopic("topic"));
container.addMessageListener(adapterTwo, new PatternTopic("topic"));
return container;
}

@Bean
MessageListenerAdapter adapterOne(ReceiverOne receiverOne) {
/** 对`ReceiverOne`进行封装,指定接受消息de方法为`receiverMessage` */
return new MessageListenerAdapter(receiverOne, "receiveMessage");
}

@Bean
MessageListenerAdapter adapterTwo(ReceiverTwo receiverTwo) {
/** 对`ReceiverTwo`进行封装,指定接受消息de方法为`receiverMessage` */
return new MessageListenerAdapter(receiverTwo, "receiveMessage");
}

}

​ 因为 ReceiverOne、ReceiverTwo 两个类是都是POJO,所以它需要包装在一个消息监听器适配器中(MessageListenerAdapter),该适配器实现 MessageListener 接口 。消息监听适配器还配置为当消息到达时调用接收器上的用来接收消息的方法(receiveMessage)。

5、controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.fatal.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CountDownLatch;

/**
* Message 控制器
* @author: Fatal
* @date: 2018/10/18 0018 16:07
*/
@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Autowired
private CountDownLatch latch;

@PostMapping("/sendRedisMessage")
public void sendRedisMessage(String message) throws InterruptedException {
log.info("【控制器 MessageController】:生产者发布了<" + message + ">");
stringRedisTemplate.convertAndSend("topic", message);

// 使当前线程等待直到闩锁计数为零,除非线程被中断。
latch.await();

// 结束程序
System.exit(0);
}

}

6、显示

运行工程

1539856884275

发布消息

访问 sendRedisMessage 接口发布消息

POST:http://localhost:8080/message/sendRedisMessage

1539856959699

1539857052006

控制台

1539857112708

参考资料

Messaging-Redis

SpringBoot非官方教程 | 第十四篇:在springboot中用redis实现消息队列

CountDownLatch 源码

See Also

The following guides may also be helpful:

总结

CountDownLatch介绍

CountDownLatch一种同步辅助工具,允许一个或多个线程等到在其它线程中执行的一组操作完成为止。

使用给定的计数初始化CountDownLatch。调用 await() 方法让线程阻塞,直到当前计数到达 0 触发 countDown() 方法的调用,释放所有等待的线程,再继续执行 await() 方法的后续调用。这是一种一次性现象——计数无法重置。如果需要重置计数的版本,可以考虑使用 Cyclicbarrier

用一个计数初始化的CountDownLatch充当一个简单的开/关闩锁,或门:所有调用了 await() 方法的线程在门外等待直到其中一个线程调用了 countDown() 方法将门打开。 初始化为 NCountDownLatch 可用于让一个线程等待直到 N 个线程完成某些操作,或者某些操作完成 N 次。

CountDownLatch 一个有用的属性就是它不要求调用 countDown() 在继续之前等待计数达到 0,它只是为了阻止任何线程在所有的线程都能通过之前继续通过 await() (倒计时到 0 时,开门放一群或一只狗)

CountDownLatch原理

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

:这里为了演示,才使用 CountDownLatch 的。使用时不必加上它。

详细了解 CountDownLatch

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter18

学习 方志朋 前辈的经验