您好,欢迎访问一九零五行业门户网

SpringBoot Schedule调度任务的动态管理方法是什么

前言:定时任务的动态管理分为两种方式:
方式一:在 Web 前台配置 Trigger 触发器(关联 cron 表达式),并由 ThreadPoolTaskScheduler 类创建 Scheduler,在这种方式下对 Schedule 调度任务进行动态管理。
方式二:基于已创建的 Schedule 调度任务进行动态管理,即在组件类中以 @Scheduled 注解声明 Schedule 调度,并在程序启动时一次性初始化,如:
/**
 * Example of a statically declared schedule: the cron expression is fixed in the
 * {@code @Scheduled} annotation, so the job cannot be added, stopped or re-timed at runtime.
 */
@Component
public class TestTask {

    /**
     * Timestamp format for the console output.
     * Upper-case {@code MM} is month-of-year and {@code HH} is hour-of-day (0-23);
     * the lower-case forms mean minute-of-hour and clock-hour-of-am-pm (1-12).
     */
    private static final DateTimeFormatter DF = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Prints the current timestamp followed by a marker text; fires every two seconds. */
    @Scheduled(cron = "0/2 * * * * ?")
    public void robReceiveExpireTask() {
        System.out.println(DF.format(LocalDateTime.now()) + "测试测试");
    }
}
方式二的缺陷:目前无法在运行期间新增 Schedule,也无法对已有 Schedule 进行 stop、start、reset 等管理。
一、架构流程图
二、代码实现流程(架构为 Spring Boot + Spring + MyBatis-Plus)
1.引入库pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Module POM for automation-quartz.
  Maven element names and the POM namespace are case-sensitive (groupId, artifactId,
  modelVersion, .../POM/4.0.0); all-lower-case spellings are not recognized by Maven.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>merak-hyper-automation-boot</artifactId>
        <groupId>com.merak.automation</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>automation-quartz</artifactId>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun repository</name>
            <!-- HTTPS: Maven 3.8.1+ blocks external plain-HTTP repositories by default -->
            <url>https://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Core Spring framework support utilities -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <!-- Spring Web module -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>jakarta.validation</groupId>
            <artifactId>jakarta.validation-api</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <!-- Quartz scheduling starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--
              Spring Boot Maven plugin.
              NOTE(review): the original comment said "skip tests when packaging", but no
              skip configuration is present here; confirm whether it lives in the parent POM.
            -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
resources目录下文件/application.yml:
# Selects the active Spring profile; the keys must be nested for
# `spring.profiles.active` to be the property that is set.
spring:
  profiles:
    active: dev
resources目录下文件/application-dev.yml:
server:
  port: 12105
  servlet:
    context-path: /automation-quartz

management:
  endpoints:
    web:
      exposure:
        include: '*'

# Spring configuration
spring:
  resources:
    static-locations: classpath:/static/,classpath:/templates/
  mvc:
    throw-exception-if-no-handler-found: true
    static-path-pattern: /**
  application:
    name: automation-workflow
  main:
    allow-bean-definition-overriding: true
  # File upload
  servlet:
    multipart:
      # Maximum size of a single file (the unit suffix is case-sensitive: MB)
      max-file-size: 2000MB
      # Maximum total size of one upload request
      max-request-size: 4000MB
  # Uniform JSON date/time conversion.
  # MM = month, HH = 24-hour clock; lower-case mm/hh would mean minutes / 12-hour clock.
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  aop:
    proxy-target-class: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    dynamic:
      druid:
        # Global Druid parameters; most values match the defaults.
        # (Only the parameters below are supported; do not change ones you do not understand.)
        # Connection pool settings
        # Initial size, minimum idle, maximum active
        initial-size: 1
        min-idle: 1
        maxActive: 20
        # Maximum time to wait for a connection, in milliseconds
        maxWait: 60000
        # Interval between checks for idle connections that should be closed, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection stays in the pool before it may be evicted, in milliseconds
        minEvictableIdleTimeMillis: 300000
        validationQuery: select 1
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # Enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # Filters for monitoring statistics; without them the monitoring page cannot
        # collect SQL statistics. 'wall' is the SQL firewall.
        filters: stat,wall,slf4j
        # connectionProperties enables SQL merging and slow-SQL recording
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3308/merak_dev?characterEncoding=UTF-8&useUnicode=true&useSSL=false
          username: root
          password: root
          driver-class-name: com.mysql.jdbc.Driver

# MyBatis-Plus settings
mybatis-plus:
  mapper-locations: classpath*:com/merak/hyper/automation/persist/**/xml/*Mapper.xml
  global-config:
    # Disable the MyBatis-Plus 3.0 banner
    banner: false
    db-config:
      id-type: ID_WORKER_STR
      # Map tables using underscore naming by default
      table-underline: true
  configuration:
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
    # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logging:
  level:
    # Was "com.merar.hyper": a typo that matched no package of this application.
    com.merak.hyper: debug
    # NOTE(review): logger names are matched literally, so the "**" here is probably
    # not treated as a wildcard; the entry above already covers the mapper packages.
    com.merak.hyper.automation.persist.**.mapper: debug
    org.springframework: warn
2. 代码流程:启动类 MerakQuartzApplication
package com.merak.hyper.automation;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Application entry point for work-order task scheduling.
 *
 * <p>Besides bootstrapping Spring Boot, this class defines the two thread pools used by the
 * scheduling code: a {@link ThreadPoolTaskScheduler} that runs the cron-triggered tasks and an
 * {@link Executor} that runs the {@code @Async} management operations.
 *
 * @author chenjun
 * @version 1.0
 * @date 2022/9/22 10:30
 */
