目录

多线程Callable接口

Callable和Future出现的原因

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果,这也是所谓的“异步”模型

Callable和Future介绍

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务(还没有完成的任务)给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。 java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

Callable使用

Callable一般是配合线程池工具ExecutorService来使用的。,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。

Future类位于java.util.concurrent包下,它是一个接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

public interface Future<V> {
  /* 试图取消一个线程的执行。注意是试图取消,并不一定能取消成功。因为任务可能已完成、已取消、或者一些其它因素不能取消,
  存在取消失败的可能。boolean类型的返回值是“是否取消成功”的意思。参数mayInterruptIfRunning表示是否采用中断的方式取消线程执
  行。所以有时候,为了让任务有能够取消的功能,就使用Callable来代替Runnable。如果为了可取消性而使用 Future但又不提供可用的结
  果,则可以声明 Future<?>形式类型、并返回 null作为底层任务的结果。*/
    boolean cancel(boolean mayInterruptIfRunning); 
  
    boolean isCancelled(); // 任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
    boolean isDone(); // 任务是否已经完成,若任务完成,则返回true;
  /*注意调用get方法会阻塞当前线程,直到得到结果。所以实际编码中建议使用可以设置超时时间的重载get方法。*/
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) // 获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
        throws InterruptedException, ExecutionException, TimeoutException;
}

也就是说Future提供了三种功能:判断任务是否完成;能够中断任务;能够获取任务执行结果。

看一个简单实用的demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 自定义Callable
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要一秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]){
        // 使用
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        // 注意调用get方法会阻塞当前线程,直到得到结果。
        // 所以实际编码中建议使用可以设置超时时间的重载get方法。
        System.out.println(result.get()); 
    }
}

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask实现了RunnableFuture接口,这个接口的定义如下:

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {  
    void run();  
} 

FutureTask在jdk8中的成员变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

state表示任务的运行状态,初始状态为NEW。运行状态只会在set、setException、cancel方法中终止。COMPLETING、INTERRUPTING是任务完成后的瞬时状态。

FutureTask可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future和Runnable接口的组合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都 是由执行者调用,我们基本上不需要直接调用它。

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 自定义Callable,与上面一样
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要一秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]){
        // 使用
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<>(new Task());
        executor.submit(futureTask);
        System.out.println(futureTask.get());
    }
}

使用上与第一个Demo有一点小的区别。首先,调用submit方法是没有返回值的。这里实际上是调用的submit(Runnable task)方法,而上面的Demo,调用的是submit(Callable<T> task)方法。

然后,这里是使用FutureTask直接取get取值,而上面的Demo是通过submit方法返回的Future去取值。

在很多高并发的环境下,有可能Callable和FutureTask会创建多次。FutureTask能够在高并发环境下确保任务只执行一次。这块有兴趣的同学可以参看FutureTask源码。