SpringBoot2 | 第三十一篇:Redisson可重入公平锁解决超卖

—— 基于Redis 单节点

库存超卖有很多种解决方案,悲观锁乐观锁zookeeper 分布式锁redis 分布式锁。这篇笔记介绍如何 redis 分布式锁。当然,redlock 实现分布式锁备受争议,就连作者也不推荐使用。Redis 官方推荐了一个 Redis Java Client —— Redisson

分布式锁

什么是分布式锁?

​ 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。不同的系统或者同一个系统的不用主机之间共享同一个或同一组资源,那么访问这些资源的时候,往往需要互斥防止彼此干扰,进而保证了数据一致性。分布式系统下如果想实现线程同步,可以使用分布式锁。

分布式锁有以下特点:

  • 互斥性:一个方法在同一时间段只能被一台机器的一个线程执行
  • 防止死锁:具有失效机制,防止死锁
  • 容错性:集群环境中,只要节点大多数正常运行,就能正常加解锁
  • 解铃还须系铃人:加解锁必须是同一个客户端
  • 可重入性:同一个客户端获得锁后可递归调用
  • 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock(阻塞) 和 trylock(非阻塞)
  • 支持公平锁和非公平锁(可选)公平锁,多个客户端同时请求加锁时,优先分配给先发出请求的线程非公平锁,相比公平锁而言,它是无序的,不存在什么先来后到

当然,Redisson 就具有这些特点。

Redisson

Redisson 是一个具有内存数据网格的 Redis Java Client。它为使用 Redis 提供了更方便、最简单的方法。它提供了关注点分离,是你能集中精力去处理业务逻辑。Redisson 底层是采用 Netty 框架。支持 Redis 2.8 以上版本,支持 Java 1.6 以上版本。Redisson 提供了很多很强大的功能,详细可以看官网Wiki目录。其中,分布式锁就有很多种,而这里选择的是公平锁。其它的锁也可以使用,看需求,看场景,选择合适即可。:satisfied:

看门狗

