您好,欢迎访问一九零五行业门户网

SpringBoot定时任务功能怎么实现

一 背景项目中需要一个可以动态新增定时定时任务的功能,现在项目中使用的是xxl-job定时任务调度系统,但是经过一番对xxl-job功能的了解,发现xxl-job对项目动态新增定时任务,动态删除定时任务的支持并不是那么好,所以需要自己手动实现一个定时任务的功能
二 动态定时任务调度1 技术选择
timer or scheduledexecutorservice
这两个都能实现定时任务调度,先看下timer的定时任务调度
public class mytimertask extends timertask { private string name; public mytimertask(string name){ this.name = name; } public string getname() { return name; } public void setname(string name) { this.name = name; } @override public void run() { //task calendar instance = calendar.getinstance(); system.out.println(new simpledateformat("yyyy-mm-dd hh:mm:ss").format(instance.gettime())); }}timer timer = new timer();mytimertask timertask = new mytimertask("no.1");//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次timer.schedule(timertask,1000l,2000l);
在看下scheduledthreadpoolexecutor的实现
//org.apache.commons.lang3.concurrent.basicthreadfactoryscheduledexecutorservice executorservice = new scheduledthreadpoolexecutor(1, new basicthreadfactory.builder().namingpattern("example-schedule-pool-%d").daemon(true).build());executorservice.scheduleatfixedrate(new runnable() { @override public void run() { //do something }},initialdelay,period, timeunit.hours);
两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别
多线程并行处理定时任务时,timer运行多个timetask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用scheduledexecutorservice则没有这个问题。
从建议上来看,是一定要选择scheduledexecutorservice了,我们看看源码看看为什么timer出现问题会终止执行
/** * the timer thread. */private final timerthread thread = new timerthread(queue);public timer() { this("timer-" + serialnumber());}public timer(string name) { thread.setname(name); thread.start();}
新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看
class timerthread extends thread { boolean newtasksmaybescheduled = true; /** * 每一件一个任务都是一个quene */ private taskqueue queue; timerthread(taskqueue queue) { this.queue = queue; } public void run() { try { mainloop(); } finally { // someone killed this thread, behave as if timer cancelled synchronized(queue) { newtasksmaybescheduled = false; queue.clear(); // 清除所有任务信息 } } } /** * the main timer loop. (see class comment.) */ private void mainloop() { while (true) { try { timertask task; boolean taskfired; synchronized(queue) { // wait for queue to become non-empty while (queue.isempty() && newtasksmaybescheduled) queue.wait(); if (queue.isempty()) break; // queue is empty and will forever remain; die // queue nonempty; look at first evt and do the right thing long currenttime, executiontime; task = queue.getmin(); synchronized(task.lock) { if (task.state == timertask.cancelled) { queue.removemin(); continue; // no action required, poll queue again } currenttime = system.currenttimemillis(); executiontime = task.nextexecutiontime; if (taskfired = (executiontime<=currenttime)) { if (task.period == 0) { // non-repeating, remove queue.removemin(); task.state = timertask.executed; } else { // repeating task, reschedule queue.reschedulemin( task.period<0 ? currenttime - task.period : executiontime + task.period); } } } if (!taskfired) // task hasn't yet fired; wait queue.wait(executiontime - currenttime); } if (taskfired) // task fired; run it, holding no locks task.run(); } catch(interruptedexception e) { } } }}
我们看到,执行了 mainloop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。
这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而scheduledthreadpoolexecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。
2 使用scheduledthreadpoolexecutor
从上面看,使用scheduledthreadpoolexecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 taskscheduler来实现
@componentpublic class crontaskregistrar implements disposablebean { private final map<runnable, scheduledtask> scheduledtasks = new concurrenthashmap<>(16); @autowired private taskscheduler taskscheduler; public taskscheduler getscheduler() { return this.taskscheduler; } public void addcrontask(runnable task, string cronexpression) { addcrontask(new crontask(task, cronexpression)); } private void addcrontask(crontask crontask) { if (crontask != null) { runnable task = crontask.getrunnable(); if (this.scheduledtasks.containskey(task)) { removecrontask(task); } this.scheduledtasks.put(task, schedulecrontask(crontask)); } } public void removecrontask(runnable task) { set<runnable> runnables = this.scheduledtasks.keyset(); iterator it1 = runnables.iterator(); while (it1.hasnext()) { schedulingrunnable schedulingrunnable = (schedulingrunnable) it1.next(); long taskid = schedulingrunnable.gettaskid(); schedulingrunnable cancelrunnable = (schedulingrunnable) task; if (taskid.equals(cancelrunnable.gettaskid())) { scheduledtask scheduledtask = this.scheduledtasks.remove(schedulingrunnable); if (scheduledtask != null){ scheduledtask.cancel(); } } } } public scheduledtask schedulecrontask(crontask crontask) { scheduledtask scheduledtask = new scheduledtask(); scheduledtask.future = this.taskscheduler.schedule(crontask.getrunnable(), crontask.gettrigger()); return scheduledtask; } @override public void destroy() throws exception { for (scheduledtask task : this.scheduledtasks.values()) { task.cancel(); } this.scheduledtasks.clear(); }}
taskscheduler是本次功能实现的核心类,但是他是一个接口
public interface taskscheduler { /** * schedule the given {@link runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>execution will end once the scheduler shuts down or the returned * {@link scheduledfuture} gets cancelled. * @param task the runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link trigger} interface, * e.g. a {@link org.springframework.scheduling.support.crontrigger} object * wrapping a cron expression * @return a {@link scheduledfuture} representing pending completion of the task, * or {@code null} if the given trigger object never fires (i.e. returns * {@code null} from {@link trigger#nextexecutiontime}) * @throws org.springframework.core.task.taskrejectedexception if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.crontrigger */ @nullable scheduledfuture<?> schedule(runnable task, trigger trigger);
前面的代码可以看到,我们在类中注入了这个类,但是他是接口,我们怎么知道是那个实现类呢,以往出现这种情况要在类上面加@primany或者@quality来执行实现的类,但是我们看到我的注入上并没有标记,因为是通过另一种方式实现的
@configurationpublic class schedulingconfig { @bean public taskscheduler taskscheduler() { threadpooltaskscheduler taskscheduler = new threadpooltaskscheduler(); // 定时任务执行线程池核心线程数 taskscheduler.setpoolsize(4); taskscheduler.setremoveoncancelpolicy(true); taskscheduler.setthreadnameprefix("taskschedulerthreadpool-"); return taskscheduler; }}
在spring初始化时就注册了bean taskscheduler,而我们可以看到他的实现是threadpooltaskscheduler,在网上的资料中有人说threadpooltaskscheduler是taskscheduler的默认实现类,其实不是,还是需要我们去指定,而这种方式,当我们想替换实现时,只需要修改配置类就行了,很灵活。
而为什么说他是更优雅的实现方式呢,因为他的核心也是通过scheduledthreadpoolexecutor来实现的
public scheduledexecutorservice getscheduledexecutor() throws illegalstateexception { assert.state(this.scheduledexecutor != null, "threadpooltaskscheduler not initialized"); return this.scheduledexecutor;}
三 多节点任务执行问题这次的实现过程中,我并没有选择xxl-job来进行实现,而是采用了taskscheduler来实现,这也产生了一个问题,xxl-job是分布式的程序调度系统,当想要执行定时任务的应用使用xxl-job时,无论应用程序中部署多少个节点,xxl-job只会选择其中一个节点作为定时任务执行的节点,从而不会产生定时任务在不同节点上同时执行,导致重复执行问题,而使用taskscheduler来实现,就要考虑多节点重复执行问题。当然既然有问题,就有解决方案
&middot; 方案一 将定时任务功能拆出来单独部署,且只部署一个节点 &middot; 方案二 使用redis setnx的形式,保证同一时间只有一个任务在执行
我选择的是方案二来执行,当然还有一些方式也能保证不重复执行,这里就不多说了,一下是我的实现
public void executetask(long taskid) { if (!redisservice.setifabsent(string.valueof(taskid),"1",2l, timeunit.seconds)) { log.info("已有执行中定时发送短信任务,本次不执行!"); return; }
以上就是springboot定时任务功能怎么实现的详细内容。
其它类似信息

推荐信息