@EnableScheduling
@EnableAsync
@MapperScan(basePackages = {"com.merak.hyper.automation.persist.**.mapper"})
@SpringBootApplication(scanBasePackages = {"com.merak.hyper.automation.**"}, exclude = {SecurityAutoConfiguration.class})
public class MerakQuartzApplication {

    public static final Logger log = LoggerFactory.getLogger(MerakQuartzApplication.class);

    /** Number of threads in the task scheduler pool. */
    private static final int TASK_SCHEDULER_POOL_SIZE = 15;
    /** Seconds the scheduler waits for termination on shutdown. */
    private static final int AWAIT_TERMINATION_SECONDS = 60;
    /** Name prefix of the scheduler threads. */
    private static final String THREAD_NAME_PREFIX = "taskexecutor-";

    public static void main(String[] args) {
        SpringApplication.run(MerakQuartzApplication.class, args);
    }

    /**
     * Creates the scheduler that produces the {@code ScheduledFuture<?>} handles for
     * cron-triggered tasks.
     *
     * @return the initialized task scheduler
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(TASK_SCHEDULER_POOL_SIZE);
        taskScheduler.setThreadNamePrefix(THREAD_NAME_PREFIX);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(false);
        taskScheduler.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
        // Explicit initialization kept from the original.
        // NOTE(review): Spring normally initializes this bean itself after property
        // injection; confirm whether this call is still required.
        taskScheduler.initialize();
        log.info("初始化threadpooltaskscheduler threadnameprefix={},poolsize={},awaitterminationseconds={}",
                THREAD_NAME_PREFIX, TASK_SCHEDULER_POOL_SIZE, AWAIT_TERMINATION_SECONDS);
        return taskScheduler;
    }

    /**
     * Creates the executor that runs the threads started by {@code @Async} methods;
     * it is referenced by name from ScheduledHelper.
     *
     * @return the initialized executor
     */
    @Bean("asynctaskexecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("asynctaskexecutor-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Rejected tasks are executed on the submitting thread instead of being dropped.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool.
        taskExecutor.initialize();
        return taskExecutor;
    }
}
一、启动时:项目启动时,加载任务关联的触发器,并全量执行流程。
initlinerunner类:
package com.merak.hyper.automation.scheduling;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.entity.BusWorkflow;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.persist.service.IBusWorkflowService;
import com.merak.hyper.automation.util.CommonUtil;
import com.merak.hyper.automation.util.ScheduleUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * On application start-up, loads the triggers associated with the digital workers
 * (workflows) and schedules all of them.
 *
 * <p>NOTE(review): the capitalization of project-type members used here
 * ({@code getWfId}, {@code getWfCreateUserId}, {@code loadWfidAndTriggerInfo}) was
 * reconstructed from an all-lower-case listing; verify against the real classes.
 *
 * @date: 2020/12/25:16:00
 **/
@Component
@Order(1)
public class InitLineRunner implements CommandLineRunner {