​ 摘取自官网 Wiki:大家都知道,如果负责储存这个分布式锁的 Redisson节点(Redisson Java Client宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

​ 摘取自源码:锁定监视器超时(毫秒)。仅在未定义 leaseTimeout 参数的情况下获取锁时才使用此参数。如果看门狗没有将锁延长到下一个 LockWatchDogTimeout 时间间隔,则锁将在 LockWatchDogTimeout 之后过期。这可以防止由于 Redisson 客户端崩溃或任何其他原因而导致无限锁定或者锁无法以正确的方式释放。

总结:

它的作用就是给所有未指定 leaseTimeout 参数的锁添加一个超时时间,这个时间就是 LockWatchDogTimeout 的值,默认每隔 internalLockLeaseTime / 3 秒(internalLockLeaseTime 等于 LockWatchDogTimeout,可以去看源码)检查一次,Redisson 节点正常,它就会不断延长锁的持有时间,直到 unlock;当 Redisson 节点宕机后,它不再延长,这个时候锁在持有时间用完之后就会释放锁,可以有效防止死锁

环境/版本一览:

  • 开发工具:Intellij IDEA 2018.2.2
  • springboot: 2.1.1.RELEASE
  • jdk:1.8.0_171
  • maven:3.3.9
  • redisson-spring-boot-starter:3.11.2
  • docker:19.03.1
  • redis:5.0.5

注:这里 SpringBoot 的版本建议Redisson starterSpringBoot 版本保持一致

1、pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- 引入 redisson 依赖 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2、yml

application.yml

1
2
3
4
5
spring:
redis:
#path to redisson.yaml or redisson.json
redisson:
config: classpath:redisson.yaml

redisson.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 单redis节点模式配置文件样本
singleServerConfig:
# 连接空闲超时,单位:毫秒。默认值:10000
# 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有部分连接的空闲时间超过了该数值,
# 那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
idleConnectionTimeout: 10000
# Ping timeout used in Node.ping and Node.pingAll operation。单位:毫秒。默认值:1000
pingTimeout: 1000
# 连接超时,单位:毫秒。默认值:10000
connectTimeout: 10000
# 命令等待超时,单位:毫秒。默认值:3000
timeout: 3000
# 命令失败重试次数,默认值:3
# 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制次数之内(默认3次嘛)发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒。默认值:1500。在一条命令发送失败以后,等待重试发送的时间间隔。
retryInterval: 1500
# 重新连接时间间隔,单位:毫秒。默认值:3000。当与某个节点的连接断开时,等待与该节点重新建立连接的时间间隔。
reconnectionTimeout: 3000
# 执行失败最大次数
# 在某个节点执行相同或不同命令时,连续失败 failedAttempts 时,该节点将被从可用节点列表里清除,
# 直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。
failedAttempts: 3
password: 123456
# 单个连接最大订阅数量。默认值:5
subscriptionsPerConnection: 5
# 客户端名称。默认值:null。在Redis节点里显示的客户端名称。
clientName: null
address: redis://192.168.0.107:6379
# 启用SSL终端识别。开启SSL终端识别能力。默认值:true
sslEnableEndpointIdentification: true
# SSL实现方式。确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。默认值:JDK
sslProvider: JDK
# SSL信任证书库路径。指定SSL信任证书库的路径。默认值:null
sslTruststore: null
# SSL信任证书库密码。指定SSL信任证书库的密码。默认值:null
sslTruststorePassword: null
# SSL钥匙库路径。指定SSL钥匙库的路径。默认值:null
sslKeystore: null
# SSL钥匙库密码。指定SSL钥匙库的密码。默认值:null
sslKeystorePassword: null
# 发布和订阅连接的最小空闲连接数,默认值:1
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小,默认值:50
subscriptionConnectionPoolSize: 50
# 最小空闲连接数,默认值:32。最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。
connectionMinimumIdleSize: 32
# 连接池大小,默认值:64。连接池的连接数量自动弹性伸缩。
connectionPoolSize: 64
# 数据库编号。默认值:0
database: 4
# DNS监测时间间隔,单位:毫秒。默认值:5000
dnsMonitoringInterval: 5000
# 所有redis节点客户端之间共享的线程数。默认值:16
threads: 0
# Netty线程池数量。默认值:32
nettyThreads: 32
# 编码。默认值:org.redisson.codec.JsonJacksonCodec。Redis用于读取和存储的序列化、反序列化的方式。
codec:
class: org.redisson.codec.JsonJacksonCodec
# 传输模式。默认值:NIO
transportMode: NIO
# 监控锁的看门狗超时,单位:毫秒。默认值:30 * 1000
# 监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。
# 如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。
# 这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。
lockWatchdogTimeout: 30000
# 保持订阅发布顺序。默认值:true
keepPubSubOrder: true

3、constants

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.fatal.constants;

/**
* @author Fatal
* @date 2019/8/31 0031 12:23
*/
public interface LockConstant {

/**
* 锁前缀
*/
String LOCK_PREFIX = "redisson_fair_lock:%s";

/**
* 格式化
* @param lockName
* @return
*/
static String format(String lockName) {
return String.format(LockConstant.LOCK_PREFIX, lockName);
}

}

4、annotation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.fatal.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

/**
* 分布式锁
* @author Fatal
* @date 2019/8/24 0024 20:04
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Lock {

/**
* 锁的名称
* @return
*/
String name() default "";

/**
* 等待获取锁的最长时间
* @return
*/
long waitTime() default 4000L;

/**
* 授予锁后持有锁的最长时间。该时间根据业务也行量而定
* @return
*/
long leaseTime() default 2000L;

/**
* 时间单位
* @return
*/
TimeUnit unit() default TimeUnit.SECONDS;

}

5、aspect

核心方法:

