您好,欢迎访问一九零五行业门户网

SpringBoot线程池和Java线程池怎么使用

springboot线程池和java线程池的用法和实现原理使用默认的线程池方式一:通过@async注解调用public class asynctest { @async public void async(string name) throws interruptedexception { system.out.println("async" + name + " " + thread.currentthread().getname()); thread.sleep(1000); }}
启动类上需要添加@enableasync注解,否则不会生效。
@springbootapplication//@enableasyncpublic class test1application { public static void main(string[] args) throws interruptedexception { configurableapplicationcontext run = springapplication.run(test1application.class, args); asynctest bean = run.getbean(asynctest.class); for(int index = 0; index <= 10; ++index){ bean.async(string.valueof(index)); } }}
方式二:直接注入 threadpooltaskexecutor此时可不加 @enableasync注解
@springboottestclass test1applicationtests { @resource threadpooltaskexecutor threadpooltaskexecutor; @test void contextloads() { runnable runnable = () -> { system.out.println(thread.currentthread().getname()); }; for(int index = 0; index <= 10; ++index){ threadpooltaskexecutor.submit(runnable); } }}
线程池默认配置信息springboot线程池的常见配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 integer.max_value keep-alive: 60s # 当线程池中的线程数量大于 corepoolsize 时,如果某线程空闲时间超过keepalivetime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认integer.max_value shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
springboot 线程池的实现原理taskexecutionautoconfiguration 类中定义了 threadpooltaskexecutor,该类的内部实现也是基于java原生的 threadpoolexecutor类。initializeexecutor()方法在其父类中被调用,但是在父类中 rejectedexecutionhandler 被定义为了 private rejectedexecutionhandler rejectedexecutionhandler = new threadpoolexecutor.abortpolicy(); ,并通过initialize()方法将abortpolicy传入initializeexecutor()中。
注意在taskexecutionautoconfiguration 类中,threadpooltaskexecutor类的bean的名称为: applicationtaskexecutor 和 taskexecutor。
// taskexecutionautoconfiguration#applicationtaskexecutor()@lazy@bean(name = { application_task_executor_bean_name, asyncannotationbeanpostprocessor.defaul t_task_executor_bean_name })@conditionalonmissingbean(executor.class)public threadpooltaskexecutor applicationtaskexecutor(taskexecutorbuilder builder) { return builder.build();}
// threadpooltaskexecutor#initializeexecutor()@overrideprotected executorservice initializeexecutor( threadfactory threadfactory, rejectedexecutionhandler rejectedexecutionhandler) { blockingqueue<runnable> queue = createqueue(this.queuecapacity); threadpoolexecutor executor; if (this.taskdecorator != null) { executor = new threadpoolexecutor( this.corepoolsize, this.maxpoolsize, this.keepaliveseconds, timeunit.seconds, queue, threadfactory, rejectedexecutionhandler) { @override public void execute(runnable command) { runnable decorated = taskdecorator.decorate(command); if (decorated != command) { decoratedtaskmap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new threadpoolexecutor( this.corepoolsize, this.maxpoolsize, this.keepaliveseconds, timeunit.seconds, queue, threadfactory, rejectedexecutionhandler); } if (this.allowcorethreadtimeout) { executor.allowcorethreadtimeout(true); } this.threadpoolexecutor = executor; return executor;}
// executorconfigurationsupport#initialize()public void initialize() { if (logger.isinfoenabled()) { logger.info("initializing executorservice" + (this.beanname != null ? " '" + this.beanname + "'" : "")); } if (!this.threadnameprefixset && this.beanname != null) { setthreadnameprefix(this.beanname + "-"); } this.executor = initializeexecutor(this.threadfactory, this.rejectedexecutionhandler);}
覆盖默认的线程池覆盖默认的 taskexecutor对象,bean的返回类型可以是threadpooltaskexecutor也可以是executor。
@configurationpublic class threadpoolconfiguration { @bean("taskexecutor") public threadpooltaskexecutor taskexecutor() { threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor(); //设置线程池参数信息 taskexecutor.setcorepoolsize(10); taskexecutor.setmaxpoolsize(50); taskexecutor.setqueuecapacity(200); taskexecutor.setkeepaliveseconds(60); taskexecutor.setthreadnameprefix("myexecutor--"); taskexecutor.setwaitfortaskstocompleteonshutdown(true); taskexecutor.setawaitterminationseconds(60); //修改拒绝策略为使用当前线程执行 taskexecutor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //初始化线程池 taskexecutor.initialize(); return taskexecutor; }}
管理多个线程池如果出现了多个线程池,例如再定义一个线程池 taskexecutor2,则直接执行会报错。此时需要指定bean的名称即可。
@bean("taskexecutor2")public threadpooltaskexecutor taskexecutor2() { threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor(); //设置线程池参数信息 taskexecutor.setcorepoolsize(10); taskexecutor.setmaxpoolsize(50); taskexecutor.setqueuecapacity(200); taskexecutor.setkeepaliveseconds(60); taskexecutor.setthreadnameprefix("myexecutor2--"); taskexecutor.setwaitfortaskstocompleteonshutdown(true); taskexecutor.setawaitterminationseconds(60); //修改拒绝策略为使用当前线程执行 taskexecutor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //初始化线程池 taskexecutor.initialize(); return taskexecutor;}
引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@resourcethreadpooltaskexecutor taskexecutor2;
对于使用@async注解的多线程则在注解中指定bean的名字即可。
@async("taskexecutor2") public void async(string name) throws interruptedexception { system.out.println("async" + name + " " + thread.currentthread().getname()); thread.sleep(1000); }
线程池的四种拒绝策略
java常用的四种线程池threadpoolexecutor 类的构造函数如下:
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), defaulthandler);}
newcachedthreadpool不限制最大线程数(maximumpoolsize=integer.max_value),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>());
newfixedthreadpool定长线程池,超出线程数的任务会在队列中等待。
return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>());
newscheduledthreadpool类似于newcachedthreadpool,线程数无上限,但是可以指定corepoolsize。可实现延迟执行、周期执行。
public scheduledthreadpoolexecutor(int corepoolsize) { super(corepoolsize, integer.max_value, 0, nanoseconds, new delayedworkqueue());}
周期执行:
scheduledexecutorservice scheduledthreadpool = executors.newscheduledthreadpool(5);scheduledthreadpool.scheduleatfixedrate(()->{ system.out.println("rate");}, 1, 1, timeunit.seconds);
延时执行:
scheduledthreadpool.schedule(()->{ system.out.println("delay 3 seconds");}, 3, timeunit.seconds);
newsinglethreadexecutor单线程线程池,可以实现线程的顺序执行。
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()));}
java 线程池中的四种拒绝策略callerrunspolicy:线程池让调用者去执行。
abortpolicy:如果线程池拒绝了任务,直接报错。
discardpolicy:如果线程池拒绝了任务,直接丢弃。
discardoldestpolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
callerrunspolicy直接在主线程中执行了run方法。
public static class callerrunspolicy implements rejectedexecutionhandler { public callerrunspolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { r.run(); } }}
效果类似于:
runnable thread = ()->{ system.out.println(thread.currentthread().getname()); try { thread.sleep(0); } catch (interruptedexception e) { throw new runtimeexception(e); }};thread.run();
abortpolicy直接抛出rejectedexecutionexception异常,并指示任务的信息,线程池的信息。、
public static class abortpolicy implements rejectedexecutionhandler { public abortpolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception("task " + r.tostring() + " rejected from " + e.tostring()); }}
discardpolicy
什么也不做。
public static class discardpolicy implements rejectedexecutionhandler { public discardpolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { }}
discardoldestpolicy
e.getqueue().poll() : 取出队列最旧的任务。
e.execute(r) : 当前任务入队。
public static class discardoldestpolicy implements rejectedexecutionhandler { public discardoldestpolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { e.getqueue().poll(); e.execute(r); } }}
java 线程复用的原理java的线程池中保存的是 java.util.concurrent.threadpoolexecutor.worker 对象,该对象在 被维护在private final hashset<worker> workers = new hashset<worker>();。workqueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workqueue队列中。
private final class worker extends abstractqueuedsynchronizer implements runnable{ /** * this class will never be serialized, but we provide a * serialversionuid to suppress a javac warning. */ private static final long serialversionuid = 6138294804551838833l; /** thread this worker is running in. null if factory fails. */ final thread thread; /** initial task to run. possibly null. */ runnable firsttask; /** per-thread task counter */ volatile long completedtasks; /** * creates with given first task and thread from threadfactory. * @param firsttask the first task (null if none) */ worker(runnable firsttask) { setstate(-1); // inhibit interrupts until runworker this.firsttask = firsttask; this.thread = getthreadfactory().newthread(this); } /** delegates main run loop to outer runworker */ public void run() { runworker(this); } // lock methods // // the value 0 represents the unlocked state. // the value 1 represents the locked state. protected boolean isheldexclusively() { return getstate() != 0; } protected boolean tryacquire(int unused) { if (compareandsetstate(0, 1)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } protected boolean tryrelease(int unused) { setexclusiveownerthread(null); setstate(0); return true; } public void lock() { acquire(1); } public boolean trylock() { return tryacquire(1); } public void unlock() { release(1); } public boolean islocked() { return isheldexclusively(); } void interruptifstarted() { thread t; if (getstate() >= 0 && (t = thread) != null && !t.isinterrupted()) { try { t.interrupt(); } catch (securityexception ignore) { } } }}
work对象的执行依赖于 runworker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。
final void runworker(worker w) { thread wt = thread.currentthread(); runnable task = w.firsttask; w.firsttask = null; w.unlock(); // allow interrupts boolean completedabruptly = true; try { while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { beforeexecute(wt, task); throwable thrown = null; try { task.run(); } catch (runtimeexception x) { thrown = x; throw x; } catch (error x) { thrown = x; throw x; } catch (throwable x) { thrown = x; throw new error(x); } finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); }}
以上就是springboot线程池和java线程池怎么使用的详细内容。
其它类似信息

推荐信息