700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Spring集成quartz实现的定时任务调用

Spring集成quartz实现的定时任务调用

时间:2018-09-24 06:57:04

相关推荐

Spring集成quartz实现的定时任务调用

本示例使用Spring+quartz实现定时任务的调度,通过zookeeper+curator实现分布式锁,保证分布式任务串行运行,通过自定义注解实现任务的扫描及注册;

1.添加相关的maven依赖,不包括spring

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.2</version></dependency>

2.定义任务扫描的相关注解:

/*** 调度任务注解*/@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface Scheduler {}/*** 调度任务是否为安全的并行任务,默认为true,不允许并行*/@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface ScheduleTaskConcurrent {/*** 任务名称* @return*/String value();}/*** 任务表达式*/@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface ScheduleTaskCronExpression {/*** 任务名称* @return*/String value();}/*** 任务方法*/@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface ScheduleTaskMethod {/*** 任务名称* @return*/String value();}/*** 调度任务是否为分布式安全的并行任务,默认为true,不允许并行*/@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface ScheduleTaskRemoteConcurrent {/*** 任务名称* @return*/String value();}/*** 任务是否执行*/@Target(ElementType.FIELD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface ScheduleTaskRunning {/*** 任务名称* @return*/String value();}

3.定义任务对象对应的实体bean:

/*** 任务对象*/public class TaskBean {private String name;private String cronExpression;private Boolean concurrent = true;private Boolean remoteConcurrent=true;private Boolean running = true;private Object targetObject;private String targetMethod;public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent, Boolean remoteConcurrent,Object targetObject, String targetMethod) {this.name = name;this.cronExpression = cronExpression;this.concurrent = concurrent;this.remoteConcurrent = remoteConcurrent;this.running = running;this.targetObject = targetObject;this.targetMethod = targetMethod;}public TaskBean(String cronExpression, Object targetObject, String targetMethod) {this(null, cronExpression, null, null, targetObject, targetMethod);}public TaskBean(String name, String cronExpression, Object targetObject, String targetMethod) {this(name, cronExpression, null, null, targetObject, targetMethod);}public TaskBean(String cronExpression, Boolean running, Boolean stateful, Object targetObject, String targetMethod) {this(null, cronExpression, running, stateful, targetObject, targetMethod);}public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent, Object targetObject, String targetMethod) {this.name = StringUtils.isEmpty(name) ? getDefaultName() : name;this.cronExpression = cronExpression;this.running = running != null ? running : this.running;this.concurrent = concurrent != null ? concurrent : this.concurrent;this.targetObject = targetObject;this.targetMethod = targetMethod;}public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent,Object targetObject, String targetMethod,Boolean remoteConcurrent) {this.name = StringUtils.isEmpty(name) ? getDefaultName() : name;this.cronExpression = cronExpression;this.running = running != null ? running : this.running;this.concurrent = concurrent != null ? concurrent : this.concurrent;this.targetObject = targetObject;this.targetMethod = targetMethod;this.remoteConcurrent = remoteConcurrent != null ? remoteConcurrent : this.remoteConcurrent;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getCronExpression() {return cronExpression;}public void setCronExpression(String cronExpression) {this.cronExpression = cronExpression;}public Boolean getConcurrent() {return concurrent;}public void setConcurrent(Boolean concurrent) {this.concurrent = concurrent;}public Boolean getRemoteConcurrent() {return remoteConcurrent;}public void setRemoteConcurrent(Boolean remoteConcurrent) {this.remoteConcurrent = remoteConcurrent;}public Boolean getRunning() {return running;}public void setRunning(Boolean running) {this.running = running;}public Object getTargetObject() {return targetObject;}public void setTargetObject(Object targetObject) {this.targetObject = targetObject;}public String getTargetMethod() {return targetMethod;}public void setTargetMethod(String targetMethod) {this.targetMethod = targetMethod;}private String getDefaultName() {return this.targetObject.getClass().getName();}}

4.定义任务初始化工厂类及其实现:

/*** 任务初始化工厂类*/public interface ScheduleFactory {public void init(List<TaskBean> taskBeanList) throws Exception;}@Component("quartzSchedulerFactory")public class QuartzScheduleFactory implements ScheduleFactory{@Autowiredprivate Scheduler quartzScheduler;public void init(List<TaskBean> taskBeanList) throws Exception {if(taskBeanList ==null || taskBeanList.size()<=0 ){return;}for(TaskBean taskBean : taskBeanList){if (taskBean != null) {if (taskBean.getRunning()) {QuartzSchedulerUtils.createScheduleJob(quartzScheduler, taskBean);}}}}}

5.zk实现的分布式锁:

zk配置信息的加载

public class LocalZookeeperPropertiesLoader{private static void putConfig(Properties localProperties, Map.Entry entry) {if (entry.getKey().equals(ZookeeperConfigConstants.ZOOKEEPER_ADDRESS)) {localProperties.put(ZookeeperConfigConstants.ZOOKEEPER_ADDRESS, entry.getValue());}}/*** 从默认配置文件 config.properties 中得到ZooKeeper的配置信息* 其中 ZooKeeper 的地址由 (系统属性 优先于 环境变量 优先于 配置文件)指定* @return 配置信息*/@Overridepublic Properties load() {Properties localProperties = new Properties();try {InputStream resourceAsStream = LocalZookeeperPropertiesLoader.class.getClassLoader().getResourceAsStream("config" + ".properties");localProperties.load(resourceAsStream);} catch (Exception e) {throw new RuntimeException(e);}System.getenv().entrySet().forEach(entry -> putConfig(localProperties, entry));System.getProperties().entrySet().forEach(entry -> putConfig(localProperties, entry));return localProperties;}@Overridepublic void destroy() {}

初始化curatorFrameWork

public class CRMSCuratorFrameworkFactory {private CRMSCuratorFrameworkFactory() {}public static CuratorFramework createCuratorFramework(CuratorProperties configuration){return createCuratorFramework(configuration, null);}public static CuratorFramework createCuratorFramework(CuratorProperties curatorProperties, TreeCacheListener listener, String... paths){CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorProperties.getConnectString()).connectionTimeoutMs(curatorProperties.getConnectionTimeoutMs()).retryPolicy(curatorProperties.getRetryPolicy()).sessionTimeoutMs(curatorProperties.getSessionTimeoutMs()).build();curatorFramework.start();if(paths.length > 0 && listener != null){for(String path :paths){try {if(curatorFramework.checkExists().forPath(path)==null){curatorFramework.create().creatingParentsIfNeeded().forPath(path);}TreeCache watcher =TreeCache.newBuilder(curatorFramework,path).build();watcher.getListenable().addListener(listener);watcher.start();}catch (Exception e){throw new RuntimeException(e);}}}return curatorFramework;}}

互斥锁获取后的回调

@FunctionalInterfacepublic interface MutexLockCallable {void call();}

分布式锁的实现

//zk实现的分布式互斥锁public class ZookeeperMetuxLock {private static Logger log = LoggerFactory.getLogger(ZookeeperMetuxLock.class);/*** 分布式互斥锁* @param lockName 锁对应的zk节点的名称* @param time阻塞时间* @param unit阻塞时间单位* @param callback 获得锁需要执行的内容*/public static boolean execute(String lockName,long time,TimeUnit unit, MetuxLockCallble callback){boolean acquired = false;CuratorProperties curatorProperties = CuratorPropertiesBuilder.getInstance().build(new LocalZookeeperPropertiesLoader().load());try (CuratorFramework curatorFramework = CRMSCuratorFrameworkFactory.createCuratorFramework(curatorProperties)) {//互斥分布式锁InterProcessMutex mutexLock = new InterProcessMutex(curatorFramework, lockName);if (mutexLock.acquire(time, unit)) {try {callback.call();} finally {mutexLock.release();acquired = true;}}} catch (Exception ex) {log.error("MutexLock lock error! message:" + ex.getMessage(), ex);} finally {return acquired;}}/*** 分布式互斥锁** @param lockName 锁对应的zookeeper节点名字* @param callback 获取锁后执行的内容* @return 是否成功获取锁*/public static boolean execute(String lockName, MetuxLockCallble callback) {return execute(lockName, -1, null, callback);}}

6.quartz的job的实现:

public class QuartzJob implements Job {private static Logger log = LoggerFactory.getLogger(QuartzJob .class);private static final String MUTEX_PATH_ON_ZOOKEEPER_PREFIX = "/scheduler_lock/";public void execute(JobExecutionContext context) throws JobExecutionException {final TaskFactoryBean taskFactoryBean = (TaskFactoryBean) context.getMergedJobDataMap().get(QuartzSchedulerUtils.SCHEDULEFACTORYBEAN);if (taskFactoryBean.getRemoteConcurrent()) {String lockName = MUTEX_PATH_ON_ZOOKEEPER_PREFIX + taskFactoryBean.getName();ZookeeperMutexLock.execute(lockName, 0, TimeUnit.MICROSECONDS, () -> {QuartzJob .executeTask(taskFactoryBean);});} else {executeTask(taskFactoryBean);}}private static void executeTask(TaskFactoryBean taskFactoryBean) {try {Object targetObject = taskFactoryBean.getTargetObject();String targetMethod = taskFactoryBean.getTargetMethod();Method method = targetObject.getClass().getMethod(targetMethod, new Class[]{});if (method != null) {method.invoke(targetObject, new Object[]{});} else {log.error("task " + taskFactoryBean.getName() + " execute error! message: execute method is null");}} catch (Exception ex) {log.error("task " + taskFactoryBean.getName() + " execute error! message:" + ex.getMessage(), ex);}}

7.创建job的工具类:

public class QuartzSchedulerUtils {public static final String SCHEDULEFACTORYBEAN = "scheduleFactoryBean";public static void createScheduleJob(Scheduler scheduler, TaskBean taskBean)throws SchedulerException {//同步或者异步Class<? extends Job> jobClass = QuartzJob.class;//构建job信息String jobName = taskBean.getName();String jobGroupName = jobName;JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName,jobGroupName).build();//放入参数,运行时的方法可以获取jobDetail.getJobDataMap().put(SCHEDULEFACTORYBEAN,taskBean);//表达式调度构造器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(taskBean.getCronExpression());//按新的表达式构造一个新的triggerCronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName,jobGroupName).withSchedule(scheduleBuilder).build();scheduler.scheduleJob(jobDetail,cronTrigger);}}

8.定义调度管理的入口类:

@Component("schedulerManager")@Lazy(false)public class SchedulerManager implements ApplicationContextAware {private static Logger log = LoggerFactory.getLogger(SchedulerManager.class);private List<TaskFactoryBean> taskFactoryBeen = new ArrayList<TaskFactoryBean>();private SchedulerFactory schedulerFactory;@PostConstructpublic void init() throws Exception {schedulerFactory.init(taskFactoryBeen);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {initTaskFactoryBeen(applicationContext);schedulerFactory = (SchedulerFactory) applicationContext.getBean("quartzSchedulerFactory");}private void initTaskFactoryBeen(ApplicationContext applicationContext) {Map<String, Object> taskBeanMap = applicationContext.getBeansWithAnnotation(Scheduler.class);if (taskBeanMap != null && taskBeanMap.size() > 0) {for (String beanName : taskBeanMap.keySet()) {Object taskBean = taskBeanMap.get(beanName);try {List<TaskFactoryBean> currentTaskFactoryBeen = buildTaskFactoryBean(taskBean);if (currentTaskFactoryBeen != null && currentTaskFactoryBeen.size() > 0) {taskFactoryBeen.addAll(currentTaskFactoryBeen);}} catch (Exception ex) {log.error("Initializes the scheduling bean " + beanName + " failure ! message:" + ex.getMessage(), ex);throw new RuntimeException(ex);}}}}private List<TaskFactoryBean> buildTaskFactoryBean(Object taskBean) throws Exception {Object targetBean = SpringTargetBeanUtils.getTarget(taskBean);Map<String, String> cronMap = new HashMap<String, String>();Map<String, Boolean> runningMap = new HashMap<String, Boolean>();Map<String, Boolean> concurrentMap = new HashMap<String, Boolean>();Map<String, Boolean> remoteConcurrentMap = new HashMap<String, Boolean>();Field[] fields = targetBean.getClass().getDeclaredFields();for (Field field : fields) {field.setAccessible(true);if (field.isAnnotationPresent(ScheduleTaskCronExpression.class)) {ScheduleTaskCronExpression scheduleTaskCronExpression = field.getAnnotation(ScheduleTaskCronExpression.class);String taskName = scheduleTaskCronExpression.value();String cronExpression = (String) field.get(targetBean);cronMap.put(taskName, cronExpression);} else if (field.isAnnotationPresent(ScheduleTaskRunning.class)) {ScheduleTaskRunning scheduleTaskRunning = field.getAnnotation(ScheduleTaskRunning.class);String taskName = scheduleTaskRunning.value();Boolean running = (Boolean) field.get(targetBean);runningMap.put(taskName, running);} else if (field.isAnnotationPresent(ScheduleTaskConcurrent.class)) {ScheduleTaskConcurrent scheduleTaskConcurrent = field.getAnnotation(ScheduleTaskConcurrent.class);String taskName = scheduleTaskConcurrent.value();Boolean concurrent = (Boolean) field.get(targetBean);concurrentMap.put(taskName, concurrent);}else if (field.isAnnotationPresent(ScheduleTaskRemoteConcurrent.class)) {ScheduleTaskRemoteConcurrent scheduleTaskRemoteConcurrent = field.getAnnotation(ScheduleTaskRemoteConcurrent.class);String taskName = scheduleTaskRemoteConcurrent.value();Boolean concurrent = (Boolean) field.get(targetBean);remoteConcurrentMap.put(taskName, concurrent);}}List<TaskFactoryBean> currentTaskFactoryBeen = new ArrayList<TaskFactoryBean>();Method[] methods = targetBean.getClass().getDeclaredMethods();for (Method method : methods) {if (method.isAnnotationPresent(ScheduleTaskMethod.class)) {ScheduleTaskMethod scheduleTaskMethod = method.getAnnotation(ScheduleTaskMethod.class);String taskName = scheduleTaskMethod.value();String methodName = method.getName();String cronExpression = cronMap.get(taskName);Boolean running = runningMap.get(taskName);Boolean concurrent = concurrentMap.get(taskName);Boolean remoteConcurrent = remoteConcurrentMap.get(taskName);TaskFactoryBean taskFactoryBean = new TaskFactoryBean(taskName, cronExpression, running, concurrent, taskBean, methodName, remoteConcurrent);currentTaskFactoryBeen.add(taskFactoryBean);}}return currentTaskFactoryBeen;}public List<TaskFactoryBean> getTaskFactoryBeen() {return taskFactoryBeen;}

9.相关spring及zk的配置:

<bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" lazy-init="false"><!-- quartz 延时加载 --><property name="startupDelay" value="1"/></bean>

config.properies

#zookeeper addressszk_address=ip:2181#zk propertieszookeeper.config.sessionTimeoutMs=60000zookeeper.config.connectionTimeoutMs=3000zookeeper.config.retry.baseSleepTimeMs=100zookeeper.config.retry.maxRetries=3

10. 创建test类:

@Component@Schedulerpublic class TestTask {private static final String ADDUSERTASK = "TestTask";@ScheduleTaskCronExpression(ADDUSERTASK)private String taskEx = "* * * * * ? *";@ScheduleTaskRunning(ADDUSERTASK)private Boolean taskRunning = true;@ScheduleTaskRemoteConcurrent(ADDUSERTASK)private Boolean remoteCon = true;@ScheduleTaskMethod(ADDUSERTASK)public void task() {System.out.println("this is the task");}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。