雅鉴生活-2

3. 优惠券秒杀

3.1 Redis实现全局唯一ID

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量的限制

场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。

场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具。

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

img

ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

// 2.生成序列号
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}
}
  • BEGIN_TIMESTAMP:这是一个静态常量,代表开始的时间戳(单位为秒),具体对应 2022 年 1 月 1 日 00:00:00 的时间戳。在生成 ID 时,会用当前时间戳减去这个开始时间戳,从而得到相对时间戳。
  • LocalDateTime.now():获取当前的日期和时间。
  • now.toEpochSecond(ZoneOffset.UTC):把当前的日期和时间转换为从 1970 年 1 月 1 日 00:00:00 UTC 开始到现在的秒数。
  • now.format(DateTimeFormatter.ofPattern(“yyyy:MM:dd”)):把当前日期格式化为 yyyy:MM:dd 的字符串。
  • timestamp << COUNT_BITS:将相对时间戳左移 32 位,为序列号留出 32 位的空间。
  • timestamp << COUNT_BITS | count:使用按位或运算符 | 将左移后的时间戳和序列号进行拼接,得到最终的全局唯一 ID 并返回。

3.2 添加优惠卷

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购

tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才填写这些信息

新增普通卷代码: VoucherController

1
2
3
4
5
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}

新增秒杀卷代码:
VoucherController

1
2
3
4
5
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

img

img

3.3 秒杀下单

秒杀下单应该思考的内容:

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

img

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}

// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}

//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}

//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.订单id
long orderId = redisIdWorker.nextId("order"); // 获取全局唯一订单ID
voucherOrder.setId(orderId);
// 6.2.用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

img

3.4 库存超卖问题

假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

img

img

img

img

img

乐观锁解决超卖问题


修改代码方案一:
VoucherOrderServiceImpl 在扣减库存时,改为:

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

100份只卖了21份

img


修改代码方案二

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0

3.5 优惠券秒杀 一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

现在的问题在于:
优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单
具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

img

VoucherOrderServiceImpl


初步代码:增加一人一单逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑 // &&&&&&&&&&&&&&&&&&&&&&
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) { // &&&&&&&&&&&&&&&&&&&&&&
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0);
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}

//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);

voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁适合更新数据,而现在是插入数据使用悲观锁操作。

没办法判断是否存在,因本身就是不存在,是插入操作故悲观锁

img

注意:在这里遇到了非常多的问题,我们需要慢慢的来思考。


解决并发问题

保证线程安全添加synchronized 锁

首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional // &&&&&&&&&&&&&&synchronized锁粒度太粗,性能低,在之后修改
public synchronized Result createVoucherOrder(Long voucherId) {

Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,代码需要修改。


解决锁粒度太粗修问题

intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 他拿到的对象实际上是不同的对象,new出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用intern()方法

img

img


解决事务没提交锁已释放问题

以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

img


解决事务未生效问题

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务

img

img

3.6 集群环境下的并发问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

1、我们将服务启动两份,端口分别为8081和8082:

img

2、然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:

img

img

4. 分布式锁

4.1 基本原理和实现方式对比

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

(多个JVM都可以看到;不管谁来访问,都只有一个可以看到)

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

img

img

4.2 Redis分布式锁的实现核心思路

实现分布式锁时需要实现的两个基本方法:

  • 获取锁:
    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁:
    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

img

img

4.3 实现分布式锁版本一

  • 加锁逻辑

定义一个类实现下面的接口,利用Redis实现分布式锁功能

img

利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性

img

  • 修改业务代码

img

img

img

4.4 Redis分布式锁误删

img

img

解决Redis分布式锁误删问题

需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

img

释放锁

img

4.5 更极端的分布式锁误删

线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

img

Lua脚本解决多条命令原子性问题

Redis与MySQL一样,也有事务,Redis的事务能保证事务原子性但没法保证事务一致性。
没法先查询再判断,因Redis中多个事务是最后一次性执行,做查询拿不到结果。

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。

这里重点介绍Redis提供的调用函数,语法如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

img

img

最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用Java代码调用Lua脚本改造分布式锁

lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可。

img

一路走来,利用添加过期时间,防止死锁问题的发生,

但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,

但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题

但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个30s,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission。

5. 分布式锁-redission

5.1 redission功能介绍

基于setnx实现的分布式锁存在下面的问题:

重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

以上发生的概率就极低,之前我们自定义的分布式锁能满足大部分情况。

若对锁要求高才用Redission。Redission就是一个分布式工具的集合。

提供了分布式锁的多种多样的功能。

img

5.2 redission快速入门

引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置Redisson客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象 创建客户端
return Redisson.create(config);
}
}

如何使用Redission的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}

在 VoucherOrderServiceImpl
注入RedissonClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId); // &&&&&&&&&&&&&&&&
//获取锁对象
boolean isLock = lock.tryLock();

//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}

5.3 redission可重入锁原理

img

  • 在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1
  • 如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。

在redission中,我们的也支持支持可重入锁

img

用Lua脚本保证一致性

img