1
2
3
4
5
6
7
8
9
10
/**
* 获取锁后立即返回 true,如果当前的锁已经被此系统或者分布式系统中的任何进程中的另一个线程持有,
* 就返回 false。如果获取了锁,它将持有锁直到 unlock 方法被调用 或者 直到 leasetime 结束。
* @param waitTime 等待获取锁的最长时间
* @param leaseTime 持有锁的时间
* @param unit 时间单位
* @return 如果成功获取锁,则返回 true
* @throws InterruptedException 如果线程在此方法之前或期间中断
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

书写切面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.fatal.aspect;

import com.fatal.annotation.Lock;
import com.fatal.constants.LockConstant;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
* @author Fatal
* @date 2019/8/24 0024 20:06
*/
@Slf4j
@Component
@Aspect
@Order(1)
public class LockAspect {

private RedissonClient redisson;

public LockAspect(RedissonClient redisson) {
this.redisson = redisson;
}

@Pointcut("@annotation(com.fatal.annotation.Lock)")
public void point() {}

@Around("point()")
public Object lock(ProceedingJoinPoint joinPoint) throws Throwable {
Lock lock = getLock(joinPoint);
// 目标方法参数
Object[] args = joinPoint.getArgs();
RLock fairLock = redisson.getFairLock(LockConstant.format(lock.name()));
boolean isLock = fairLock.tryLock(lock.waitTime(), lock.leaseTime(), lock.unit());
if (isLock) {
try {
return joinPoint.proceed(args);
} finally {
fairLock.unlock();
}
}
log.info("{}: 系统繁忙,请重试", Thread.currentThread().getName());
// 这里为了方便查看日志,把下面一行注释掉了,实际还需要抛个异常就行
// throw new RuntimeException("系统繁忙,请重试");
return null;
}

/**
* 获取 Lock 注解
* @param joinPoint
* @return
*/
private Lock getLock(ProceedingJoinPoint joinPoint) {
// 获得方法署名
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
// 获得目标方法
Method method = signature.getMethod();
// 拿到目标方法上的注解 Lock 并返回
return method.getAnnotation(Lock.class);
}

}

6、service

IBusinessService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.fatal.service;

import java.util.concurrent.atomic.AtomicInteger;

/**
* 业务服务
* @author Fatal
* @date 2019/8/24 0024 20:22
*/
public interface IBusinessService {

/**
* 无锁业务
*/
void businessWithoutLock();

/**
* 带锁业务
*/
void businessWithLock();

/**
* 获取库存
* @return
*/
AtomicInteger getStock();

/**
* 补充库存
* @param increment
*/
Integer supplyStock(Integer increment);

}

BusinessServiceImpl.java

刚开始我还在想,AtomicInteger 这种类的所有方法都具有原子性,那我之前和数据库交互的实体属性都只是包装类,那并发情况下对实体属性的操作是非线程安全的,那会不会引起线程安全问题?后来一想,考虑线程是否安全应该体现在共享数据上面,实体的属性 get 出来后是局部变量(局部嘛,多个线程的同一个方法的局部变量互不影响的),没有必要考虑线程安全与否,从实体属性的角度出发,不需要用原子类。真正需要考虑线程安全的是对数据库的操作,数据库中的数据当然是共享数据,笑了:smile:。这里的疑问应该就是概念混淆了。不过可以想象一下,数据库加事务后也具有原子性,AtomicInteger <-> 数据库 ? 我们倒可以用 AtomicInteger 来模拟对数据库的操作呢!于是接下来就偷个懒了,不建数据库,newAtomicInteger 对象来做 Demo 就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.fatal.service.impl;

import com.fatal.annotation.Lock;
import com.fatal.service.IBusinessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicInteger;

