总结:线程池的特点是,在线程的数量=corepoolsize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumpoolsize执行拒绝策略。
线程池-intsmaze线程池的思想是:在系统中开辟一块区域,其中存放一些待命的线程,这个区域被称为线程池。如果有需要执行的任务,则从线程池中借一个待命的线程来执行指定的任务,到任务结束可以再将所借线程归还。这样就避免了大量重复创建线程对象,浪费cpu,内存资源。
自定义线程池-intsmaze如果观察jdk提供的各种线程池的源码实现可以发现,除了jdk8新增的线程池newworkstealingpool以外,都是基于对threadpoolexecutor的封装实现,所以首先讲解threadpoolexecutor的具体功能。
threadpoolexecutor详解-intsmaze
threadpoolexecutor( corepoolsize, maximumpoolsize, keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler)
corepoolsize:指定线程池中线程数量
maximumpoolsize:最大线程数量
keepalivetime:线程数量超过corepoolsize时,多于的空闲线程的存活时间(超过这段时间,该空闲线程会被销毁)。
unit:keepalivetime的时间单位
workqueue:任务队列,提交但是未被执行的任务
threadfactory:创建线程的线程工厂,默认即可
handler:拒绝策略,当任务太多来不及处理,如何拒绝任务,默认为new abortpolicy()策略。
executorservice es = new threadpoolexecutor(3, 8, 60l, timeunit.seconds, new linkedblockingqueue<runnable>(), executors.defaultthreadfactory(), new rejectedexecutionhandler() { public void rejectedexecution(runnable r, threadpoolexecutor executor) { system.out.println("discard"); } });
任务队列--存放runnable对象-intsmaze总结:线程池的特点是,在线程的数量=corepoolsize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumpoolsize执行拒绝策略。
只要队列实现blockingqueue接口即可,注意concurrentlinkedqueue实现的最顶层的queue接口所以不能用在这里。
常用的有如下:
synchronousqueue:直接提交队列,该队列没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。所以他不保存任务,总是将任务提交给线程执行,如果没有空闲的线程,则创建新的线程,当线程数量达到最大,则执行拒绝策略。
arrayblockingqueue:有界任务队列,线程池的线程数小于corepoolsize,则创建新的线程,大于corepoolsize,则将新的任务加入等待队列。若等待队列已满,则在总线程不大于maximumpoolsize下,创建新的线程执行任务,大于maximumpoolsize则执行拒绝策略。
linkedblockingqueue:无界队列,除非系统资源耗尽,否则不存在任务入队失败的情况。线程池的线程数小于corepoolsize,则创建新的线程,大于corepoolsize,则将新的任务加入等待队列。
priortyblockingqueue:优先任务队列,可以控制任务的执行先后顺序,是无界队列。arrayblockingqueue,linkedblockingqueue都是按照先进先出算法处理任务的,priorityblockingqueue可以根据任务自身的优先顺序先后执行。
拒绝策略-intsmaze线程池中的线程用完了,同时等待队列中的任务已经塞满了,再也塞不下新任务了,就需要拒绝策略:处理任务数量超过系统实际承受能力时,处理方式。
jdk内置四种拒绝策略:
abortpolicy:直接抛出异常(默认策略),就算线程池有空闲了,后面的线程也无法在运行了,要想后面的线程可以运行,要捕获异常信息。
callerrunspolicy:该策略直接在调用者线程中运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
discardoldestpolicy:将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
discardpolicy:默默丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种解决方案。在线程池不空闲的时候,提交的任务都将丢弃,当有空闲的线程时提交的任务会执行。
下面是jdk的拒绝策略源码-intsmaze
public static class callerrunspolicy implements rejectedexecutionhandler { public callerrunspolicy() { } /** * 直接在调用者线程中运行当前被丢弃的任务 */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { r.run(); } } } public static class abortpolicy implements rejectedexecutionhandler { public abortpolicy() { } public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception("task " + r.tostring() + " rejected from " + e.tostring()); } } public static class discardpolicy implements rejectedexecutionhandler { public discardpolicy() { } /** * does nothing, which has the effect of discarding task r. */ public void rejectedexecution(runnable r, threadpoolexecutor e) { } } public static class discardoldestpolicy implements rejectedexecutionhandler { public discardoldestpolicy() { } /** * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { e.getqueue().poll(); e.execute(r); } } }
总结:abortpolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new mytask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。
public class mytask implements runnable{ private int number; public mytask(int number) { super(); this.number = number; } public void run() { system.out.println(system.currenttimemillis()+"thread id:"+thread.currentthread().getid()+"==="+number); try { thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } }} public static void main(string[] args) {// executorservice es=new threadpoolexecutor(5,5,60l, timeunit.seconds, // new arrayblockingqueue<runnable>(1), executors.defaultthreadfactory(),new threadpoolexecutor.abortpolicy()); // executorservice es=new threadpoolexecutor(5,5,60l, timeunit.seconds,// new arrayblockingqueue<runnable>(5), executors.defaultthreadfactory(),new threadpoolexecutor.callerrunspolicy()); // executorservice es=new threadpoolexecutor(5,5,60l, timeunit.seconds,// new arrayblockingqueue<runnable>(5), executors.defaultthreadfactory(),new threadpoolexecutor.discardpolicy()); executorservice es=new threadpoolexecutor(5,5,60l, timeunit.seconds, new arrayblockingqueue<runnable>(5), executors.defaultthreadfactory(),new threadpoolexecutor.discardoldestpolicy()); for(int i=0;i<10000;i++) { try { system.out.println(i); es.submit(new mytask(i)); thread.sleep(100); } catch (exception e) { e.printstacktrace(); system.out.println("------------------------"+i); } } }
线程池执行逻辑源码解析-intsmaze
public future<?> submit(runnable task) { if (task == null) throw new nullpointerexception(); runnablefuture<void> ftask = newtaskfor(task, null); execute(ftask); return ftask; } /** * executes the given task sometime in the future. the task * may execute in a new thread or in an existing pooled thread. * * if the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code rejectedexecutionhandler}. * * @param command the task to execute * @throws rejectedexecutionexception at discretion of * {@code rejectedexecutionhandler}, if the task * cannot be accepted for execution * @throws nullpointerexception if {@code command} is null */ public void execute(runnable command) { if (command == null) throw new nullpointerexception(); /* * proceed in 3 steps: * * 1. if fewer than corepoolsize threads are running, try to * start a new thread with the given command as its first * task. the call to addworker atomically checks runstate and * workercount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *如果少于corepoolsize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 自动调用addworker检查runstate和workercount, * 2. if a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. so we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *如果任务可以成功排队,那么我们仍然需要 仔细检查我们是否应该添加一个线程 (因为现有的自从上次检查后死亡)或者那个 自进入该方法以来,该池关闭。 所以我们 重新检查状态,如果有必要的话回滚队列 停止,或者如果没有的话就开始一个新的线程。 * 3. if we cannot queue task, then we try to add a new * thread. if it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workercountof(c) < corepoolsize) { if (addworker(command, true)) return; c = ctl.get(); } if (isrunning(c) && workqueue.offer(command)) { int recheck = ctl.get(); if (! isrunning(recheck) && remove(command)) reject(command);//队列满了,执行拒绝策略 else if (workercountof(recheck) == 0) addworker(null, false); } else if (!addworker(command, false)) reject(command); } final void reject(runnable command) { handler.rejectedexecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法 } /** * dispatch an uncaught exception to the handler. this method is * intended to be called only by the jvm. */ private void dispatchuncaughtexception(throwable e) { getuncaughtexceptionhandler().uncaughtexception(this, e); }
jdk的线程池实现类-intsmazenewfixedthreadpoo-intsmaze任务队列为linkedblockingqueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。
executorservice es=executors.newfixedthreadpool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>());}
newsinglethreadexecutor-intsmaze任务队列linkedblockingqueue中(长度无限),线程数量和最大线程数量均为1。
executorservice es=executors.newsinglethreadexecutor();//线程池中线程数量和最大线程数量均为1.public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()));}
newcachedthreadpool-intsmaze任务队列为synchronousqueue,线程数量为0,最大线程数量为integer.max_value,所以只要有任务没有空闲线程就会创建就新线程。
executorservice es=executors.newcachedthreadpool();//指定线程池中线程数量为0,最大线程数量为integer.max_value,任务队列为synchronousqueuepublic static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>());}
newscheduledthreadpool- -定时线程-intsmaze任务队列为new delayedworkqueue(),返回的对象在executorservice接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。
public static scheduledexecutorservice newscheduledthreadpool(int corepoolsize) { return new scheduledthreadpoolexecutor(corepoolsize);}public scheduledthreadpoolexecutor(int corepoolsize) { super(corepoolsize, integer.max_value, 0, nanoseconds, new delayedworkqueue());}public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), defaulthandler);}
newsinglethreadscheduledexecutor- -定时线程-intsmaze相当于newscheduledthreadpool(int corepoolsize)中corepoolsize设置为1。
scheduledexecutorservice es=executors.newsinglethreadscheduledexecutor();
延迟线程池
class myscheduledtask implements runnable{ private string tname; public myscheduledtask(string tname) { this.tname=tname; } public void run() { system.out.println(tname+"任务时延2秒执行!!!"); }}public class intsmaze{ public static void main(string[] args) { scheduledexecutorservice scheduledthreadpool =executors.newscheduledthreadpool(2); myscheduledtask mt1=new myscheduledtask("mt1"); scheduledthreadpool.schedule(mt1,2,timeunit.seconds); }}
newworkstealingpool java8新增连接池-intsmaze
public static executorservice newworkstealingpool(int parallelism) { return new forkjoinpool (parallelism, forkjoinpool.defaultforkjoinworkerthreadfactory, null, true); }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争 public static executorservice newworkstealingpool() { return new forkjoinpool (runtime.getruntime().availableprocessors(), forkjoinpool.defaultforkjoinworkerthreadfactory, null, true); }//前一个方法的简化,如果当前机器有4个cpu,则目标的并行级别被设置为4。
关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze希望程序执行完所有任务后退出,调用executorservice接口中的shutdown(),shutdownnow()方法。
用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownnow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
线程池优化-intsmaze
一般来说确定线程池的大小需要考虑cpu数量,内存大小,jdbc连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:
ncpu=cpu的数量
ucpu=目标cpu的使用率,0<=ucpu<=1
w/c=等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的线程池的大小等于:
nthreads=ncpu*ucpu*(1+w/c)
在java中,可以通过
runtime.getruntime().availableprocessors()
取得可以cpu数量。
相关推荐:
java中线程池的图文代码详解
threadpoolexecutor线程池之submit方法
java中threadpoolexecutor线程池的submit方法详解
以上就是java并发线程池:详解threadpoolexecutor的详细内容。