    public static final Logger log = LoggerFactory.getLogger(InitLineRunner.class);

    /** MM = month, HH = 24-hour clock (lower-case mm/hh would print minutes / 12-hour clock). */
    private static final DateTimeFormatter DF = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private TaskService taskService;
    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private IBusWorkflowService workflowService;

    /**
     * Queries the workflows with {@code wf_type = 3} and {@code wf_state = 1} plus all
     * trigger records, and schedules one task per workflow/trigger pair.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        log.info("项目启动:加载数字员工关联的触发器信息并全量执行,{}", DF.format(LocalDateTime.now()));

        QueryWrapper<BusWorkflow> wrapper = new QueryWrapper<>();
        wrapper.eq("wf_type", "3"); // 3: cloud-hosted
        wrapper.eq("wf_state", "1");
        List<BusWorkflow> busWorkflows = workflowService.list(wrapper);
        List<AutoTriggerInfo> triggerInfos = triggerInfoService.list();

        if (busWorkflows.isEmpty() || triggerInfos.isEmpty()) {
            log.info("数字员工关联的触发器信息不正确,员工记录数:{},触发器记录数:{}",
                    busWorkflows.size(), triggerInfos.size());
            return;
        }

        // Trigger information keyed by workflow id.
        Map<String, AutoTriggerInfo> triggerByWfId = CommonUtil.loadWfidAndTriggerInfo(busWorkflows, triggerInfos);
        for (Map.Entry<String, AutoTriggerInfo> entry : triggerByWfId.entrySet()) {
            String wfId = entry.getKey();
            BusWorkflow workflow = busWorkflows.stream()
                    .filter(candidate -> wfId.equals(candidate.getWfId()))
                    .findAny()
                    .orElse(null);
            if (workflow == null) {
                continue;
            }
            // Core call of this class: hand the task and its trigger to ScheduleUtil.
            ScheduleUtil.start(
                    new ScheduleTask(wfId, String.valueOf(workflow.getWfCreateUserId()), taskService),
                    entry.getValue());
        }
        log.info("数字员工关联的触发器信息全量执行完成,数字员工定时个数:{},{}",
                triggerByWfId.size(), DF.format(LocalDateTime.now()));
    }
}
ScheduleUtil类:Scheduler 管理工具类,提供启动、取消、修改等管理
package com.merak.hyper.automation.util;

import com.merak.hyper.automation.scheduling.ScheduleTask;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

/**
 * Scheduler management utility: start, cancel and reset cron-triggered tasks.
 *
 * <p>The registry of running tasks is a {@link ConcurrentHashMap} because the methods
 * are invoked from several threads of the async executor.
 *
 * @version 1.0
 */
public class ScheduleUtil {

    public static final Logger log = LoggerFactory.getLogger(ScheduleUtil.class);

    /**
     * Scheduler bean, looked up on first use rather than in a static initializer so that
     * loading this class before the application context is available does not fail.
     */
    private static volatile ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /** Running tasks: workflow id to the future returned by the scheduler. */
    private static final Map<String, ScheduledFuture<?>> SCHEDULED_FUTURES = new ConcurrentHashMap<>();

    /** Returns the scheduler bean, resolving it from the application context on first call. */
    private static ThreadPoolTaskScheduler scheduler() {
        ThreadPoolTaskScheduler current = threadPoolTaskScheduler;
        if (current == null) {
            // A concurrent first call resolves the same bean, so the race is harmless.
            current = SpringContextUtils.getBean(ThreadPoolTaskScheduler.class);
            threadPoolTaskScheduler = current;
        }
        return current;
    }

    /**
     * Starts a task with the cron expression held by the trigger. If a task is already
     * registered under the same id, it is cancelled so that it cannot keep running unmanaged.
     *
     * @param scheduleTask the task to schedule; its id must not be null
     * @param triggerInfo  trigger whose {@code getLogicConfig()} supplies the cron expression
     * @return {@code true} if the task was scheduled, {@code false} if the scheduler
     *         returned no future (the trigger will never fire)
     * @throws NullPointerException     if the task id is null
     * @throws IllegalArgumentException if the cron expression is invalid
     */
    public static boolean start(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        String wfId = Objects.requireNonNull(scheduleTask.getId(), "scheduleTask id");
        log.info("启动数字员工{}定时任务线程{}", wfId, wfId);
        ScheduledFuture<?> future =
                scheduler().schedule(scheduleTask, new CronTrigger(triggerInfo.getLogicConfig()));
        if (future == null) {
            // ConcurrentHashMap rejects null values, and there is nothing to manage anyway.
            log.warn("定时任务未被调度,触发器不会触发,taskid {}", wfId);
            return false;
        }
        ScheduledFuture<?> previous = SCHEDULED_FUTURES.put(wfId, future);
        if (previous != null && !previous.isCancelled()) {
            previous.cancel(false);
        }
        return true;
    }