5.4 redission锁重试和WatchDog机制

img

尝试获取锁

  • lock.tryLock (),这时候它会尝试获取锁,如果锁被占用,立即返回 false,不会等待。
  • lock.tryLock (1L, TimeUnit.SECONDS),这里设置了等待时间 1 秒。这时候如果获取锁失败,不会马上返回,而是在 1 秒内不断尝试,在指定时间内通过监听锁释放信号等方式尝试重新获取,直到时间到了还没获取到才返回 false。

可重试机制原理

当线程尝试获取锁失败时,Redisson 不会让线程无休止地尝试,而是采用一种智能的重试策略,结合 Redis 的发布 - 订阅机制,在一定时间内等待锁释放的信号,然后再次尝试获取锁

  • 获取锁尝试:线程调用 tryLock 方法尝试获取锁,Redisson 会向 Redis 发送一个 Lua 脚本,该脚本会检查锁是否已经被其他线程持有。如果锁未被持有,线程成功获取锁;如果锁已被持有,脚本会返回锁的剩余过期时间。
  • 进入等待状态:如果获取锁失败,线程会根据传入的等待时间参数,进入等待状态。在等待期间,线程会订阅 Redis 上的一个特定频道,该频道用于接收锁释放的消息。
  • 监听锁释放信号:当持有锁的线程释放锁时,Redisson 会向该频道发布一条消息。等待的线程会接收到这个消息,意味着锁已经被释放。
  • 再次尝试获取锁:接收到锁释放消息后,等待的线程会再次尝试获取锁。如果在等待时间内成功获取到锁,则继续执行后续业务逻辑;如果等待时间结束仍未获取到锁,则返回获取锁失败的结果。

超时续约机制原理(Watch Dog 机制)

为了防止线程在执行耗时任务时,锁因为过期而被自动释放,导致其他线程也能获取到锁,从而引发数据不一致的问题,Redisson 引入了 Watch Dog 机制。该机制会在锁被获取后,自动为锁设置一个默认的过期时间,并启动一个后台线程定时检查锁是否还被当前线程持有,如果是,则延长锁的过期时间。

  • 默认过期时间设置:当线程调用 lock 方法获取锁时,如果没有指定锁的过期时间(即没有传入 leaseTime 参数),Redisson 会为锁设置一个默认的过期时间,通常是 30 秒。
  • 启动 Watch Dog 线程:在获取锁成功后,Redisson 会启动一个后台线程,该线程会每隔一段时间(默认是过期时间的 1/3,即 10 秒)检查一次锁的状态。
  • 检查锁状态并续约:Watch Dog 线程会通过 Redis 的命令检查锁是否还被当前线程持有。如果是,则向 Redis 发送一个 Lua 脚本,将锁的过期时间延长为默认的过期时间(30 秒)。
  • 停止续约:当线程正常释放锁(调用 unlock 方法)或发生异常导致线程退出时,Watch Dog 线程会停止运行,不再进行续约操作。

img

img

img

img

img

5.5 redission锁的MutiLock原理

img

img

img

  1. 不可重入Redis分布式锁:
    原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示。
    缺陷:不可重入、无法重试、锁超时失效。
  2. 可重入的Redis分布式锁:
    原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待。
    缺陷:redis宕机引起锁失效问题。
  3. Redisson的multiLock:
    原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。
    缺陷:运维成本高、实现复杂。

6. 秒杀优化

6.1 异步秒杀思路

之前优惠券秒杀业务是同步下单 串行操作,现在优化变 异步下单。

img

当然这里边有两个难点

  • 我们怎么在redis中去快速校验一人一单,还有库存判断
  • 由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。

img

分离成两部分,

  • 主线程判断是否有购买资格(抢单流程),查数据库库存和用户ID费时,故在Redis中判断。
  • 另一独立线程是下单流程,减库存和创建订单是对MySQL的写操作,费时。
  • 两个独立的线程,程序怎么知道给谁创建订单?把id存储到队列中,独立线程从队列中读取。
  • Redis判断完后返回订单Id,用户拿到订单id付款抢单成功。虽订单未真正创建,但可以确保独立线程能创建订单。

img

现在来看看整体思路:

  • 当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可;如果不充足,则直接结束;如果充足,继续在redis中判断用户是否可以下单,
  • 如果set集合中没有这条数据,说明他可以下单,将userId和优惠券存入到redis中,并且返回0。
  • 整个过程需要保证是原子性的,我们可以使用lua来操作
  • 当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 。如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。

6.2 Redis完成秒杀资格判断

img

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
//private static final String SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

完整lua表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey $$$$$字符串tonumber转数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId $$$$$是否存在 == 1表示存在
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.成功
return 0

当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");

// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), //$$$$$$$空集合,无key,只有ARGV[1],ARGV[2]
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue(); //$$$$$$$ Long类型转int

// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}

//TODO 保存阻塞队列 //$$$$$$$$$$$$$$

// 3.返回订单id
return Result.ok(orderId);
}

6.3 基于阻塞队列实现秒杀优化