/**
* 业务服务实现
* @author Fatal
* @date 2019/8/24 0024 20:24
*/
@Slf4j
@Service
public class BusinessServiceImpl implements IBusinessService {

// 设置库存为10,AtomicInteger的所有方法都具有原子性
private AtomicInteger stock = new AtomicInteger(10);

@Override
public void businessWithoutLock() {
if (stock.get() > 0) {
try {
// 1ms 秒执行业务逻辑。当然这是假设,1ms肯定执行不完的。以此为例子,就算是业务很快完成,也会出现超卖
Thread.sleep(1);
stock.decrementAndGet();
log.info("{}: 成功购买", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.info("{}: 抱歉,库存不足", Thread.currentThread().getName());
}
}

@Override
@Lock(name = "buckleInventoryLock")
public void businessWithLock() {
businessWithoutLock();
}

@Override
public AtomicInteger getStock() {
return this.stock;
}

@Override
public Integer supplyStock(Integer increment) {
return this.stock.addAndGet(increment);
}

}

7、controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.fatal.controller;

import com.fatal.service.IBusinessService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
* @desc 为了方便测试,下边的映射器都用 @GetMapping
* @author Fatal
* @date 2019/8/29 0029 17:07
*/
@RestController
public class OversoldController {

private IBusinessService businessService;

public OversoldController(IBusinessService businessService) {
this.businessService = businessService;
}

@GetMapping("/withoutLock")
public void withoutLock() {
businessService.businessWithoutLock();
}

@GetMapping("/withLock")
public void withLock() {
businessService.businessWithLock();
}

@GetMapping("/query")
public String query() {
return "剩余库存:" + businessService.getStock().get();
}

@GetMapping("/supply/{increment}")
public Integer supply(@PathVariable(value = "increment") Integer increment) {
return businessService.supplyStock(increment);
}

}

8、Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.fatal;

import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.TimeUnit;

/**
* @author Fatal
* @date 2019/8/24 0024 17:39
*/
public class RedissonTests extends Chapter31ApplicationTests {

@Autowired
private RedissonClient redisson;

/**
* 测试整合 Redisson
* @throws InterruptedException
*/
@Test
public void fairLockTest() throws InterruptedException {
RLock fairLock = redisson.getFairLock("anyLock");
boolean isLock = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
if (isLock) {
Thread.sleep(4000);
fairLock.unlock();
}
}

}

9、测试

首先,启动虚拟机。为什么选择虚拟机呢?因为我觉得虚拟机中安装 redis,是比较接近生产环境的。麻烦?你可以使用 docker 安装 redis 来测试,不会麻烦的。(这里我安装的是最新版本 5.0.5

在测试中,我们还会使用一个工具,就是 Apach ab 来进行压测。

整合 Redisson

运行 RedissonTests.fairLockTest()

Redis 图形化界面

1567228248652

模拟超卖

启动程序

使用 ab 压测。来到 bin 目录下,输入以下命令:

1
ab -n 1000 -c 1000 http://localhost:8080/withoutLock

-n:总请求数

-c:并发数

1567228633820

访问 http://localhost:8080/query

你可以发现,超卖了…

1567228600969

解决超卖

启动程序

使用 ab 压测。来到 bin 目录下,输入以下命令:

1
ab -n 1000 -c 1000 http://localhost:8080/withLock

1567230817983

访问 http://localhost:8080/query

1567228995335

问题

Windows 下的 Redis 和 Redisson starter 一起用会出现问题。报错如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2019-08-29 14:54:56.755 ERROR 16728 --- [isson-netty-2-9] o.redisson.client.handler.CommandsQueue  : Exception occured. Channel: [id: 0x5b2c40f0, L:/127.0.0.1:65492 - R:localhost/127.0.0.1:6379]

java.io.IOException: 远程主机强迫关闭了一个现有的连接。
at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_171]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_171]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_171]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_171]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_171]
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[netty-buffer-4.1.31.Final.jar:4.1.31.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) [netty-common-4.1.31.Final.jar:4.1.31.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.31.Final.jar:4.1.31.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
... 很多个,省略

应用启动之后,压测后隔段时间就出现这些错误,应用关闭时也会出现这些错误。:sweat:

解决

刚刚又重新下载了 Windows 版本的 Redis3.2.100,只改密码,然后发现还是报错,官网也能看出来很久没维护了,我想问题应该出现在 Windows 版本的 redis 上。

建议使用 Linux 版本的。 :smirk:

笔记

压测的时候,用 Redis 图形化界面看,如下:

1567238839909

A:为什么使用 hash 这种结果存储锁呢?

B:如果直接 String 的话, UUID:ThreadId 就不知道是谁的锁,后面还有别的锁的话, UUID:ThreadId 可能会重复。类似命名空间的思想,选择 hash,可以避免了 key 的碰撞。

A:那不用 UUID:ThreadId不行吗?

B:不用的话怎么实现 解铃还须系铃人,是吧。我觉得用 hash 不是为了存放多个 field,只是它这种数据结构刚好适合而已。

看下源码:

1
2
3
4
5
6
...
// 线程队列,用 list 数据类型
threadsQueueName = prefixName("redisson_lock_queue", name);
// 线程在队列中的超时时间,用 zset 数据类型;key: UUID:ThreadId,value: 过期时间
timeoutSetName = prefixName("redisson_lock_timeout", name);
...

Redisson 的公平锁就是凭 redisson_lock_queue:{lockName}redisson_lock_timeout:{lockName} 来实现的。

参考资料

公平锁(Fair Lock)

官网Wiki目录

总结

SpringBoot的知识已经有前辈在我们之前探索了。比较喜欢的博主有:唐亚峰 | Battcn方志朋的专栏程序猿DD纯洁的微笑。对这门技术感兴趣的可以去他们的博客逛逛。谢谢他们的分享~~

以上文章是我用来学习的Demo,都是基于 SpringBoot2.x 版本。

源码地址: https://github.com/ynfatal/springboot2-learning/tree/master/chapter31