    /**
     * Cancels the task registered under the given task's id, without interrupting a run
     * that is already in progress, and removes it from the registry.
     *
     * @param scheduleTask the task whose id identifies the schedule to cancel
     * @return always {@code true}, also when no task was registered
     */
    public static boolean cancel(ScheduleTask scheduleTask) {
        String wfId = scheduleTask.getId();
        log.info("关闭定时任务线程 taskid {}", wfId);
        if (wfId == null) {
            return true;
        }
        ScheduledFuture<?> future = SCHEDULED_FUTURES.remove(wfId);
        if (future != null && !future.isCancelled()) {
            future.cancel(false);
        }
        return true;
    }

    /**
     * Replaces a task's schedule: cancels the current one, then starts a new one.
     *
     * @param scheduleTask the task to reschedule
     * @param triggerInfo  trigger supplying the new cron expression
     * @return always {@code true}
     */
    public static boolean reset(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        cancel(scheduleTask);
        start(scheduleTask, triggerInfo);
        return true;
    }
}
ScheduleTask类:调度任务类(同时给出 TaskService 接口及其实现类)
package com.merak.hyper.automation.scheduling;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A schedulable unit of work: binds a task id and a user id to the {@link TaskService}
 * that performs the actual work.
 *
 * @version 1.0
 */
public class ScheduleTask implements Runnable {

    public static final Logger log = LoggerFactory.getLogger(ScheduleTask.class);

    private final String id;
    private final String userId;
    private final TaskService service;

    /**
     * @param id      task id
     * @param userId  id of the user on whose behalf the task runs
     * @param service business service invoked on every run
     */
    public ScheduleTask(String id, String userId, TaskService service) {
        this.id = id;
        this.userId = userId;
        this.service = service;
    }

    /** @return the task id */
    public String getId() {
        return id;
    }

