0.前提回顾 在东林在线微课堂-我的课表相关:已经可以实现课表的增删改查接口,但是在查看已学习课程时候有两个字段没有返回:
我们需要在查询结果中返回已学习课时数、正在学习的章节名称。虽然我们在learning_lesson表中设计了两个字段:
learned_sections:已学习章节数
latest_learn_time:最近学习时间
以上的问题归纳下来,就是一个学习进度统计 问题,这在在线教育、视频播放领域是一个非常常见的问题。
大部分人的学习自律性是比较差的,属于“买了就算会了”的状态。如果学员学习积极性下降,学习结果也会不尽人意,从而产生挫败感。导致购买课程的欲望也会随之下降,形成恶性循环,不利于我们卖课。
所以,我们推出学习计划 的功能,让学员制定一套学习计划,每周要学几节课。系统会做数据统计,每一周计划是否达标,达标后给予奖励,未达标则提醒用户,达到督促用户持续学习的目的。
用户学习效果好了,产生了好的结果,就会有继续学习、购买课程的欲望,形成良性循环。
因此,学习计划、学习进度统计其实是学习辅助中必不可少的环节。
1.学习计划相关 准备阶段—分析业务流程 1.==学习计划 == 在我的课程页面,可以对有效的课程添加学习计划:
学习计划就是简单设置一下用户每周计划学习几节课:
有了计划以后,我们就可以在我的课程页面展示用户计划的完成情况,提醒用户尽快学习:
可以看到,在学习计划中是需要统计用户“已经学习的课时数量”。
2.==学习进度统计== 在原型图《课程学习页-录播课-课程学习页-目录》中,可以看到学习课程的原型图:
一个课程往往包含很多个章(chapter) ,每一章下又包含了很多小节(section) 。章本身没有课程内容,只是划分课程的一个概念。小节分两种,一种是视频 ;一种是每章最后的阶段考试 —-> 用户学完一个视频/参加了最终的考试都算学完了一个小节。
==统计学习进度:====用户学了多少小节[①视频:完播率超过75%②考试:考试提交]==
因而引出几个问题:
因此,用户在播放视频的过程中,需要不断地提交视频的播放进度,当我们发现视频进度超过75%的时候就标记这一小节为已完成 因此,我们需要记录视频是否完成 ,也需要记录用户具体播放到第几秒视频 [这样下次播放就可以实现视频自动续播]
也就是说,要记录用户学习进度,需要记录下列核心信息:
小节的基础信息(id、关联的课程id等)
当前的播放进度(第几秒)
当前小节是否已学完(播放进度是否超75%)
用户每学习一个小节,就会新增一条学习记录,当该课程的全部小节学习完毕,则该课程就从学习中 进入已学完 状态了。整体流程如图:
准备阶段—字段分析 数据表的设计要满足学习计划[learning_lesson
表在我的课表需求完成设计] 、学习进度[目前需要] 的功能需求:
按照之前的分析,用户学习的课程包含多个小节,小节的类型包含两种:
视频:视频播放进度超过50%就算当节学完
考试:考完就算一节学完
学习进度除了要记录哪些小节学完,还要记录学过的小节、每小节的播放的进度(方便续播)。因此,需要记录的数据就包含以下部分:
学过的小节的基础信息
小节id
小节对应的lessonId课表id
用户id:学习课程的人
小节的播放进度信息
视频播放进度:也就是播放到了第几秒
是否已经学完:播放进度有没有超过50%
第一次学完的时间:用户可能重复学习,第一次从未学完到学完的时间要记录下来
再加上一些表基础字段,整张表结构就出来了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE TABLE IF NOT EXISTS `learning_record` ( `id` bigint NOT NULL COMMENT '学习记录的id', `lesson_id` bigint NOT NULL COMMENT '对应课表的id', `section_id` bigint NOT NULL COMMENT '对应小节的id', `user_id` bigint NOT NULL COMMENT '用户id', `moment` int DEFAULT '0' COMMENT '视频的当前观看时间点,单位秒', `finished` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否完成学习,默认false', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '第一次观看时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间(最近一次观看时间)', PRIMARY KEY (`id`) USING BTREE, KEY `idx_update_time` (`update_time`) USING BTREE, KEY `idx_user_id` (`user_id`) USING BTREE, KEY `idx_lesson_id` (`lesson_id`,`section_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='学习记录表';
准备阶段—ER图 准备阶段—表结构 准备阶段—Mybatis-Plus代码生成
准备阶段—类型枚举 我们需要准备一些VO和DTO等
==————————具体实现———————-== 按照用户的学习顺序,依次有下面几个接口:
创建学习计划
查询学习记录
提交学习记录
查询我的计划
1.创建学习计划 1.原型图 在个人中心的我的课表列表中,没有学习计划的课程都会有一个创建学习计划 的按钮,在原型图就能看到:
创建学习计划,本质就是让用户设定自己每周的学习频率:
2.设计数据库 3.业务逻辑图 当我们创建学习计划时,就是根据课程id和用户id去更新learning_lesson
表,写入week_freq
并更新plan_status
为计划进行中即可。
4.接口分析 而学习频率我们在设计learning_lesson表的时候已经有两个字段来表示了:
当我们创建学习计划时,就是根据课程id和用户id去更新learning_lesson
表,写入week_freq
并更新plan_status
为计划进行中即可。
因此请求参数就是课程的id、每周学习频率。再按照Restful风格,最终接口如下:
5.具体实现
无
6.具体难点和亮点 就是简单的创建学习计划【根据userId和courseId课程id更新一行数据的weekFreq和status字段】
2.查询学习记录 1.原型图 用户创建完计划自然要开始学习课程,在用户学习视频的页面,首先要展示课程的一些基础信息。例如课程信息、章节目录以及每个小节的学习进度:
其中:
①课程、章节、目录信息等数据都在课程微服务。【课程信息是必备的】
②学习进度肯定是在学习微服务。【学习进度却不一定存在】
2.设计数据库 3.业务逻辑图 ①课程、章节、目录信息等数据都在课程微服务。【课程信息是必备的】
②学习进度肯定是在学习微服务。【学习进度却不一定存在】
因此,查询这个接口的请求———>课程微服务【查询课程、章节信息】,再由课程微服务———>学习微服务【查询学习进度】,合并后一起返回给前端即可。
所以,学习中心要提供一个查询章节学习进度的Feign接口,事实上这个接口已经在tj-api模块的LearningClient中定义好了:
根据courseId和userId获取课表id和最近学习的小节id,然后根据课表id获取多条学习记录。【小节id,小节视频播放进度,小节是否学习完】
4.接口分析 对应的DTO也都在tj-api模块定义好了,因此整个接口规范如下:
5.具体实现
6.具体难点和亮点 无[就是查询而已]
3.提交学习记录(每15秒提交–很难懂!!!) 1.原型图 2.设计数据库 3.业务逻辑图 之前分析业务流程的时候已经聊过,学习记录==用户当前学了哪些小节,以及学习到该小节的进度如何。而小节类型分为考试、视频两种。
考试比较简单,只要提交了就说明这一节学完了。
视频比较麻烦,需要记录用户的播放进度,进度超过75%才算学完。因此视频播放的过程中需要不断提交播放进度到服务端,而服务端则需要保存学习记录到数据库。
只要记录了用户学过的每一个小节,以及小节对应的学习进度、是否学完。无论是视频续播 、还是统计学习计划进度 ,都可以轻松实现了。
因此,提交学习记录就是提交小节的信息和小节的学习进度信息。考试提交一次即可,视频则是播放中频繁提交。提交的信息包括两大部分:
小节的基本信息
小节id
lessonId课程id
小节类型:可能是视频,也可能是考试。考试无需提供播放进度信息
提交时间
播放进度信息
视频时长:时长结合播放进度可以判断有没有超过50%
视频播放进度:也就是第几秒
具体业务思路:
4.接口分析 综上,提交学习记录的接口信息如下:
5.具体实现
serviceimpl层代码整体逻辑:
其中处理课表:
其中处理视频:
其中处理考试:
无【全用的mybatisplus完成】
6.具体难点和亮点
问题一:学习记录服务有必要提交到服务端?在客户端不就可以保存
我设置的是课程学习页面播放视频时/考试后,需要提交学习记录信息到服务端保存。每隔15s提交一次。【保证换个设备还可以查看】
问题二:实现思路是什么?
①获取当前用户
②处理学习记录 —>2.1判断提交类型,①处理视频[存在记录更新学习记录并且判断是否第一次学习,不存在就新增学习记录]②处理考试[只需要新增学习记录,返回true一定是已学习]
③处理课表记录 —> 3.1查找课表,3.2判断是否全部学完,3.3放在一个更新课表[①本来就修改的字段②学习完全部小节,多修改一个字段③第一次学习,多修改一个字段]
判断是否是考试 :通过前端传入的dto判断sectionType字段
判断记录已经存在 :通过lessonid课程id和sectionId小节id确定一行record,如果有就是存在
判断是否第一次学习完 :通过判断record的finished字段未完成&&前端传入的视频播放秒数moment*2>前端传入的视频总长duration
判断判断是否学习完全部课程 :当前lesson的learnedsections已学习小节数+1>课程总小节数【课程微服务查询出】
4.查询我的计划(封装数据难) 1.原型图 在个人中心的我的课程页面,会展示用户的学习计划及本周 的学习进度,原型如图:
2.设计数据库 3.业务逻辑图 需要注意的是这个查询其实是一个分页查询,因为页面最多展示10行,而学员同时在学的课程可能会超过10个,这个时候就会分页展示,当然这个分页可能是滚动分页,所以没有进度条。另外,查询的是我的 学习计划,隐含的查询条件就是当前登录用户,这个无需传递,通过请求头即可获得。
因此查询参数只需要分页 参数即可。
查询结果中有很多对于已经学习的小节数量的统计,因此将来我们一定要保存用户对于每一个课程的学习记录 ,哪些小节已经学习了,哪些已经学完了。只有这样才能统计出学习进度。
查询的结果如页面所示,分上下两部分。:
①总的统计信息:
本周已完成总章节数:需要对学习记录做统计
课程总计划学习数量:累加课程的总计划学习频率即可
本周学习积分:积分暂不实现
②正在学习的N个课程信息的集合,其中每个课程包含下列字段:
该课程本周学了几节:统计学习记录
计划学习频率:在learning_lesson表中有对应字段
该课程总共学了几节:在learning_lesson表中有对应字段
课程总章节数:查询课程微服务
该课程最近一次学习时间:在learning_lesson表中有对应字段
4.接口分析 综上,查询学习计划进度的接口信息如下:
5.具体实现
输出结果分为两个模块: ①本周计划和积分奖励
②课程信息:
6.具体难点和亮点
【本质就是,学习记录表一行就是学了一个小节;表内每个课程都有一个week_freq,计算总和就是本周计划总数】
1 2 3 4 5 6 7 8 9 #统计用户本周已学习小节总数 select count(*) from learning_record where user_id=2 and finished=true and finish_time between '2022-10-11 10:12:34' and '2022-10-20 10:12:34'; #统计用户本周计划学习小节总数 select sum(week_freq) from learning_lesson where user_id=129 and status in(0,1) and plan_status=1;
问题二:怎么统计当前用户课表和某个课程本周的学习小节数
【本质就是,分页查询就是加个limit;某个课程要根据group by lesson_id课程id,因为一个课程在record记录表每个小节id都有一行数据,一个课程id在lesson课表中每个课程有一个】
1 2 3 4 5 6 7 8 9 10 11 #分页查询当前用户的课表 select * from learning_lesson where user_id=2 and status in(0,1) and plan_status=1 limit 0,2; #查询某个课程本周的学习小节数 select lesson_id,count(*) from learning_record where user_id=2 and finished=true and finish_time between '2024-08-19 10:12:34' and '2024-08-26 10:12:34' group by lesson_id;
1 2 3 4 5 6 7 public static LocalDateTime getWeekBeginTime(LocalDate now) { return now.minusDays(now.getDayOfWeek().getValue() - 1).atStartOfDay(); } public static LocalDateTime getWeekEndTime(LocalDate now) { return LocalDateTime.of(now.plusDays(8 - now.getDayOfWeek().getValue()), LocalTime.MAX); }
5.定时监测—课程是否过期 1.原型图 定期检查learning_lesson表中的课程是否过期,如果过期则将课程状态修改为已过期。
2.设计数据库 3.业务逻辑图 4.接口分析 5.具体实现
3.mapper层
6.具体难点和亮点
①SpringTask定时任务使用@Scheduled注解+@Async异步调用+@Retryable重试机制 —》保证既定时执行又异步且具备重试功能的健壮任务
②实现SchedulingConfigurer接口
③Quartz框架
④MQ延迟队列 【在定时任务方法里面发送消息给MQ,让MQ进行业务修改】
==面试题==
你在开发中参与了哪些功能开发让你觉得比较有挑战性?
答:我参与了整个学习中心的功能开发,其中有很多的学习辅助功能都很有特色。比如视频播放的进度记录。我们网站的课程是以录播视频为主,为了提高用户的学习体验,需要实现视频续播功能。这个功能本身并不复杂,只不过我们产品提出的要求比较高:
要达成这个目的,使用传统的手段显然是不行的。
首先,要做到切换设备后还能续播,用户的播放进度必须保存在服务端,而不是客户端。
其次,用户突然断开或者切换设备,续播的时间误差不能超过30秒,那播放进度的记录频率就需要比较高。我们会在前端每隔15秒就发起一次心跳请求,提交最新的播放进度,记录到服务端[写在数据库内,可能会导致数据库压力过大问题]。这样用户下一次续播时直接读取服务端的播放进度,就可以将时间误差控制在15秒左右。
==———————-高并发优化——————–== 1.高并发方案[三个]
其中,②水平扩展和③服务保护侧重的是运维层面的处理。而①提高单机并发能力侧重的则是业务层面的处理,也就是我们程序员在开发时可以做到的。
1.1方案一:提高单机并发[数据库方面-读写优化] 在机器性能一定的情况下,提高单机并发能力就是要尽可能缩短业务的响应时间(R esponseT ime),而对响应时间影响最大的往往是对数据库的操作。而从数据库角度来说,我们的业务无非就是读 /写 两种类型。
对于==读>写== 的业务,其优化手段大家都比较熟悉了,主要包括两方面:
对于==读<写== 的业务,大家可能较少碰到,优化的手段可能也不太熟悉,这也是我们要讲解的重点。
对于高并发写的优化方案有:
优化代码和SQL
同步写 —> 异步写
合并写数据请求
1.1.1 同步写 –> 异步写 由于各个业务之间是同步串行执行,因此整个业务的响应时间就是每一次数据库写业务的响应时间之和,并发能力肯定不会太好。
优化的思路很简单,利用MQ可以把同步业务变成异步,从而提高效率。
当我们接收到用户请求后,可以先不处理业务,而是发送MQ消息并返回给用户结果。
而后通过消息监听器监听MQ消息,处理后续业务。
这样一来,用户请求处理和后续数据库写就从同步变为异步,用户无需等待后续的数据库写操作,响应时间自然会大大缩短。并发能力自然大大提高。
①无需等待复杂业务处理,大大减少了响应时间 ②利用MQ暂存消息,起到流量削峰整形 ③降低写数据库频率,减轻数据库并发压力
①依赖于MQ的可靠性 ②只是降低一些频率,但是没有减少数据库写次数
业务复杂, 业务链较长,有多次数据库写操作的业务
1.1.2 合并写请求 合并写请求方案其实是参考高并发读的优化思路:当读数据库并发较高时,我们可以把数据缓存到Redis ,这样就无需访问数据库,大大减少数据库压力,减少响应时间。
合并写请求就是指当写数据库并发较高时,不再直接写到数据库。而是先将数据缓存到Redis,然后定期将缓存中的数据批量写入数据库。
由于Redis是内存操作,写的效率也非常高,这样每次请求的处理速度大大提高,响应时间大大缩短(↓),并发能力肯定有很大的提升。
而且由于数据都缓存到Redis了,积累一些数据后再批量写入数据库,这样数据库的写频率(↓)、写次数(↓)都大大减少,对数据库压力小了非常多!
①写缓存速度快,响应时间大大缩短(↓) ②降低数据库的写频率(↓)和写次数(↓)
①实现相对复杂 ②依赖Redis可靠性 ③不支持事务和复杂业务
写频率高,写业务相对简单的业务
2.业务优化-提交学习记录 2.1 业务优化选型分析 提交进度统计包含大量的数据库读、写操作 。不过提交播放记录还是以写数据库 为主。因此优化的方向还是以高并发写优化为主。
考试:每章只能考一次,还不能重复考试。因此属于低频行为(×),可以忽略
视频进度:前端每隔15秒就提交一次请求。在一个视频播放的过程中,可能有数十次请求,但完播(进度超50%)的请求只会有一次。因此多数情况下都是更新一下播放进度即可。
也就是说,95%的请求都是在更新learning_record
表中的moment
视频播放秒数字段,以及learning_lesson
表中的最近正在学习的小节id和最近学习时间两个字段上。
而播放进度信息,不管更新多少次,下一次续播肯定是从最后的一次播放进度开始续播。也就是说我们只需要记住最后一次即可。因此可以采用合并写方案来降低数据库写的次数和频率,而异步写做不到。
综上,提交播放进度业务虽然看起来复杂,但大多数请求的处理很简单,就是==更新播放进度 ==。并且播放进度数据是可以合并的(覆盖之前旧数据)。我们建议采用合并写请求方案:
2.2 Redis数据结构[hash哈希] 我们的优化方案要处理的不是所有的提交学习记录请求。仅仅是视频播放时的高频更新播放进度的请求,对应的业务分支如图:
这条业务支线的流程如下:
查询播放记录,判断是否存在【存在就更新学习记录,不存在就新增学习记录】
判断当前进度是否是第一次学完【播放进度要超过50% + 原本的记录状态是未学完】
更新课表中最近学习小节id、学习时间【无论如何】
这里有多次数据库操作,例如:
查询播放记录:需要知道播放记录是否存在、播放记录当前的完成状态
更新record学习记录表的播放记录:更新播放进度
更新课表lesson表最近学习小节id、时间
一方面我们要缓存写数据 ,减少写数据库频率;另一方面我们要缓存播放记录 ,减少查询数据库。因此,缓存中至少要包含3个字段:
既然一个课程包含多个小节,我们完全可以把一个课程的多个小节作为一个KEY来缓存,==Redis最终数据结构如图==:
这样做有两个好处:
可以大大减少需要创建的KEY的数量,减少内存占用。
一个课程创建一个缓存,当用户在多个视频间跳转时,整个缓存的有效期都会被延续,不会频繁的创建和销毁缓存数据
2.3 业务逻辑修改–redis缓存 添加缓存之后,业务逻辑更改为:
提交播放进度后,如果是更新播放进度则不写数据库,而是写缓存
需要一个定时任务,定期将缓存数据写入数据库
变化后的业务具体流程为:
1.提交学习记录
2.判断是否是考试
是:新增学习记录,并标记有小节被学完。走步骤8
否:走视频流程,步骤3
3.查询播放记录缓存,如果缓存不存在则查询数据库并建立缓存
4.判断记录是否存在
4.1.否:新增一条学习记录
4.2.是:走更新学习记录流程,步骤5
5.判断是否是第一次学完(进度超50%,旧的状态是未学完)
5.1.不是第一次学完:仅仅是要更新播放进度,因此直接写入Redis并结束
5.2.是第一次学完:代表小节学完,走步骤6
6.更新学习记录状态为已学完
7.清理Redis缓存:因为学习状态变为已学完,与缓存不一致,因此这里清理掉缓存,这样下次查询时自然会更新缓存,保证数据一致。
8.更新课表中已学习小节的数量+1
9.判断课程的小节是否全部学完
2.4 业务逻辑修改–定时任务将redis缓存到数据库
但是定时任务的持久化方式在播放进度记录业务中存在一些问题,主要就是时效性问题。我们的产品要求视频续播的时间误差不能超过30秒。
假如定时任务间隔较短,例如20秒一次,对数据库的更新频率太高,压力太大
假如定时任务间隔较长,例如2分钟一次,更新频率较低,续播误差可能超过2分钟,不满足需求
因此,我们考虑将用户==最后一次提交==的播放进度写入数据库
==【只要用户一直在提交记录,Redis中的播放进度就会一直变化。如果Redis中的播放进度不变,肯定是停止了播放,是最后一次提交】==
因此,我们只要能判断Redis中的播放进度是否变化 即可—–>每当前端提交(15s)播放记录时,我们可以设置一个延迟任务 并保存这次提交的进度 。等待20秒后(因为前端每15秒提交一次,20秒就是等待下一次提交),检查Redis中的缓存的进度与任务中的进度是否一致。
不一致:说明持续在提交,无需处理
一致:说明是最后一次提交,更新学习记录、更新课表最近学习小节和时间到数据库中
流程如下:
3.延迟任务方案(定时任务) 针对2.4提出用户提交的播放记录是否变化,我们需要将更新播放记录做一个延迟任务,等待超过一个提交周期(20s)后检查播放进度
延迟任务的实现方案有很多,常见的有四类:
DelayQueue
Redisson
MQ
时间轮
原理
JDK自带延迟队列,基于阻塞队列实现。
基于Redis数据结构模拟JDK的DelayQueue实现
利用MQ的特性。例如RabbitMQ的死信队列
时间轮算法
优点
不依赖第三方服务
分布式系统下可用不占用JVM内存
分布式系统下可以不占用JVM内存
不依赖第三方服务性能优异
缺点
占用JVM内存只能单机使用
依赖第三方服务
依赖第三方服务
只能单机使用
以上四种方案都可以解决问题,不过本例中我们会使用DelayQueue方案。因为这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。
但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。但考虑到任务存储时间比较短(只有20秒),因此也可以接收。
【如果数据量非常大,DelayQueue不能满足业务需求,大家也可以替换为其它延迟队列方式,例如Redisson、MQ等】
3.1 DelayQueue实现原理 1 2 3 4 5 6 7 //实现了BlockingQueue接口【是一个阻塞队列】 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); // ... 略 }
其中:DelayQueue内部的元素必须是Delayed类型,这其实就是一个延迟任务的规范接口 从源码中可以看出,Delayed类型必须具备两个方法:
getDelay() :获取延迟任务的剩余延迟时间
compareTo(T t) :比较两个延迟任务的延迟时间,判断执行顺序
可见,Delayed类型的延迟任务具备两个功能:①获取剩余延迟时间、②比较执行顺序
将来每一次提交播放记录,就可以将播放记录保存在这样的一个Delayed
类型的延迟任务里并设定20秒的延迟时间。然后交给DelayQueue
队列。DelayQueue
会调用compareTo
方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。
3.2 DelayQueue具体用法 首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class DelayTask<T> implements Delayed { //实现Delayed接口【实现两个方法】 //数据 private T data; //执行时间(纳秒) private long activeTime; public DelayTask(T data, Duration delayTime) { this.data = data; this.activeTime = System.nanoTime() + delayTime.toNanos(); //当前时间+延迟时间 } //返回任务剩余的时间 @Override public long getDelay(TimeUnit unit) { //设定时间-当前时间[和构造方法不一定是一个时间] return unit.convert(Math.max(0,activeTime-System.nanoTime()), TimeUnit.NANOSECONDS); } //排序 @Override public int compareTo(Delayed o) { long l=this.getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS); if(l>0){ return 1; }else if(l<0){ return -1; }else{ return 0; } } }
接下来就可以创建延迟任务,交给延迟队列保存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j class DelayTaskTest { @Test void testDelayQueue() throws InterruptedException { // 1.初始化延迟队列 DelayQueue<DelayTask<String>> queue = new DelayQueue<>(); // 2.向队列中添加延迟执行的任务 log.info("开始初始化延迟任务。。。。"); queue.add(new DelayTask<>("延迟任务3", Duration.ofSeconds(3))); queue.add(new DelayTask<>("延迟任务1", Duration.ofSeconds(1))); queue.add(new DelayTask<>("延迟任务2", Duration.ofSeconds(2))); // 3.尝试执行任务 while (true) { DelayTask<String> task = queue.take(); //take方法是阻塞式,如果没有延迟任务就会阻塞 log.info("开始执行延迟任务:{}", task.getData()); } } }
注意 :本用例直接同一个线程来执行任务了。当没有任务的时候线程会被阻塞。而在实际开发中,我们会准备线程池,开启多个线程来执行队列中的任务。
4.具体改造[直接看这里!!!] 具体改造之后的业务逻辑图:
4.1 定义延迟任务工具类
读取redis数据【用于判断记录是否已经存在,先在redis查询】
是第一次学习,更新学习记录,删除redis
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 package com.tianji.learning.task; import com.tianji.common.utils.JsonUtils; import com.tianji.common.utils.StringUtils; import com.tianji.learning.domain.po.LearningLesson; import com.tianji.learning.domain.po.LearningRecord; import com.tianji.learning.mapper.LearningRecordMapper; import com.tianji.learning.service.ILearningLessonService; import lombok.Data; import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.DelayQueue; @Slf4j @Component @RequiredArgsConstructor public class LearningRecordDelayTaskHandler { private final StringRedisTemplate redisTemplate; private final LearningRecordMapper recordMapper; private final ILearningLessonService lessonService; private final DelayQueue<DelayTask<RecordTaskData>> queue = new DelayQueue<>(); private final static String RECORD_KEY_TEMPLATE = "learning:record:{}"; //业务前缀【防止只有一个1不知道是谁的课表1】 private static volatile boolean begin = true; @PostConstruct //项目初始前执行 public void init(){ //异步 //1.自定义线程池【注册到spring容器内,注入线程池,线程池调用业务】 //2.使用CompletableFuture【内部也是多线程】 CompletableFuture.runAsync(this::handleDelayTask); } @PreDestroy public void destroy(){ begin = false; //多线程共享数据begin,必须用volatile其他线程可见性!!! log.debug("延迟任务停止执行!"); } //处理延迟任务[判断数据是否一致] public void handleDelayTask(){ while (begin) { try { // 1.[不断地取]获取到期的延迟任务 DelayTask<RecordTaskData> task = queue.take(); //take是阻塞式,没有任务就阻塞 RecordTaskData data = task.getData(); // 2.查询Redis缓存 LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId()); if (record == null) { continue; } // 3.比较数据,moment值 if(!Objects.equals(data.getMoment(), record.getMoment())) { //redis中数据和任务中的数据 // 不一致,说明用户还在持续提交播放进度,放弃旧数据 continue; } // 4.一致,持久化播放进度数据到数据库 // 4.1.更新学习记录的moment时刻 record.setFinished(null); recordMapper.updateById(record); // 4.2.更新课表最近学习信息 LearningLesson lesson = new LearningLesson(); lesson.setId(data.getLessonId()); lesson.setLatestSectionId(data.getSectionId()); lesson.setLatestLearnTime(LocalDateTime.now()); lessonService.updateById(lesson); } catch (Exception e) { log.error("处理延迟任务发生异常", e); } } } //将数据添加到redis,并且添加一个延迟检查任务到DelayQueue public void addLearningRecordTask(LearningRecord record){ // 1.添加数据到Redis缓存 writeRecordCache(record); // 2.提交延迟任务到延迟队列 DelayQueue queue.add(new DelayTask<>(new RecordTaskData(record), Duration.ofSeconds(20))); } //将更新学习记录的数据缓存起来 public void writeRecordCache(LearningRecord record) { log.debug("更新学习记录的缓存数据"); try { // 1.数据转换 String json = JsonUtils.toJsonStr(new RecordCacheData(record)); //转为json【id,moment,finished】 // 2.写入Redis String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId()); //learning:record:{lessonId} redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json); //KEY[lessonid]-HashKey[sectionid]-HashValue[{id:xxx,moment:xxx,finished:xxx}] // 3.添加缓存过期时间 redisTemplate.expire(key, Duration.ofMinutes(1)); //设置过期时间1分钟 } catch (Exception e) { log.error("更新学习记录缓存异常", e); } } //读取redis数据[检查记录是否已经存在] public LearningRecord readRecordCache(Long lessonId, Long sectionId){ try { // 1.读取Redis数据 String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId); //learning:record:{lessonId} Object cacheData = redisTemplate.opsForHash().get(key, sectionId.toString()); //根据hash类型,根据key获取hashkey[sectionId]对应的一行数据value if (cacheData == null) { return null; } // 2.数据检查和转换 return JsonUtils.toBean(cacheData.toString(), LearningRecord.class); //转为json } catch (Exception e) { log.error("缓存读取异常", e); return null; } } //删除redis数据 public void cleanRecordCache(Long lessonId, Long sectionId){ // 删除数据---删除hashKey里面的一行数据[不能是redisTemplate.delete()这样是删除lessonId了,太大了] String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId); redisTemplate.opsForHash().delete(key, sectionId.toString()); } @Data @NoArgsConstructor //redis的hash里面value的三个属性 private static class RecordCacheData{ private Long id; private Integer moment; private Boolean finished; public RecordCacheData(LearningRecord record) { this.id = record.getId(); this.moment = record.getMoment(); this.finished = record.getFinished(); } } @Data @NoArgsConstructor //延迟任务所需要的三个属性 private static class RecordTaskData{ private Long lessonId; private Long sectionId; private Integer moment; public RecordTaskData(LearningRecord record) { this.lessonId = record.getLessonId(); this.sectionId = record.getSectionId(); this.moment = record.getMoment(); } } }
4.2 改造提交学习记录
插入到redis,直接返回false这样后续4的更新学习记录就不会执行
4.3 测试 不是第一次学完,多次提交的情况: