学后端到现在也算是开始写第一个可以称之为项目的东西了,虽然都在讲 xx 点评、xx 外卖之类的项目没含金量,我也觉得简历里面写这种东西没什么竞争力,但花一两个星期写完我不认为不会有任何收获,看的也就是 redis - 黑马点评的那个视频
这个项目的前端页面已经提供了,我只需要完成后端接口即可。以各个功能模块的代码作为段落进行解析,记录一下写这个项目遇到的各种问题和学到的、弄明白的各种东西,所以不会贴完整的代码,完整代码会放到 Github 上,等写完了再传,欢迎品鉴 shi 山(
技术栈大概是 springboot、mysql、redis、mybatisplus 这些
# 准备工作下载 redis 并导入到 wsl 里面,wsl 就作为我的 redis 服务器了,挂着。视频提供了 sql 文件,创建一个数据库然后导入一下,相关命令为:
配置文件里配置一下 redis 和 mysql:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/hmdp?useSSL=false&serverTimezone=UTC username: xx password: xx redis: host: 127.0.0.1 port: 6379 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s
还有前端环境一些杂七杂八的,挺简单的没学到什么东西,懒得写了。
# 短信验证码登录# 需求分析和实现第一个功能模块是做手机号 - 短信验证码登录的功能
前端这里,当用户输入手机号并点击 “发送验证码” 的时候需要向后端请求验证码,验证码发送之后点击 “登录”,又需要发送到后端进行验证
第一个请求是发送验证码,那么后端这里接收到的就是手机号,首先应该校验的就是手机号是否是正确的手机号,可以用正则来一个 mismatch 匹配:
1 public static final String PHONE_REGEX = "^1([38][0-9]|4[579]|5[0-3,5-9]|6[6]|7[0135678]|9[89])\\d{8}$";
若手机号没问题,那么就可以随机生成一个验证码并发送给用户
第二个请求就是登录,后端需要校验用户的手机号和验证码是否匹配。若匹配成功,则又要判定用户是否存在,如果存在可以直接登录,否则应该先自动注册一个账号
那么这里牵扯到了一个矛盾。HTTP 协议本身是无状态的,服务器无法仅通过一次请求判断当前请求是否来自同一用户。为了在多次请求之间保存验证码等临时状态信息,可以使用 Session 机制
**Session 是由 Web 容器按照 Servlet 规范提供的一种会话管理机制,而 tomcat 是其中的一个实现方式。服务器会为每个客户端创建一个会话对象,并通过 Cookie(如 JSESSIONID)在客户端与服务器之间建立关联。** 在发送验证码时,可以将手机号与验证码以键值对的形式存入 Session;在校验验证码时,再从 Session 中取出对应的值进行比对,从而完成验证
这么看来似乎可以解决现在的问题,但随着技术的发展,session 机制渐渐暴露出来很多缺点。现代的后端服务器大多数是 “集群式”,有很多的服务器来共同处理客户端发送的请求,客户端每次发送的请求可能都不是同一个服务器在处理,那么,因为 session 机制是服务器层面的技术,多服务器之间无法完成数据共享,虽然有一些解决办法,但都不太好用
那么就需要找一个好点的解决方法,这个方法就是 Redis,一种键值对型的内存级数据库,可以提供集中式、高性能、可共享的状态存储 。简单来说,它跟 session 不在一个状态层面,独立于 web 容器存在,因此各个服务器都可以访问同一个 redis,这样就解决了数据共享的问题。
在第一个请求中,生成验证码之后需要把它存到 redis 数据库里面,Java 提供了 api,相关代码如下:
1 stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
1 String code = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
解决了这个问题,再回到刚刚的登录需求
前面提到过 HTTP 请求是无状态的,那么登录完成之后客户端要知道自己已经登录了,仍然可以用 redis 来存储登录状态。具体实现方案为: 使用 redis 的 string-hash 结构来存储,key 段存储一串 token(可以用随机生成的 UUID),value 段存储用户信息,然后把 token 返回给前端,以后客户端每次请求都携带这个 token,服务端再进行校验即可。
但是,原来的用户类 User 保存了太多的信息,如果直接把它转成 Map 然后存到 redis 里面,再返回给前端的话,HTTP 请求体里面会直接暴露用户的敏感信息,那么则需要一个 “中间类”,只做必要的校验,舍弃掉敏感信息即可,同时应该注意设置有效期,防止用户数量过多把 redis 存储空间占满
到此为止短信验证码登录就完成了
# 拦截器前面提到,服务端需要校验从客户端发来的 token 以鉴别用户,那么我们不可能在每个 service 方法里面都写个校验,应该把它总结到一个地方,也就是 spring MVC 提供的拦截器 (注意区分它与过滤器)
一次 HTTP 请求的执行顺序如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 客户端请求 ↓ Filter(编码 / 跨域等) ↓ DispatcherServlet ↓ Interceptor.preHandle() ↓ Controller ↓ Interceptor.postHandle() ↓ Interceptor.afterCompletion()
首先创建一个类 LoginInterceptor ,它需要继承接口 HandlerInterceptor ,然后根据业务需求,需要实现里面的方法 preHandle ,在这里面做登录校验即可,首先获取 token
1 String token = request.getHeader("Authorization");
然后用这个 token 去数据库里面找用户,如果 token 或用户不存在直接返回 401 即可,如果存在则先把它转成 UserDTO 对象,然后把它存到.....
存到哪?我要在后面的 controller、service 继续使用这个用户,可是我应该把它存在什么地方?
第一个想法是,我的 redis 里面都有这个用户了,直接照着 token 拿不就完了。但是,这里的 token 是前端传来的,要从 HTTP 请求体里面拿,也就是说:
Controller 传一次
Service 再传
每层都要带 token
那么参数会爆炸,而且后续的业务逻辑里面还要写查询,会导致重复的 IO,性能很差且污染业务逻辑
把它塞到 HTTP 请求体里面,然后让后面的 controller 自己 get 一下?也一样会污染参数和业务逻辑。
这里可以用 ThreadLocal ,它的作用很简单:同一个线程里,随便在哪,都能拿到这份数据;换一个线程,互相看不见,真实的数据在堆内存中,它只存引用
核心原理是,一次 HTTP 请求就认为是一个线程,那么需要的机制也就是在一次请求内,全局可用,但又不会串请求 ,这就是 ThreadLocal 干的
1 2 3 // 将Hash数据转为UserDTO对象 UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap,new UserDTO(),false); UserHolder.saveUser(userDTO);
这里如果不写 saveUser 的操作,当请求走出拦截器代码部分时,userDTO 对象就会被销毁。写了之后则会提升它的作用域,使它一直存在于堆内存中,后续可以用 ThreadLocal 获取,不用传任何参数,本质是把这个 userDTO 变成了逻辑上的全局变量,物理上的线程私有变量
唯一需要注意的是,一定要在 controller 结束后 UserHolder.removeUser() 删掉这次的对象,否则线程池复用可能把上一个请求的数据 “泄露” 到下一个请求,造成严重的安全问题。
到这里为止,短信验证码登录和校验功能都已经完成了,但这里有一个细节性的东西还是可以修改一下。
之前写的拦截器代码里面做了登录状态(即 token)的刷新,但这个拦截器并不是拦截一切路径,它只拦截一些做登录校验的路径。可是如果用户一直在访问比如首页、商户详情页之类不需要登录的页面,一定时间之后他的登录状态会被取消,显然不太合理
改进方法就是再加一个拦截器,把这两个拦截器的功能隔开,一个做 token 刷新,一个负责拦截即可,代码也不用动太多,稍微改改就行
# 商户查询缓存# 需求分析由于 redis 的读写速度极快,因此可以用作客户端 - 服务端 - 数据库之间的缓存区域,具体模型如下:
简单写段代码,然后在前端刷新页面,可以发现第一次查询和第二次查询的请求时间明显降低,从秒级到了毫秒级
这里踩了个坑
MyBatis-Plus 的 list() 不会返回 null ,只会返回:
也就是说不能用 list == null 来判空,得用 list.isEmpty() ,用 null 来判断会导致代码变为 “死代码”,永远不会进这个逻辑
# 缓存更新策略如果数据库的数据更新了但缓存的数据还是老数据,就会导致客户端查询到的数据与数据库的数据出现差异,因此需要及时更新缓存中的数据,常见的缓存更新思路包括先删除缓存再更新数据库、先更新数据库再删除缓存等方式。在并发读写场景下,由于数据库操作与缓存操作之间不存在原子性,可能会出现短暂的数据不一致问题,导致客户端拿到的数据有误
大致思路如上,缓存一致性问题主要来源于并发时序问题(读写竞争)和分布式环境下的操作非原子性,像现在写的单体系统不用在意太多,先了解一下即可 。尤其注意一下第三张图的并发问题,相对于缓存操作来说,数据库操作速度极慢。因此先更新数据库再删除缓存,虽然仍存在并发读写导致短暂不一致的可能,但在实际应用中出现概率较低,且可通过缓存过期时间进行最终一致性兜底,故在项目中常用第二种方案
# 缓存穿透缓存穿透指的是客户端请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,但每次的请求都会发送到数据库 。如果有人利用这个漏洞向服务器发送大量无意义的请求数据,很有可能会导致服务器过载以至于崩溃,因此有必要解决这种问题
解决方法有两种:
缓存空对象
布隆过滤
这个东西暂时不用深入了解原理,大致知道它是用哈希算法 进行的,不用真正存储数据库所有数据的一种方法即可
优点是占用内存较少,不用存储多余的 key 缺点是实现复杂且有误判可能(若被布隆过滤器识别为存在不一定是真的存在,这样的话仍可能出现穿透现象) 上面两个方法是较为常见的被动防御缓存穿透,但实际上还有一些主动防御的方法,比如对请求进行校验,不符合要求的直接拦截等方法。这里先不深入了
知道了缓存穿透这个现象,那就应该修改一下前面的缓存代码,采用的是第一种方法,相关代码为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 // 防御缓存穿透相关代码,留作备份 // public Shop queryWithPassThrough(Long id){ // String key = CACHE_SHOP_KEY + id; // // 从redis查询缓存 // String shopJson = stringRedisTemplate.opsForValue().get(key); // // // 注意这里的isNotBlank仅仅判断查询到的shopJson有没有值,null、空值等不包括在内,因此下面还要做一次校验 // if (StrUtil.isNotBlank(shopJson)){ // return JSONUtil.toBean(shopJson, Shop.class); // } // // // 如果查询到的不是null,那么就只能是我们设置的空值 // if (shopJson != null) { // return null; // } // // Shop shop = getById(id); // // 如果数据库里面都不存在,则需要先缓存一下空值,然后返回fail // if (shop == null){ // stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL + RandomUtil.randomLong(1,6),TimeUnit.MINUTES); // return null; // } // // 若存在则先保存到redis,然后返回给用户 // stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL + RandomUtil.randomLong(1,6), TimeUnit.MINUTES); // // return shop; // }
# 缓存雪崩缓存雪崩指的是同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求直接打入数据库带来巨大访问压力
解决方法有下面这些:
给不同 Key 的 TTL 添加几分钟的随机值 利用 Redis 集群提高服务的可用性(微服务集群方面) 给缓存业务添加降级限流 给业务添加多级缓存(浏览器端、JVM 端等) 代码方面采用的是简单的第一种
# 缓存击穿也称为热点 key 问题 ,指的是一个被高并发访问且缓存业务重建较为复杂的 key 突然失效了,大量访问请求会瞬间冲向数据库以过载。
缓存业务重建较为复杂 指的是某些数据不仅仅是查一个数据库而得,可能需要多表查询甚至进行大量计算才可以得到,缓存重建的时间比较久的一种情况。此时它又是一个热点 key,有大量并发线程来访问,那么就会出现这种情况:
在重建缓存过程中,大量线程都会未命中缓存,导致大量线程同时查询数据库并进行缓存重建,这一超高爆发对数据库会带来极大冲击
解决方法有两个:互斥锁和逻辑过期
要解决的问题只有一个:防止多个线程一块去查询数据库导致数据库访问量过多而崩溃。只能让一个线程去查数据库
这边我的代码里面两种方法都写了,留作一个备份
# 互斥锁这里没有用 Java 提供的 lock 机制,而是通过 redis 的 setnx 关键字来实现简单自定义互斥锁,具体操作看代码即可,大致思路为
一开始写的原始代码长这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public Shop queryWithMutex(Long id){ String key = CACHE_SHOP_KEY + id; // 从redis查询缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 注意这里的isNotBlank仅仅判断查询到的shopJson有没有值,null、空值等不包括在内,因此下面还要做一次校验 if (StrUtil.isNotBlank(shopJson)){ return JSONUtil.toBean(shopJson, Shop.class); } // 如果查询到的不是null,那么就只能是我们设置的空值 if (shopJson != null) { return null; } // 到这为止就是“未命中”,需要尝试获取互斥锁 String keyLock = LOCK_SHOP_KEY + id; Shop shop = null; try { // 如果锁获取失败,那么需要休眠一段时间再来获取,用递归实现 if (!tryLock(keyLock)) { Thread.sleep(50); return queryWithMutex(id); } shop = getById(id); // 如果数据库里面都不存在,则需要先缓存一下空值,然后返回fail if (shop == null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL + RandomUtil.randomLong(1,6),TimeUnit.MINUTES); return null; } // 若存在则先保存到redis,然后返回给用户 stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL + RandomUtil.randomLong(1,6), TimeUnit.MINUTES); } catch (Exception e) { throw new RuntimeException(e); }finally { unlock(keyLock); } return shop; }
有几个点需要注意,也就是我代码的修改处
在 try-catch-finally 里面有这样一句: return queryWithMutex(id) ,执行顺序是先 queryWithMutex (id)-> 保存返回值 ->finally-> 返回返回值 。
那么中间有这样一段逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 public Shop queryWithMutex(Long id) { boolean isLock = tryLock(); try { if (!tryLock(keyLock)) { Thread.sleep(50); return queryWithMutex(id); } } finally { unlock(keyLock); } }
模拟一下: 一个线程来了尝试获取锁,如果没有拿到则进入 if 语句进行递归 -> 第二次递归假设拿到了锁
那么在第二次递归中,由于它拿到了锁,那么就不会进 if 语句,继续下面的查询数据库 -> 添加缓存 -> 走到 finally 里面释放锁 -> 返回结果到上一层递归,然后由上一层递归再返回出去。
似乎没问题?但实际上当上一层(也就是第一层)递归在返回之前,它也要进一次 finally 并进行 unlock 释放锁,如果说在释放锁之前有另一个线程刚好拿到锁,由于我的代码中锁的 key 是一致的 String keyLock = LOCK_SHOP_KEY + id ,那么此时就会把另一个线程的锁释放掉
改进了之后出现的第二个问题是,我加的锁是有 TTL 的,那么就不可避免地会出现锁过期的现象,可以预料到这样的场景:
A 线程还没有执行完,他的锁就过期了。此时有一个线程 B 拿到了锁,如果刚好 A 线程进了 finally 段,此时它的 isLock 校验是可以过的,那么就把线程 B 的锁释放掉了。
解决问题也很简单,既然可能会导致误删,那么我给每个线程拿到的锁都做一个特定标识,在释放锁的时候都去校验一下这个标识不就好了?标识可以对 value 进行操作,因为前面的代码里面这个值都是固定的 “1”,正好可以用来做唯一性标识
思路没问题,改一下代码即可,释放锁那里的代码一开始可能会想着改成这样:
1 2 3 4 String valueInRedis = stringRedisTemplate.opsForValue().get(key); if (lockValue.equals(valueInRedis)) { stringRedisTemplate.delete(key); }
既然我要根据 value 来判断,那我直接取出来比一下就行了
在以往的代码中,这种思路没什么问题,因为写的都是单线程。可是在多线程模式下这个思路会出现很大的安全隐患
中间可能发生并发:
线程 A get 得到 value "v1" ,然后经过了校验 线程 A 还没 delete 线程 B 抢到了锁,把 key 设置成 "v2" 线程 A 执行 delete → 删掉了 B 的锁 那么这里就应该引入一个新的概念,用 Redis 支持的 内嵌 Lua 脚本 ,可以把多条命令包装成单条原子操作
Lua 是一种轻量级脚本语言:
语法简单、像 Python / JavaScript 一样动态 内存占用小,执行效率高 适合嵌入到其他程序里当 “脚本引擎” 使用 没学过这东西,代码里的 Lua 是 ai 写的,这里贴一下各个段落的作用:
1 2 3 4 5 6 7 8 9 10 11 12 13 if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end /* KEYS[1] → Redis 脚本的第一个 key,在 Java 里传的是锁 key ARGV[1] → Redis 脚本的第一个参数,在 Java 里传的是锁 value(UUID token) redis.call('get', KEYS[1]) → 拿到当前锁的 value 判断 get(key) 是否等于我们自己的 token(保证安全解锁) 相等 → 删除锁 (del) 并返回 1 不相等 → 不删除,返回 0 */
1 2 3 4 5 stringRedisTemplate.execute( UNLOCK_SCRIPT, // Lua 脚本 Collections.singletonList(key),// KEYS value // ARGV );
参数 含义 UNLOCK_SCRIPT要执行的 Lua 脚本封装对象,包含脚本内容和返回类型 Collections.singletonList(key)Lua 脚本的 KEYS 数组,在 Lua 里通过 KEYS[1] 获取 valueLua 脚本的 ARGV 数组,在 Lua 里通过 ARGV[1] 获取
跑了个测试类,只有第一个线程去查了数据库,其它的 49 个都走的缓存,目的应该是达到了
毕竟本地的数据库访问速度挺快的,也懒得再去模拟其他情况了。没看出来啥问题,就到这里吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void testConcurrentQuery() throws InterruptedException { Long id = 1L; stringRedisTemplate.delete(CACHE_SHOP_KEY + id); // 模拟缓存未命中 // 模拟50个并发请求 int threadCount = 50; CountDownLatch latch = new CountDownLatch(threadCount); // 循环启动50个线程,第一个线程拿到锁去查询数据库,然后存到缓存里面,其它的线程阻塞等待第一个线程拿到数据或者超时返回null for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { Shop shop = (Shop) shopService.queryById(id).getData(); System.out.println(Thread.currentThread().getName() + ": " + shop); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); } latch.await(); }
# 逻辑过期
这里默认一定命中,若未命中则认为不是热点 key,不需要处理
对于我的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 // 创建10个线程池 private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); // 逻辑过期解决缓存穿透 public Shop queryWithLogicalExpire(Long id){ String key = CACHE_SHOP_KEY + id; // 从redis查询缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 这里默认一定命中,若未命中则认为不是热点key,不需要处理 if (StrUtil.isBlank(shopJson)){ return null; } // 若命中则需要判断缓存是否过期,先把查询到的redis字符串反序列化为对象 RedisData redisData = JSONUtil.toBean(shopJson,RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(),Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); // 判断逻辑是否过期,若未过期则直接返回即可,如果过期,则需要利用互斥锁,让一个线程去重建缓存,其它的线程先直接返回过期信息即可 if (expireTime.isAfter(LocalDateTime.now())){ return shop; } String keyLock = LOCK_SHOP_KEY + id; String valueLock = UUID.randomUUID().toString(); boolean isLock = tryLock(keyLock,valueLock); // 拿到锁了之后新创建一个新的线程,让它去重建缓存,自己先直接返回旧数据 if (isLock){ // 拿到锁之后仍然要做一次double check shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { redisData = JSONUtil.toBean(shopJson,RedisData.class); if (redisData.getExpireTime().isAfter(LocalDateTime.now())){ shop = JSONUtil.toBean((JSONObject) redisData.getData(),Shop.class); unlock(keyLock,valueLock); return shop; } } CACHE_REBUILD_EXECUTOR.submit(() -> { try { this.saveShop2Redis(id,20L); } catch (Exception e){ // 异步线程抛异常不会回到主线程,因此其抛异常没有什么意义,应该做一下日志 log.error("缓存重建失败,shopId={}", id, e); }finally{ unlock(keyLock,valueLock); } }); } return shop; }
有一些点提一下,做个笔记。
这里的线程池是懒创建的 ,当第一次出现 submit ,发生的事情是:
线程池发现:当前线程数 < 10 创建一个新线程 用这个线程执行你的任务 第二次 submit ,如果第一个线程还在忙,那就再创建一个新线程
直到最多创建 10 个线程时,再多任务 → 排队
也就是说它们不会一开始就创建 10 个。
submit 那里用的 lambda 表达式 ,原语法应该为
1 2 3 4 5 6 new Runnable() { @Override public void run() { this.saveShop2Redis(id, 20L); } }
写到这里突然发现一件事,就是前面的几个问题都有多种解决方案,我想都留存下来作为学习用,但是都写在项目文件里面然后用注释来保存感觉也太丑了。。。而且真实项目肯定也不会这么干。同时,Service 层不应该做这么多东西,把很多方法都写在里面显得很臃肿
联想到之前学的一些配置类,应该可以把自己写的各种方法也作为一种 “策略”,然后提供接口,再用类似配置类的形式规定接口使用哪个实现,这样既方便后续更改,也不会把项目代码搞得太丑,还保证了 Service 层的功能性
方案有了,那么就来实现(大量借助 ai。。。。 code 水平真的有待提升)
首先创建了一个 ShopCacheStrategy 接口,用于 Service 层的调用,然后创建两个类作为实现类,分别实现对应的 query 方法。
同时我新建了一个工具类包,用于存放一些工具类,我把取与释放锁的相应代码写到了里面,保证一下 Service 的简洁。
一开始我没有动太多,就是直接把代码挪到不同地方而已。但这样做会导致实现类里面仍然需要调用 getById 方法,也就是说我还需要让策略实现类继承那个 mp 父类,这显然有点 “越权” 嫌疑。
那么,如果我的这些类里面不能出现 getById ,那应该怎么查询数据库?其实这里面根本就不应该查询数据库,因为它只是一个解决问题的实现类,不应该让它与数据库产生交互
但是我重建缓存的时候又得进行 double check,不查数据库怎么看?
可以用 Supplier<T>
Supplier<T> 是 Java 8 的一个 函数式接口 ,定义在 java.util.function 包里:
1 2 3 4 @FunctionalInterface public interface Supplier<T> { T get(); }
它很简单:就是一个没有参数,但会返回一个值的 “工厂 / 提供者”。
调用 get() 方法,返回一个 T 类型的对象。 它没有参数,只有返回值。 也就是说,我可以用它的 get 方法获取一个类型为 T 的对象。那么这个对象怎么来?从数据库里面查。在哪查?不能在实现类里面查,应该在 Service 里面查
所以我在 Service 层里面写的是:
1 Shop shop = cacheStrategy.queryById(id, () -> getById(id));
第二个参数为 lambda 表达式, () 表示没有参数, getById(id) 表示 supplier 的返回值。
当调用 dbFallback.get() 时,代码才会走这个 getById 来获得 shop 对象并返回,而这一过程是在调用处的类进行的,而不是在实现类处进行的
这里还有另一种方式。用 Function<R,T>,功能上没什么区别,也没有说哪个更好,只是适用范围不同:
方案切换的话,因为方案比较少,所以我简单写了个 Config 类,用原生的 Spring 方案结合 yaml 配置文件来更改配置,这样更换实现类的时候只用改一下配置文件即可
1 2 3 shop: cache: strategy: mutexStrategy
# 总结这一节的主要需求就是缓存问题 。因为查询数据库所用的时间过长,所以我们需要把会被多次查询到的数据存到一个缓存空间(类似于电脑的 “内存”)以提高服务性能,也可以保证服务器的稳定性。而在做缓存需求的时候又会出现很多问题,比如:
缓存的数据可能与数据库的数据不一致,此时要更新缓存,更新缓存的话又会出现等等问题 缓存拿不到就去找数据库,万一数据库也没有,此时缓存就形同虚设 缓存的空间远小于数据库,哪些数据应该存到缓存,哪些不应该 多线程下的大量并发可能会导致哪些没有被考虑到的问题? 基于解决需求时遇到的问题,又出现了种种解决方案,于是在解决缓存问题的基础上又引申学习了很多策略来解决遇到的问题
# 优惠券秒杀# 全局唯一 ID 及实现联系实际可以明白,订单数据的 id 不能直接用 mysql 数据库的自增解决:
因此要生成 “全局 ID”,保证全局内的 id 都一致,且 id 必须无规律
这里利用的是 redis 的字符串自增策略来解决的问题,序列号为:符号占位符 + 时间戳 + 实际序列号(共 64bit),代码比较短就贴在这了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package com.hmdp.utils; /* 全局id生成工具类 生成的id共为64bit的序列号,第一位为符号位(不用管),中间31位为时间戳,后面32位为实际递增的序列号 */ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @Component public class RedisIdWorker { @Resource private StringRedisTemplate stringRedisTemplate; // 初始时间戳 private static final long BEGIN_TIMESTAMP = 1640995200L; // 时间戳左移的位数,可更改 private static final int COUNT_BITS = 32; public long nextId(String keyPrefix){ // 生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; /* 用redis的字符串自增策略生成序列号 */ String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 把key对应的value + 1,若不存在则视为0(value必须不存在或者为整数,非整数均会报错) long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << COUNT_BITS | count; } }
测试方法为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Autowired private RedisIdWorker redisIdWorker; // 创建一个 最多 500 个线程的线程池 private static final ExecutorService es = Executors.newFixedThreadPool(500); @Test public void testRedisIdWorker() throws InterruptedException { // 可以理解为一个300的计数器。每完成一个任务-1,变成 0 时await() 才会放行 CountDownLatch latch = new CountDownLatch(300); // 每个用户请求 100 次 ID,共300个线程,所以一共发起了300x100=30000个请求 Runnable task = () -> { for (int i = 0; i < 100; i++){ long id = redisIdWorker.nextId("order"); System.out.println("id = " + id); } // 表示一个进程结束了 latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } // 阻塞当前测试线程,直到 300 个 task 全部执行完 latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }
# 添加秒杀优惠券先看看秒杀优惠券的实体类,有些注解不太懂,写了些注释
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.hmdp.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; import java.io.Serializable; import java.time.LocalDateTime; /** * <p> * 秒杀优惠券表,与优惠券是一对一关系 * </p> * * @author 虎哥 * @since 2022-01-04 */ @Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) // 上面三个注解均是lombok // EqualsAndHashCode表示生成 equals 和 hashCode 时,不调用父类的字段 // 意思就是在判断两个对象是否相等、以及计算哈希值时,只看当前这个类自己声明的字段,不管它父类里有什么字段,但这个类没父类,所以可有可无 // Accessors表示开启链式调用 @TableName("tb_seckill_voucher") // 告诉 MyBatis-Plus这个实体类对应哪张表。如果没有它,MyBatis-Plus 会默认SeckillVoucher → seckill_voucher // Serializable表明这个对象可以被序列化(变成字节流) public class SeckillVoucher implements Serializable { // 防止序列化版本不一致导致反序列化失败,一般写1L,不写也可以 private static final long serialVersionUID = 1L; /** * 关联的优惠券的id */ // value表示Java 字段 voucherId ↔ 数据库字段 voucher_id。 // type表示MyBatis-Plus 主键策略,这里用的是INPUT,表明插入 tb_seckill_voucher 时,voucher_id 必须自己传 @TableId(value = "voucher_id", type = IdType.INPUT) private Long voucherId; /** * 库存 */ private Integer stock; /** * 创建时间 */ private LocalDateTime createTime; /** * 生效时间 */ private LocalDateTime beginTime; /** * 失效时间 */ private LocalDateTime endTime; /** * 更新时间 */ private LocalDateTime updateTime; }
理解了之后,先测试一下现有的添加秒杀券功能 addSeckillVoucher
由于 MvcConfig 那里放行过了,因此这里不需要登录也能正常发出请求
# 优惠券秒杀下单功能一开始完成的下单代码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 package com.hmdp.service.impl; import com.hmdp.dto.Result; import com.hmdp.entity.SeckillVoucher; import com.hmdp.entity.VoucherOrder; import com.hmdp.mapper.VoucherOrderMapper; import com.hmdp.service.ISeckillVoucherService; import com.hmdp.service.IVoucherOrderService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisIdWorker; import com.hmdp.utils.UserHolder; import org.apache.tomcat.jni.Local; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; /** * <p> * 服务实现类 * </p> * * @author 虎哥 * @since 2021-12-22 */ @Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override public Result seckillVoucher(Long voucherId) { // 查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 判断秒杀时间 if (voucher.getBeginTime().isAfter(LocalDateTime.now()) || voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("当前不在秒杀时间"); } // 判断库存数量 if (voucher.getStock() < 1){ return Result.fail("库存不足"); } // 先让库存数减一 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).update(); if (!success){ // 扣减库存失败,一般认为也是库存不足 return Result.fail("库存不足"); } // 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 获取订单id,用前面的全局id生成器 long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 获取用户id,用UserHolder Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); // 优惠券id直接根据传入的即可 voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 返回订单编号 return Result.ok(orderId); } }
写到这里还和秒杀优惠券没什么关系,只是可以下单了
但根据实际情况来说,这种限时的优惠活动一定会导致高并发,那么就不能忽视线程安全问题。
# 超卖问题及解决方案这里涉及到的超卖问题也是经典的线程安全问题
简单来说,无法保证多个线程的执行顺序如我们所愿,根据执行顺序不同,它们可能会产生很多奇葩的错误结果
多线程并发问题解决方案一般来说就是加锁,而锁的设计理念分为悲观锁和乐观锁:
对于高并发而言,悲观锁的性能较差,不太适合作为解决方案,这里学习的是乐观锁
乐观锁的核心设计问题就在于如何 “在更新数据时去判断有没有其它线程对数据做了修改”,有很多实现的方案,这里我采用的是加一个 “版本号”,辅助以 SQL 条件判断,用于鉴别当前操作的数据是否和之前查到的一致
版本号是一个数据库字段,每次操作数据的时候就要把它做一个变化,以表示这个数据被操作了
而同时可以发现,stock 字段被操作的时候就会做一个变化,那么在现在这个场景下,我们就可以用这个数据本身去做它的版本号而不用引入新的字段,本质上是 CAS(Compare And Swap)思想在数据库层的体现
结合上述分析,这里要修改一下代码
一开始改成了
1 2 3 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update();
后面经过测试,发现大部分线程会获取失败。原因是乐观锁的设计原理:只要发现数据被修改了,那么就认为不安全,拒绝访问
那么这里的判断条件其实是有误的,我们实际上想解决的问题是超卖而不是不让同步卖出。按照上述写法会导致大部分同步线程都执行失败,表现出异常率过高
所以这里应该改成:
1 2 3 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock",0).update();
也就是只要查到的库存数大于 0 就允许卖出
# 一人一单根据实际业务需求,有代码思路如下:
这里仍然会涉及多线程并发安全问题,所以需要加锁。由于乐观锁是在更新过程中适用,因此这里用到的是悲观锁。由于这里我还没怎么学过多线程,也没有搞过分布式集群,所以下面都是基于单服务器 来理解的
一开始写出来的代码长这样,我把主要逻辑移到了一个方法里面,然后对方法加了锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Transactional public synchronized Result createVoucherOrder(Long voucherId) { // 判断该用户是否已经买过 Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0){ return Result.fail("用户已购买过一次!"); } // 先让库存数减一 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock",0).update(); if (!success){ // 扣减库存失败,一般认为也是库存不足 return Result.fail("库存不足"); } // 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 获取订单id,用前面的全局id生成器 long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); // 优惠券id直接根据传入的即可 voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 返回订单编号 return Result.ok(orderId); }
如果是单 JVM 这样写其实功能没有问题,可以保证线程安全了
但是把锁加到方法上,就意味着无论多少个线程(即使是多个不同用户)过来,它们都只能一个个的串行执行方法,效率大打折扣,因为它们拿到的都是同一把锁
根据上述分析,可以把代码改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Transactional public Result createVoucherOrder(Long voucherId) { // 判断该用户是否已经买过 Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已购买过一次!"); } // 先让库存数减一 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock", 0).update(); if (!success) { // 扣减库存失败,一般认为也是库存不足 return Result.fail("库存不足"); } // 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 获取订单id,用前面的全局id生成器 long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); // 优惠券id直接根据传入的即可 voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 返回订单编号 return Result.ok(orderId); } }
这一版代码使得不同的用户能拿到不同的锁,提高了并发性能
注意这里的 userId.toString().intern() 。因为 toString 方法的底层是 new String (),返回的是引用地址值(每次都创建新对象,导致地址值不一致)而不是实际值(synchronized 内部比较的是引用地址值)。但 String.intern() 的语义是返回字符串常量池中 “内容相同的那个唯一 String 对象” ,这就可以保证同一个 userId 对象一定能得到 true 的结果
似乎可以了,跑一下测试看看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Resource private IVoucherOrderService voucherOrderService; @Test public void testConcurrentCreateVoucherOrder() throws InterruptedException { int threadCount = 100; ExecutorService executorService = Executors.newFixedThreadPool(20); CountDownLatch latch = new CountDownLatch(threadCount); Long voucherId = 11L; // 数据库里真实存在的秒杀券 ID Long userId = 10001L; // 随便写一个userId模拟同一个用户 for (int i = 0; i < threadCount; i++) { executorService.submit(() -> { try { // 模拟登录逻辑,保证ThreadLocal里面有User信息 UserDTO user = new UserDTO(); user.setId(userId); UserHolder.saveUser(user); Result result = voucherOrderService.seckillVoucher(voucherId); if (result.getSuccess()) { System.out.println(Thread.currentThread().getName() + " SUCCESS, orderId=" + result.getData()); } else { System.out.println(Thread.currentThread().getName() + " FAIL, reason=" + result.getErrorMsg()); } } finally { // 必须清理ThreadLocal UserHolder.removeUser(); latch.countDown(); } }); } latch.await(); }
回去看看数据库和日志,可以发现同一个用户买了 10 个库存。问题依然没有解决
回看一下代码,可以发现事物 @Transactional 是由 spring 管理的,因此只有当函数结束后才可以被提交。但是锁是在函数内部的,因此只有锁先被释放,事物才可以提交
那么在锁被释放到事物提交的这段时间,很有可能又有其它线程获取锁并执行完 createVoucherOrder 函数
解决方法也很简单,把锁移到外面,让事物先提交就行了
1 2 3 4 Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { return createVoucherOrder(voucherId,userId); }
但是这样忽略了一个重要的问题:事务失效
可以看到代码在 createVoucherOrder 方法的上面加了 @Transactional 事务,但是由于锁包括的那一段属于同类内部调用自己方法 ,因此会导致事务失效
原因是 Spring 代理只拦截 通过代理对象调用的方法 ,而这里属于内部调用,不会经过代理,因此事务不会被拦截和开启
回忆一下 spring 的事务相关概念:
1️⃣ 核心思想 Spring 的事务管理是通过 AOP(面向切面编程)代理 来实现的。
@Transactional 本质上是告诉 Spring:“在这个方法执行前后加上事务逻辑”。 Spring 会生成一个 代理对象 ,代理对象在方法调用时,会自动处理事务的开启、提交或回滚。 2️⃣ 代理对象
Spring 为我们写的类生成的一个 “包装类”:如果类实现了接口 → JDK 动态代理 如果类没有接口 → CGLIB 继承生成代理类 我们拿到的 Bean,其实是代理对象 ,而不是我们写的原始对象。代理对象在方法调用时,先执行事务逻辑,然后调用原始方法,方法返回后再处理事务提交或回滚。 调用流程:
1 调用 Bean 方法 -> 代理对象 intercept -> 开启事务 -> 调用原始方法 -> 方法返回 -> 提交/回滚事务
3️⃣ 事务生效条件
事务只在 通过代理对象调用的方法 时生效 内部调用(同一个类里直接 this.method ())不会经过代理 → 事务失效 解决方法:通过 AopContext.currentProxy() 获取代理对象,再调用方法 或者把调用移到外部类 4️⃣ 事务的实际行为
开启事务 :在方法开始,Spring 创建或加入一个数据库事务提交事务 :方法正常结束,事务提交回滚事务 :方法抛出异常,事务回滚(默认只回滚 RuntimeException)在这里调用的方法 createVoucherOrder 就是属于内部调用,不会走代理对象,也就导致上面加的事务没啥用
这里的改进方法就是获取代理对象再进行调用,要做三件事:
1️⃣ 在启动类添加注解 @EnableAspectJAutoProxy(exposeProxy = true)
作用:开启 Spring AOP 的 基于代理的切面支持 (事务本质上是切面编程思想的运用)。 参数 exposeProxy = true 的意义:默认情况下,Spring 只在 外部 Bean 调用 代理方法时才生效事务。 exposeProxy = true 会把当前代理对象放到 ThreadLocal ,可以通过 AopContext.currentProxy() 拿到。 为什么需要:在同一个类内部调用 createVoucherOrder 并让事务生效,必须通过代理对象调用,而不是直接 this.createVoucherOrder() 。 exposeProxy = true 就是为 AopContext.currentProxy() 提供前提。否则无法获取到代理对象 2️⃣ 改进 Service 的代码
将锁块内部代码改为:
1 2 3 4 synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId,userId); }
3️⃣ 引入相关依赖
需要在 pom 文件里面引入依赖:
1 2 3 4 <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency>
暂时不用了解那么多,只需要知道 Spring 的注解式 AOP(包括事务)底层依赖 AspectJ 的织入能力 即可
# 分布式锁前一节学的 “一人一单” 问题最终用了事务 + 悲观锁解决了,但那个方法只能解决单 JVM 下的线程安全问题
但是对于多进程的集群而言每个进程都有自己的锁监视器,也就是说锁无法再限制多个线程了,如下图:
那么理所应当的可以想到解决方案,用一个所有进程共同的锁监视器即可
继续转换问题,那么现在就要寻找到一个共同锁监视器的实现对象,它应该满足以下几点:
redis、mysql 都可以作为实现对象,还有一个不熟悉的 zookeeper(是一个分布式协调服务,暂时不用在意)。分析比较一下它们实现上述功能的方案差异
这个教程主要是学习 redis 的,因此后面的分布式锁实现方案也用的是 redis
# Redis 分布式锁实现思路
这里用 SET lock thread1 NX EX 10 而不用 SETNX lock thread1 + expire lock 10 的原因是为了保证获取锁和加过期时间这两步操作的原子性
# 具体实现代码# 初版简易代码及问题分析按照上面的思想,定义出接口,然后用类去实现
这里就可以写出来一段能用的简略代码了。首先写一个管理锁的生成和释放的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.hmdp.utils; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{ // 可以把userId用来做保证唯一性的key private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate){ this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; @Override public boolean tryLock(long timeoutSec) { // 获取线程标识 long threadId = Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); // 防止自动拆箱出现空指针 return Boolean.TRUE.equals(success); } @Override public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
然后修改一下下单的 VoucherOrderServiceImpl 类中的加锁逻辑,把原来的:
1 2 3 4 synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId,userId); }
改成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 // 创建锁对象 SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate); // 从锁对象获取锁 boolean isLock = lock.tryLock(1200); if (!isLock){ // 若获取锁失败,则返回错误或重试。这里直接返回错误(为了防止超卖问题) return Result.fail("不允许重复下单"); } try{ IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId,userId); }finally { // 释放锁 lock.unlock(); }
但是可以发现这个时候的代码还会存在一个问题,具体如下面的极端情况示例:
核心问题就是一个线程在释放锁的时候没有进行检查,可能释放的并不是自己的锁。解决方法也很简单,在释放锁之前进行一次检查即可。那么流程就变成如下这样:
也就是说,要给每个 JVM 的每个线程添加的锁给上一个特定的标识。在集群式环境下会有多个 JVM,那么最开始单纯地用线程 ID 来作为线程标识就不可行了(因为线程 ID 是有规律的,多个 JVM 可能会产生相同的线程 ID)。因此这里采用 UUID + 线程 ID 来做标识,保证了锁的唯一性
将 SimpleRedisLock 改为了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.hmdp.utils; import cn.hutool.core.lang.UUID; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{ // 可以把userId用来做保证唯一性的key private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate){ this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); @Override public boolean tryLock(long timeoutSec) { // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); // 防止自动拆箱出现空指针 return Boolean.TRUE.equals(success); } @Override public void unlock() { // 用key去redis数据库找value再和线程标识比对,若这两个相等则可以证明是自己的锁,可以释放,否则不应该释放 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
自己也起了两个端口,用 postman 测了,没有问题。这里因为摆烂太久导致时间跨度有点大,甚至忘了拦截器那边的流程,折腾 redis 和 token 折腾了半天。。
# Lua 脚本解决误删问题即使做了判断,这里仍然会有某些极端情况导致误删,如下图所示,若线程 1 在判断锁标识成功之后恰好因为 JVM 的原因(一般来说是 JVM 的垃圾回收机制 GC 导致的 STW 停顿,在高并发场景下停十几秒是很常见的)而阻塞,导致释放锁的操作没有完成。若此时阻塞的时间过长导致锁自动超时释放,其它线程可以获取锁,当线程 1 阻塞解除之后再次释放锁,就会发生误删问题
这个问题出现的根本原因是判断锁标识和释放锁是分开操作的,若能保证它们是原子性操作则可以解决这个问题。这里用到前面互斥锁那里用过的 Lua 脚本,那次用是直接 AI 给的代码,这次再来详细复习它的相关函数 API
Redis 提供了调用函数,语法为:
1 redis.call('命令名称','key','其它参数')
如 set name jack :
1 redis.call('set','name','jack')
可以在 redis 的客户端控制台用 eval 命令执行脚本,举个例子为:
1 eval "return redis.call('set', 'name', 'jack')" 0
后面的 0 表示 key 类型的参数有 0 个(在这里参数分为 key 类型参数和其它类型参数),控制台会返回 “OK”,然后可以看到 name 被存进去了
还可以在 Lua 脚本里面加参数,具体规则和例子如下图:
Lua 里面数组的角标是从 1 开始的而非 0
那么可以写一段简易可用的 Lua 代码:
1 2 3 4 5 6 7 8 -- 获取锁中的线程标识 get key local id = redis.call('get',KEYS[1]) -- 比较线程标识与锁中的标识是否一致 if(id == ARGV[1]) then -- 释放锁 del key return redis.call('del',KEYS[1]) end return 0
Java 调用 Lua 脚本,可以用 StringRedisTemplete 提供的 API:
将 Lua 脚本写到 resources 文件夹下的新建文件 unlock.lua ,尽量减少代码耦合度,最终代码改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.hmdp.utils; import cn.hutool.core.lang.UUID; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock{ // 可以把userId用来做保证唯一性的key private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate){ this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; // 作为Lua的执行媒介对象 private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static{ UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); @Override public boolean tryLock(long timeoutSec) { // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); // 防止自动拆箱出现空指针 return Boolean.TRUE.equals(success); } @Override public void unlock() { // 调用lua脚本 stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId() ); } }
现在来进行测试,这里简述一下测试操作:
先进行登录,获得 token
在 IDEA 开两个端口,然后打个断点:
用 postman 分别发送两个 POST 请求(Header 中记得带上 token),路径分别为http://localhost:8081/voucher-order/seckill/11 , http://localhost:8082/voucher-order/seckill/11
这里要先发第一个请求并步过到 if 判断这里,可以看到获取到了锁,然后在 redis 客户端那里把它删掉
删掉之后再发第二个端口的请求,一样步过到 if 判断这里,可以发现两个线程都拿到了锁
然后步过第一个线程结束再看看 redis 客户端,可以看到它没有释放这个锁,步过第二个线程至结束,可以看到锁被正确释放了
经过测试,现代码可以正常执行原下单和防超卖逻辑,也保证了原子性。就先到这里了