    /** Logs the run and delegates to {@code TaskService.work(id, userId)}. */
    @Override
    public void run() {
        log.info("scheduletask-执行数字员工消息的发送,id:{},用户id:{}", id, userId);
        service.work(id, userId);
    }
}
/** * @version 1.0 * @classname: taskservice * @description: taskservice */public interface taskservice { /** * 业务处理方法 * @param keyword 关键参数 * @param userid */ void work(string keyword,string userid);}/** * @description: taskservice实现类,具体执行定时调度的业务 */@servicepublic class taskserviceimpl implements taskservice { public static final logger log = loggerfactory.getlogger(taskserviceimpl.class); @autowired private iautodeviceinfoservice deviceinfoservice; @override public void work(string wfid,string userid) { try { log.info("定时任务:根据数字员工wfid"+ wfid +",用户id:"+userid+",发送消息..."); //sendrobotmsg(wfid,userid); thread.sleep(200); } catch (interruptedexception e) { e.printstacktrace(); } }
二、通过web配置的变更,动态管理定时任务
ScheduledController类:Scheduled 的 Web 业务层,提供启动、取消、修改等 Schedule 管理接口
调度任务信息变更(如1:trigger cron变更 2:任务停止 3:任务新增加等)
package com.merak.hyper.automation.controller;

import com.merak.hyper.automation.common.core.domain.AjaxResult;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.util.ScheduledHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Web layer for schedule management: add, reset and stop schedules.
 *
 * @version 1.0
 */
@RestController
@RequestMapping("/api/scheduled")
public class ScheduledController {

    public static final Logger log = LoggerFactory.getLogger(ScheduledController.class);

    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private ScheduledHelper scheduledHelper;

    /**
     * Schedules a new task for the workflow named in the request.
     *
     * @param scheduledApiVo workflow id, user id and trigger id
     * @return success, or an error result when the trigger id matches no record
     */
    @PostMapping("/add")
    public AjaxResult addScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo triggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        if (triggerInfo == null) {
            return triggerNotFound(scheduledApiVo);
        }
        scheduledHelper.addScheduleds(scheduledApiVo, triggerInfo);
        return AjaxResult.success();
    }

    /**
     * Re-schedules an existing task with the trigger named in the request.
     *
     * @param scheduledApiVo workflow id, user id and trigger id
     * @return success, or an error result when the trigger id matches no record
     */
    @PostMapping("/reset")
    public AjaxResult resetScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo triggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        if (triggerInfo == null) {
            return triggerNotFound(scheduledApiVo);
        }
        scheduledHelper.resetScheduleds(scheduledApiVo, triggerInfo);
        return AjaxResult.success();
    }

    /**
     * Stops the task of the workflow named in the request. No trigger lookup is needed:
     * stopping uses only the workflow id.
     *
     * @param scheduledApiVo workflow id and user id
     * @return success
     */
    @PostMapping("/stop")
    public AjaxResult stopScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        scheduledHelper.stopScheduleds(scheduledApiVo);
        return AjaxResult.success();
    }

    /**
     * Builds the error response for an unknown trigger id. Without this check the request
     * would report success while the async worker later fails on the null trigger.
     */
    private AjaxResult triggerNotFound(ScheduledApiVo scheduledApiVo) {
        log.warn("触发器不存在,triggerid:{}", scheduledApiVo.getTriggerId());
        // NOTE(review): assumes AjaxResult offers error(String) next to success(); confirm.
        return AjaxResult.error("触发器不存在,triggerid:" + scheduledApiVo.getTriggerId());
    }
}

// ---------------------------------------------------------------------------
// ScheduledHelper class: management facade exposed to the web layer
// (create, change, stop).
// ---------------------------------------------------------------------------
package com.merak.hyper.automation.util;

import com.merak.hyper.automation.scheduling.ScheduleTask;
import com.merak.hyper.automation.scheduling.TaskService;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Management facade for schedules: create, change, stop. Every operation is annotated
 * to run asynchronously on the executor bean named "asynctaskexecutor".
 *
 * @version 1.0
 */
@Component
public class ScheduledHelper {

    public static final Logger log = LoggerFactory.getLogger(ScheduledHelper.class);

    /**
     * Starts a schedule for the workflow in the request.
     *
     * @param scheduledApiVo workflow id and user id
     * @param triggerInfo    trigger supplying the cron expression
     */
    @Async("asynctaskexecutor")
    public void addScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        log.warn("创建原数字员工[{}],同步启动schedule任务", scheduledApiVo.getWfId());
        ScheduleUtil.start(newTask(scheduledApiVo), triggerInfo);
    }

    /**
     * Replaces the schedule of the workflow in the request after its cron value changed.
     *
     * @param scheduledApiVo workflow id and user id
     * @param triggerInfo    trigger supplying the new cron expression
     */
    @Async("asynctaskexecutor")
    public void resetScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        log.warn("数字员工[{}]关联的触发器信息cron值改变,变更schedule任务", scheduledApiVo.getWfId());
        ScheduleUtil.reset(newTask(scheduledApiVo), triggerInfo);
    }

    /**
     * Stops the schedule of the workflow in the request.
     *
     * @param scheduledApiVo workflow id and user id
     */
    @Async("asynctaskexecutor")
    public void stopScheduleds(ScheduledApiVo scheduledApiVo) {
        log.warn("原数字员工[{}]无效,同步停止schedule任务", scheduledApiVo.getWfId());
        ScheduleUtil.cancel(newTask(scheduledApiVo));
    }

    /** Builds the task object that identifies the schedule for the given request. */
    private static ScheduleTask newTask(ScheduledApiVo scheduledApiVo) {
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        return new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService);
    }
}
springcontextutils类:
package com.merak.hyper.automation.util;import org.springframework.beans.beansexception;import org.springframework.context.applicationcontext;import org.springframework.context.applicationcontextaware;import org.springframework.stereotype.component;/** * @version 1.0 * @classname: springcontextutils * @description: 加载class对象 */@componentpublic class springcontextutils implements applicationcontextaware { private static applicationcontext applicationcontext; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { springcontextutils.applicationcontext = applicationcontext; } public static object getbean(string name) { return applicationcontext.getbean(name); } public static <t> t getbean(class<t> requiredtype) { return applicationcontext.getbean(requiredtype); } public static <t> t getbean(string name, class<t> requiredtype) { return applicationcontext.getbean(name, requiredtype); } public static boolean containsbean(string name) { return applicationcontext.containsbean(name); } public static boolean issingleton(string name) { return applicationcontext.issingleton(name); } public static class<? extends object> gettype(string name) { return applicationcontext.gettype(name); }}
scheduledapivo类:
import java.io.Serializable;

