700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 第四十章:基于SpringBoot Quartz完成定时任务分布式多节点负载持久化

第四十章:基于SpringBoot Quartz完成定时任务分布式多节点负载持久化

时间:2021-07-16 10:51:26

相关推荐

第四十章:基于SpringBoot  Quartz完成定时任务分布式多节点负载持久化

在上一章【第三十九章:基于SpringBoot & Quartz完成定时任务分布式单节点持久化】中我们已经完成了任务的持久化,当我们创建一个任务时任务会被quartz定时任务框架自动持久化到数据库,我们采用的是SpringBoot项目托管的dataSource来完成的数据源提供,当然也可以使用quartz内部配置数据源方式,我们的标题既然是提到了定时任务的分布式多节点,那么怎么才算是多节点呢?当有节点故障或者手动停止运行后是否可以自动漂移任务到可用的分布式节点呢?

本章目标

完成定时任务分布式多节点配置,当单个节点关闭时其他节点自动接管定时任务。创建任务时传递自定义参数,方便任务处理后续业务逻辑。

构建项目

注意:我们本章项目需要结合上一章共同完成,有一点要注意的是任务在持久化到数据库内时会保存任务的全路径,如:com.hengyu.chapter39.timers.GoodStockCheckTimerquartz在运行任务时会根据任务全路径去执行,如果不一致则会提示找不到指定类,我们本章在创建项目时package需要跟上一章完全一致。

我们这里就不去直接创建新项目了,直接复制上一章项目的源码为新的项目命名为Chapter40

配置分布式

在上一章配置文件quartz.properties中我们其实已经为分布式做好了相关配置,下面我们就来看一下分布式相关的配置。

分布式相关配置:

1. org.quartz.scheduler.instanceId: 定时任务的实例编号,如果手动指定需要保证每个节点的唯一性,因为quartz不允许出现两个相同instanceId的节点,我们这里指定为Auto就可以了,我们把生成编号的任务交给quartz

2. org.quartz.jobStore.isClustered: 这个属性才是真正的开启了定时任务的分布式配置,当我们配置为truequartz框架就会调用ClusterManager来初始化分布式节点。

3. org.quartz.jobStore.clusterCheckinInterval:配置了分布式节点的检查时间间隔,单位:毫秒。

下面是quartz.properties配置文件配置信息:

#调度器实例名称org.quartz.scheduler.instanceName = quartzScheduler#调度器实例编号自动生成org.quartz.scheduler.instanceId = AUTO#持久化方式配置org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX#持久化方式配置数据驱动,MySQL数据库org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate#quartz相关数据表前缀名org.quartz.jobStore.tablePrefix = QRTZ_#开启分布式部署org.quartz.jobStore.isClustered = true#配置是否使用org.quartz.jobStore.useProperties = false#分布式节点有效性检查时间间隔,单位:毫秒org.quartz.jobStore.clusterCheckinInterval = 10000#线程池实现类org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool#执行最大并发线程数量org.quartz.threadPool.threadCount = 10#线程优先级org.quartz.threadPool.threadPriority = 5#配置为守护线程,设置后任务将不会执行#org.quartz.threadPool.makeThreadsDaemons=true#配置是否启动自动加载数据库内的定时任务,默认trueorg.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true复制代码

当我们启动任务节点时,会根据org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread属性配置进行是否自动加载任务,默认true自动加载数据库内的任务到节点。

测试分布式

上一章项目节点名称:quartz-cluster-node-first

本章项目节点名称:quartz-cluster-node-second

由于我们quartz-cluster-node-first的商品库存检查定时任务是每隔30秒执行一次,所以任务除非手动清除否则是不会被清空的,在运行项目测试之前需要将application.yml配置文件的端口号、项目名称修改下,保证quartz-cluster-node-secondquartz-cluster-node-first端口号不一致,可以同时运行,修改后为:

spring:application:name: quzrtz-cluster-node-secondserver:port: 8082复制代码