VoucherOrderServiceImpl

修改下单动作,现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断我出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑保存到队列中去,然后异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;

// Lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//异步处理线程池 $$$$$$$$单线程即可因处理订单来说 速度不用太快 一个线程慢慢处理即可
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();


//$$$$$$$在类初始化时执行线程池
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}


// =========4 执行异步下单=========
// 当初始化完毕后,就会去从对列中去拿信息
// =========4.1 线程池任务是从阻塞队列中不断取出订单,然后创建=========
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1. 从队列中获取订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2. 处理订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
} // 补充while循环的闭合大括号
}

// =========4.2 创建流程 获取锁(保证并发安全 实际没必要 锁这段不做也行)=========
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户 $$$$$$$$$$全新线程 不能再从UserHolder即threadLocal中取线程 取不到。从voucherOrder中取
Long userId = voucherOrder.getUserId();

// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.lock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}

// =========4.3 创建订单=========
try {
//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
proxy.createVoucherOrder(voucherOrder); //$$$$$$$$$$$$$$$$$$$$$
} finally {
// 释放锁
redisLock.unlock();
}
}

}


//$$$$$$$$$$$$$$阻塞队列 当一个线程尝试从队列中获取元素,没有元素线程就会被阻塞。知道队列中有元素,线程才会被唤醒并去获取元素
private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);

//$$$$$$$$$$$$$$要从主线程中拿到代理对象,放到成员变量位置
private IVoucherOrderService proxy;


@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// =========1用户购买资格判断=========
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();

// =========2无资格 结束=========
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}

// =========3有资格 创建订单 加入阻塞队列=========
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列
orderTasks.add(voucherOrder);
//3.获取代理对象 $$$$$$$$$$$$$$$$$$$$$$$$
proxy = (IVoucherOrderService)AopContext.currentProxy();
//4.返回订单id
return Result.ok(orderId);
}


// =========4.3 创建订单=========
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();

// 检查用户是否已购买
long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("用户{}已购买过该代金券", userId);
return;
}

// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();

if (!success) {
log.error("代金券{}库存不足", voucherId);
return;
}

save(voucherOrder);
}
}

秒杀业务的优化思路是什么?

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题
    ① 高并发环境下会有无数订单对象需要创建,放到阻塞队列可能会导致内存溢出。
    ② 如果队列设置的长度存满了,再有新的订单信息就塞不进去了。
  • 数据安全问题
    ① 内存保存。若内存宕机,内存里所有订单信息就丢失了,用户完成了下单付完款,后台却没数据,出现数据不一致。
    ② 线程从队列取出要下单任务,在此时发生了事故,但任务未执行,而任务一旦取出队列中就没有了。以后再不会执行,任务丢失,导致数据又不一致,出现数据安全问题。

7. Redis消息队列

7.1 认识消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

img

使用队列的好处在于解耦:所谓解耦,举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,我们(消费者)从快递柜里边去拿东西,这就是一个异步,如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。

这种场景在我们秒杀中就变成了:我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。

Redis提供了三种不同的方式来实现消息队列:

list结构:基于List结构模拟消息队列

PubSub:基本的点对点消息模型

Stream:比较完善的消息队列模型

7.2 基于List实现消息队列

  • Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
  • 队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

img

img

img

基于List的消息队列有哪些优缺点?

优点:

  • 利用Redis存储,不受限于JVM内存上限;
  • 基于Redis的持久化机制,数据安全性有保证;
  • 可以满足消息有序性。

缺点:

  • 无法避免消息丢失;(从redis队列中取出还没处理就宕机了,但此时已经从list中remove了,其他消费者拿不到,则消息丢失)
  • 只支持单消费者

7.3 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,

  • 消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
  • PUBLISH channel msg :向一个频道发送消息
  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

img

img

基于PubSub的消息队列有哪些优缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化(发完没人收就丢了)
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

7.4 基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型(与list sortedset hash一样),可以实现一个功能非常完善的消息队列。(专为消息队列设计)

img

img

img

img

img

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯(消息读完不丢失,永久存在队列中)
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

7.5 基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

img

img

img

img

imgimg

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

img

7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id $$$$$$$$$$$$$$$$$$$$
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey $$$$$字符串tonumber转数字
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId $$$$$是否存在 == 1表示存在
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... $$$$$$$$$$$$$$$
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
-- 3.7.成功
return 0

image-20250909131848622

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 4.如果获取成功,可以下单
createVoucherOrder(voucherOrder);
// 5.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1), // $$$$$$$$$
StreamOffset.create("stream.orders", ReadOffset.from("0")) // $$$$$$$$$
);

// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明pending-list中没有异常消息,结束循环
break;
}

// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

// 3.创建订单
createVoucherOrder(voucherOrder);

// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try{
Thread.sleep(20);
}catch(Exception e){
e.printStackTrace();
}
}
}
}

@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.返回订单id
return Result.ok(orderId);
}

}

雅鉴生活-2
https://blog.xirui.work/posts/42b3c747.html
作者
xirui
发布于
2024年9月19日
许可协议