了解一下什么是futuretask?futuretask 是一个可取消的异步计算。
futuretask提供了对future的基本实现,可以调用方法去开始和取消一个计算,可以查询计算是否完成,并且获取计算结果。
futuretask只能在计算完成后获取到计算结果,一旦计算完成,将不能重启或者取消,除非调用runandreset方法。
futuretask除了实现了future接口以外,还实现了runnable接口,因此futuretask是可以交由线程池的executor执行,也可以直接使用一个异步线程调用执行(futuretask.run())。
futuretask 是如何实现的呢?首先,我们看一下futuretask类的继承结构,如下图,它实现的是runnablefuture接口,而runnablefuture继承自future和函数式接口runnable,所以说futuretask本质就是一个可运行的future。
future 接口约定了一些异步计算类必须要实现的功能,源码如下:
package java.util.concurrent;public interface future<v> { /** * 尝试取消任务的执行,并返回取消结果。 * 参数mayinterruptifrunning:是否中断线程。 */ boolean cancel(boolean mayinterruptifrunning); /** * 判断任务是否被取消(正常结束之前被被取消返回true) */ boolean iscancelled(); /** * 判断当前任务是否执行完毕,包括正常执行完毕、执行异常或者任务取消。 */ boolean isdone(); /** * 获取任务执行结果,任务结束之前会阻塞。 */ v get() throws interruptedexception, executionexception; /** * 在指定时间内尝试获取执行结果。若超时则抛出超时异常timeoutexception */ v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception;}
runnable 接口我们都很熟悉,他就是一个函数式接口,我们常用其创建一个线程。
package java.lang;?@functionalinterfacepublic interface runnable { ? ? ? ?public abstract void run();}
futuretask就是一个将要被执行的任务,它包含了以上接口具体的实现,futuretask内部定义了任务的状态state和一些状态的常量,它的内部核心是一个callable callable,我们通过构造函数可以传入callable或者是runnable,最后都会内部转为callable,因为我们需要获取异步任务的执行结果,只有通过callable创建的线程才会返回结果。
我们可以通过此时的状态判断future中iscancelled(),isdone()的返回结果。
以下为futuretask源码,内含核心源码分析注释
package java.util.concurrent;import java.util.concurrent.locks.locksupport;public class futuretask<v> implements runnablefuture<v> { /** * 任务的运行状态 */ private volatile int state; private static final int new = 0; // 新建 private static final int completing = 1; // 完成 private static final int normal = 2; // 正常 private static final int exceptional = 3; // 异常 private static final int cancelled = 4; // 取消 private static final int interrupting = 5; // 中断中 private static final int interrupted = 6; // 中断的 private callable<v> callable; /** * 返回结果 */ private object outcome; private volatile thread runner; private volatile waitnode waiters; ... public futuretask(callable<v> callable) { if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; } public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); this.state = new; } public boolean iscancelled() { return state >= cancelled; } public boolean isdone() { return state != new; } /* * 取消任务实现 * 如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。 * 如果任务已经启动,参数mayinterruptifrunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。 * 如果任务任务已经取消、已经完成或者其他原因不能取消,尝试将失败。 */ public boolean cancel(boolean mayinterruptifrunning) { if (!(state == new && unsafe.compareandswapint(this, stateoffset, new, mayinterruptifrunning ? interrupting : cancelled))) return false; try { // in case call to interrupt throws exception if (mayinterruptifrunning) { try { thread t = runner; if (t != null) t.interrupt(); } finally { // final state unsafe.putorderedint(this, stateoffset, interrupted); } } } finally { finishcompletion(); } return true; } /* * 等待获取结果 * 获取当前状态,判断是否执行完成。并且判断时间是否超时 * 如果任务没有执行完成,就阻塞等待完成,若超时抛出超时等待异常。 */ public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l); return report(s); } /* * 等待获取结果 * 获取当前状态,判断是否执行完成。 * 如果任务没有执行完成,就阻塞等待完成。 */ public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception { if (unit == null) throw new nullpointerexception(); int s = state; if (s <= completing && (s = awaitdone(true, unit.tonanos(timeout))) <= completing) throw new timeoutexception(); return report(s); } /** * 根据状态判断返回结果还是异常 */ private v report(int s) throws executionexception { object x = outcome; if (s == normal) return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); } protected void done() { } /** * 设置结果借助cas确认状态是否完成状态 */ protected void set(v v) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = v; unsafe.putorderedint(this, stateoffset, normal); // final state finishcompletion(); } } /** * 设置异常,当运行完成出现异常,设置异常状态 */ protected void setexception(throwable t) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = t; unsafe.putorderedint(this, stateoffset, exceptional); // final state finishcompletion(); } } /* * 执行callable获取结果,或者异常 * 判断状态是不是启动过的,如果是新建才可以执行run方法 */ public void run() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } } /** * 重新执行 */ protected boolean runandreset() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return false; boolean ran = false; int s = state; try { callable<v> c = callable; if (c != null && s == new) { try { c.call(); // don't set result ran = true; } catch (throwable ex) { setexception(ex); } } } finally { runner = null; s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } return ran && s == new; } /* * 处理可能取消的中断 */ private void handlepossiblecancellationinterrupt(int s) { if (s == interrupting) while (state == interrupting) thread.yield(); } static final class waitnode { volatile thread thread; volatile waitnode next; waitnode() { thread = thread.currentthread(); } } /** * 移除并唤醒所有等待线程,执行done,置空callable */ private void finishcompletion() { // assert state > completing; for (waitnode q; (q = waiters) != null;) { if (unsafe.compareandswapobject(this, waitersoffset, q, null)) { for (;;) { thread t = q.thread; if (t != null) { q.thread = null; locksupport.unpark(t); } waitnode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } /** * 等待完成 * 首先判断是否超时 * 处理中断的,然后处理异常状态的,处理完成的... */ private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) { if (thread.interrupted()) { removewaiter(q); throw new interruptedexception(); } int s = state; if (s > completing) { if (q != null) q.thread = null; return s; } else if (s == completing) // cannot time out yet thread.yield(); else if (q == null) q = new waitnode(); else if (!queued) queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); else if (timed) { nanos = deadline - system.nanotime(); if (nanos <= 0l) { removewaiter(q); return state; } locksupport.parknanos(this, nanos); } else locksupport.park(this); } } /** * 去除等待 */ private void removewaiter(waitnode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removewaiter race for (waitnode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!unsafe.compareandswapobject(this, waitersoffset, q, s)) continue retry; } break; } } } // unsafe mechanics private static final sun.misc.unsafe unsafe; private static final long stateoffset; private static final long runneroffset; private static final long waitersoffset; static { try { unsafe = sun.misc.unsafe.getunsafe(); class<?> k = futuretask.class; stateoffset = unsafe.objectfieldoffset (k.getdeclaredfield("state")); runneroffset = unsafe.objectfieldoffset (k.getdeclaredfield("runner")); waitersoffset = unsafe.objectfieldoffset (k.getdeclaredfield("waiters")); } catch (exception e) { throw new error(e); } }}
futuretask 运行流程一般来说,我们可以认为futuretask具有以下三种状态:
未启动:新建的futuretask,在run()没执行之前,futuretask处于未启动状态。
private static final int new = 0; // 新建
已启动:futuretask对象的run方法启动并执行的过程中,futuretask处于已启动状态。
已完成:futuretask正常执行结束,或者futuretask执行被取消(futuretask对象cancel方法),或者futuretask对象run方法执行抛出异常而导致中断而结束,futuretask都处于已完成状态。
private static final int completing = 1; // 完成private static final int normal = 2; // 完成后正常设置结果private static final int exceptional = 3; // 完成后异常设置异常private static final int cancelled = 4; // 执行取消private static final int interrupting = 5; // 中断中private static final int interrupted = 6; // 中断的
futuretask 的使用使用一(直接新建一个线程调用):
futuretask<integer> task = new futuretask<>(new callable() { @override public integer call() throws exception { return sum(); }});new thread(task).stat();integer result = task.get();
使用二(结合线程池使用)
futuretask<integer> task = new futuretask<>(new callable() { @override public integer call() throws exception { return sum(); }});executors.newcachedthreadpool().submit(task);integer result = task.get();
以上就是java futuretask源码分析及使用详解的详细内容。