然后修改相应控制台输出,为了能够区分任务执行者是具体的节点。

Chapter40Application启动类修改日志输出:logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】");GoodAddTimer商品添加任务类修改日志输出:logger.info("分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:{}",new Date());GoodStockCheckTimer商品库存检查任务类修改日志输出:logger.info("分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:{}",new Date());复制代码

下面我们启动本章项目,查看控制台输出内容,如下所示:

-11-12 10:28:39.969 INFO 11048 --- [ main] c.hengyu.chapter39.Chapter40Application : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】-11-12 10:28:41.930 INFO 11048 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 2 seconds-11-12 10:28:41.959 INFO 11048 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler: Scheduler schedulerFactoryBean_$_yuqiyu1510453719308 started.-11-12 10:28:51.963 INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: detected 1 failed or restarted instances.-11-12 10:28:51.963 INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: Scanning for instance "yuqiyu1510450938654"'s failed in-progress jobs.-11-12 10:28:51.967 INFO 11048 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: ......Freed 1 acquired trigger(s).-11-12 10:29:00.024 INFO 11048 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:29:00 CST 复制代码

可以看到项目启动完成后自动分配的instanceIdyuqiyu1510450938654,生成的规则是当前用户的名称+时间戳。然后ClusterManager分布式管理者自动介入进行扫描是否存在匹配的触发器任务,如果存在则会自动执行任务逻辑,而商品库存检查定时任务确实由quartz-cluster-node-second进行输出的。

测试任务自动漂移

下面我们也需要把quartz-cluster-node-first的输出进行修改,如下所示:

Chapter39Application启动类修改日志输出:logger.info("【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】");GoodAddTimer商品添加任务类修改日志输出:logger.info("分布式节点quartz-cluster-node-first,商品添加完成后执行任务,任务时间:{}",new Date());GoodStockCheckTimer商品库存检查任务类修改日志输出:logger.info("分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:{}",new Date());复制代码

接下来我们启动quartz-cluster-node-first,并查看控制台的输出内容:

-11-12 10:34:09.750 INFO 192 --- [ main] c.hengyu.chapter39.Chapter39Application : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】-11-12 10:34:11.690 INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 2 seconds-11-12 10:34:11.714 INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler: Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.复制代码

项目启动完成后,定时节点并没有实例化ClusterManager来完成分布式节点的初始化,因为quartz检测到有其他的节点正在处理任务,这样也是保证了任务执行的唯一性。

关闭quartz-cluster-node-second

我们关闭quartz-cluster-node-second运行的项目,预计的目的可以达到quartz-cluster-node-first会自动接管数据库内的任务,完成任务执行的自动漂移,我们来查看quartz-cluster-node-first的控制台输出内容:

-11-12 10:34:09.750 INFO 192 --- [ main] c.hengyu.chapter39.Chapter39Application : 【【【【【【定时任务分布式节点 - quartz-cluster-node-first 已启动】】】】】】-11-12 10:34:11.690 INFO 192 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 2 seconds-11-12 10:34:11.714 INFO 192 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler: Scheduler schedulerFactoryBean_$_yuqiyu1510454049066 started.-11-12 10:41:11.793 INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: detected 1 failed or restarted instances.-11-12 10:41:11.793 INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: Scanning for instance "yuqiyu1510453719308"'s failed in-progress jobs.-11-12 10:41:11.797 INFO 192 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: ......Freed 1 acquired trigger(s).-11-12 10:41:11.834 INFO 192 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-first,执行库存检查定时任务,执行时间:Sun Nov 12 10:41:11 CST 复制代码

控制台已经输出了持久的定时任务,输出节点是quartz-cluster-node-first,跟我们预计的一样,节点quartz-cluster-node-first完成了自动接管quartz-cluster-node-second的工作,而这个过程肯定有一段时间间隔,而这个间隔可以修改quartz.properties配置文件内的属性org.quartz.jobStore.clusterCheckinInterval进行调节。