/**
 * Parameter object passed to the schedule-management web API.
 *
 * <p>NOTE(review): property capitalization (wfId / userId / triggerId) was reconstructed
 * from an all-lower-case listing; it determines the JSON field names, so confirm it
 * against the real class.
 *
 * @version 1.0
 */
public class ScheduledApiVo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Workflow (digital worker) id. */
    private String wfId;
    /** Id of the user performing the operation. */
    private String userId;
    /** Id of the trigger record to schedule with. */
    private String triggerId;

    /** @return the workflow id */
    public String getWfId() {
        return wfId;
    }

    /** @param wfId the workflow id */
    public void setWfId(String wfId) {
        this.wfId = wfId;
    }

    /** @return the user id */
    public String getUserId() {
        return userId;
    }

    /** @param userId the user id */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return the trigger id */
    public String getTriggerId() {
        return triggerId;
    }

    /** @param triggerId the trigger id */
    public void setTriggerId(String triggerId) {
        this.triggerId = triggerId;
    }
}
最终:Web 端通过发送 HTTP 请求调用 ScheduledController 接口(内部委托给 ScheduledHelper 管理类),实现 Scheduled 的创建、变更、停止操作:
// Caller side (excerpt): build the request object and notify the scheduling service.
log.info("3:云托管更新启动数字员工操作");
ScheduledApiVo scheduledApiVo = new ScheduledApiVo();
scheduledApiVo.setWfId(wfId);
scheduledApiVo.setUserId(String.valueOf(updateUserId));
scheduledApiVo.setTriggerId(newTriggerInfo.getId());
String webhookBody = JSON.toJSONString(scheduledApiVo);
EmsApiUtil.sendQuartzMessage(url, "add", webhookBody);

// ******************** separator ************************

/**
 * Posts a JSON body to {@code quartzUrl + "/" + method} on the scheduling service.
 *
 * @param quartzUrl   base URL of the scheduling service API
 * @param method      operation appended to the URL (the caller above passes "add")
 * @param webhookBody JSON request body
 * @return {@code true} only if the response is JSON whose {@code code} field equals 200;
 *         {@code false} on any error, which is logged rather than thrown
 */
public static boolean sendQuartzMessage(String quartzUrl, String method, String webhookBody) {
    try {
        // sendPostByJson is built on org.apache.httpcomponents:httpclient; the POM
        // dependency is listed below.
        String resp = HttpClientUtil.sendPostByJson(quartzUrl + "/" + method, webhookBody, 0);
        // A missing response is a failed send. "405 Not Allowed" is matched in its
        // capitalized form because the comparison is case-sensitive.
        // NOTE(review): confirm the exact text the gateway returns.
        if (resp == null || "error".equals(resp) || resp.contains("405 Not Allowed")) {
            log.error("调用任务调度中心消息发送失败,地址:" + quartzUrl);
            return false;
        }
        JSONObject jsonObject = JSON.parseObject(resp);
        if (jsonObject == null) {
            // Blank body: nothing to read a status code from.
            log.error("调用任务调度中心失败,msg:" + resp);
            return false;
        }
        if ("200".equals(String.valueOf(jsonObject.get("code")))) {
            return true;
        }
        log.error("调用任务调度中心失败,msg:" + String.valueOf(jsonObject.get("msg")));
        return false;
    } catch (Exception e) {
        // Boundary catch: callers only get a boolean. Pass the exception to the logger
        // so the stack trace is not lost.
        log.error("调用任务调度中心失败,msg:" + e.getMessage(), e);
        return false;
    }
}
<!-- Apache HttpClient, used by the HTTP POST helper above.
     NOTE(review): 4.5.2 is an old release; consider a current 4.5.x version. -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
以上就是 Spring Boot Schedule 调度任务动态管理方法的详细内容。
其它类似信息

推荐信息