关闭quartz-cluster-node-first

我们同样可以测试启动任务节点quartz-cluster-node-second后,再关闭quartz-cluster-node-first任务节点,查看quartz-cluster-node-second控制台的输出内容:

-11-12 10:53:31.010 INFO 3268 --- [ main] c.hengyu.chapter39.Chapter40Application : 【【【【【【定时任务分布式节点 - quartz-cluster-node-second 已启动】】】】】】-11-12 10:53:32.967 INFO 3268 --- [lerFactoryBean]] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now, after delay of 2 seconds-11-12 10:53:32.992 INFO 3268 --- [lerFactoryBean]] org.quartz.core.QuartzScheduler: Scheduler schedulerFactoryBean_$_yuqiyu1510455210493 started.-11-12 10:53:52.999 INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: detected 1 failed or restarted instances.-11-12 10:53:52.999 INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: Scanning for instance "yuqiyu1510454049066"'s failed in-progress jobs.-11-12 10:53:53.003 INFO 3268 --- [_ClusterManager] o.s.s.quartz.LocalDataSourceJobStore: ClusterManager: ......Freed 1 acquired trigger(s).-11-12 10:54:00.020 INFO 3268 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 10:54:00 CST 复制代码

得到的结果是同样可以完成任务的自动漂移。

如果两个节点同时启动,哪个节点先把节点信息注册到数据库就获得了优先执行权。

传递参数到任务

我们平时在使用任务时,如果是针对性比较强的业务逻辑,肯定需要特定的参数来完成业务逻辑的实现。

下面我们来模拟商品秒杀的场景,当我们添加商品后自动添加一个商品提前五分钟的秒杀提醒,为关注该商品的用户发送提醒消息。

我们在节点quartz-cluster-node-first中添加秒杀提醒任务,如下所示:

package com.hengyu.chapter39.timers;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.quartz.QuartzJobBean;/*** 商品秒杀提醒定时器* 为关注该秒杀商品的用户进行推送提醒* ========================** @author 恒宇少年* Created with IntelliJ IDEA.* Date:/11/12* Time:9:23* 码云:/jnyqy* ========================*/public class GoodSecKillRemindTimerextends QuartzJobBean{/*** logback*/private Logger logger = LoggerFactory.getLogger(GoodSecKillRemindTimer.class);/*** 任务指定逻辑* @param jobExecutionContext* @throws JobExecutionException*/@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {//获取任务详情内的数据集合JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();//获取商品编号Long goodId = dataMap.getLong("goodId");logger.info("分布式节点quartz-cluster-node-first,开始处理秒杀商品:{},关注用户推送消息.",goodId);//.../}}复制代码

在秒杀提醒任务逻辑中,我们通过获取JobDetailJobDataMap集合来获取在创建任务的时候传递的参数集合,我们这里约定了goodId为商品的编号,在创建任务的时候传递到JobDataMap内,这样quartz在执行该任务的时候就会自动将参数传递到任务逻辑中,我们也就可以通过JobDataMap获取到对应的参数值。

设置秒杀提醒任务

我们找到节点项目quartz-cluster-node-first中的GoodInfoService,编写方法buildGoodSecKillRemindTimer设置秒杀提醒任务,如下所示:

/*** 构建商品秒杀提醒定时任务* 设置五分钟后执行* @throws Exception*/public void buildGoodSecKillRemindTimer(Long goodId) throws Exception{//任务名称String name = UUID.randomUUID().toString();//任务所属分组String group = GoodSecKillRemindTimer.class.getName();//秒杀开始时间long startTime = System.currentTimeMillis() + 1000 * 5 * 60;JobDetail jobDetail = JobBuilder.newJob(GoodSecKillRemindTimer.class).withIdentity(name,group).build();//设置任务传递商品编号参数jobDetail.getJobDataMap().put("goodId",goodId);//创建任务触发器Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name,group).startAt(new Date(startTime)).build();//将触发器与任务绑定到调度器内scheduler.scheduleJob(jobDetail,trigger);}复制代码

我们模拟秒杀提醒时间是添加商品后的5分钟,我们通过调用jobDetail实例的getJobDataMap方法就可以获取该任务数据集合,直接调用put方法就可以进行设置指定key的值,该集合继承了StringKeyDirtyFlagMap并且实现了Serializable序列化,因为需要将数据序列化到数据库的qrtz_job_details表内。

修改保存商品方法,添加调用秒杀提醒任务:

/*** 保存商品基本信息* @param good 商品实例* @return*/public Long saveGood(GoodInfoEntity good) throws Exception{goodInfoRepository.save(good);//构建创建商品定时任务buildCreateGoodTimer();//构建商品库存定时任务buildGoodStockTimer();//构建商品描述提醒定时任务buildGoodSecKillRemindTimer(good.getId());return good.getId();}复制代码

添加测试商品

下面我们调用节点quartz-cluster-node-first的测试Chapter39ApplicationTests.addGood方法完成商品的添加,由于我们的quartz-cluster-node-second项目并没有停止,所以我们在quartz-cluster-node-second项目的控制台查看输出内容:

-11-12 11:45:00.008 INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:00 CST -11-12 11:45:30.013 INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:45:30 CST -11-12 11:45:48.230 INFO 11652 --- [ryBean_Worker-7] c.hengyu.chapter39.timers.GoodAddTimer : 分布式节点quartz-cluster-node-second,商品添加完成后执行任务,任务时间:Sun Nov 12 11:45:48 CST -11-12 11:46:00.008 INFO 11652 --- [ryBean_Worker-8] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:00 CST -11-12 11:46:30.016 INFO 11652 --- [ryBean_Worker-9] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:46:30 CST -11-12 11:47:00.011 INFO 11652 --- [yBean_Worker-10] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:00 CST -11-12 11:47:30.028 INFO 11652 --- [ryBean_Worker-1] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:47:30 CST -11-12 11:48:00.014 INFO 11652 --- [ryBean_Worker-2] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:00 CST -11-12 11:48:30.013 INFO 11652 --- [ryBean_Worker-3] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:48:30 CST -11-12 11:49:00.010 INFO 11652 --- [ryBean_Worker-4] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:00 CST -11-12 11:49:30.028 INFO 11652 --- [ryBean_Worker-5] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:49:30 CST -11-12 11:49:48.290 INFO 11652 --- [ryBean_Worker-6] c.h.c.timers.GoodSecKillRemindTimer: 分布式节点quartz-cluster-node-second,开始处理秒杀商品:15,关注用户推送消息.-11-12 11:50:00.008 INFO 11652 --- [ryBean_Worker-7] c.h.c.timers.GoodStockCheckTimer : 分布式节点quartz-cluster-node-second,执行库存检查定时任务,执行时间:Sun Nov 12 11:50:00 CST 复制代码

秒杀任务在添加完成商品后的五分钟开始执行的,而我们也正常的输出了传递过去的goodId商品编号的参数,而秒杀提醒任务执行一次后也被自动释放了。

总结

本章主要是结合上一章完成了分布式任务的讲解,完成了测试持久化的定时任务自动漂移,以及如何向定时任务传递参数。当然在实际的开发过程中,任务创建是需要进行封装的,目的也是为了减少一些冗余代码以及方面后期统一维护定时任务。

本章源码已经上传到码云:

SpringBoot配套源码地址:/hengboy/spr…

SpringCloud配套源码地址:/hengboy/spr…

SpringBoot相关系列文章请访问:目录:SpringBoot学习目录

QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录

SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录

SpringBoot相关文章请访问:目录:SpringBoot学习目录,感谢阅读!

欢迎加入QQ技术交流群,共同进步。

QQ技术交流群

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。