AbstractExecutorService实现分析

AbstractExecutorService提供了ExecutorService中submit、invokeAll、invokeAny方法的默认实现。

在AbstractExecutorService中,定义了protected的方法newTaskFor,用于包装Runnable对象或是Callable对象,返回Future对象,默认实现为返回FutureTask。在AbstractExecutorService的子类实现中,可以覆盖newTaskFor方法的实现,返回新的Future实现。如下所示

public class CustomeThreadPoolExecutor extends ThreadPoolExecutor {
    static class CustomTask<V> implements RunnableFuture<V> {}
    
    protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
        return new CustomTask<V>(c);
    }
    
    protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
        return new CustomeTask<V>(r, v);
    }
}

有了newTaskFor方法之后,submit方法的典型实现如下

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T ) {
    return new FutureTask<T>(runnable, value);
}

可以看到,submit方法的实现非常的简单。将task和result包装成RunnableFuture之后,交给execute来执行。在AbstractExecutorService中,execute并没有实现,而是交由具体的子类来实现。

invokeAll方法,有两个版本:可以设置超时的版本和不可设置超时的版本。下面就来分析一下可以设置超时的invokeAll版本。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeunit, TimeUnit unit) throws InterruptedException {
    
    if (tasks == null) throw new NullPointerException();
    
    // 转换时间大小
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
    boolean done = false;
    try {
        // 生成所有的Future,默认的Future实现为FutureTask
        for (Callable<T> t : tasks) {
            futures.add(newTaskFor(t));
        }
        
        // 当前的系统时间加上超时的时间,就是截止时间
        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();
        
        for (int i = 0; i < size; i++) {
            execute((Runnable) futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) return futures;
        }
        
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            // 判断当前的任务是否已经完成。如果没有完成的话,使用get操作来获取任务的结果。get操作使用可设置超时时间的版本。
            if (!f.isDone()) {
                if (nanos <= 0L) return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {}
                catch (ExecutionException ignore) {
                    
                } catch (TimeoutException toe) {return futures;}
                // 截止时间减掉系统的当前时间,就是剩余的可用时间
                nanos = deadline - System.nanoTime();
            }
        }
        // 执行到此处,设置deon为true,说明所有的任务在截止时间之前都已经执行完成。
        done = true;
        return futures;
     } finally {
        // finally处理。done为false,说明前面的执行抛出了异常。需要将任务执行取消操作
        if (!done) {
            for (int i = 0, size = futures.size(); i < size; i++) {
                futures.get(i).cancel(true);
            }
        }
    }
}

invokeAny方法,是调用内部方法doInvokeAny方法实现的。

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException,ExecutionException,TimeoutException {
    
    if (tasks == null) throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0) throw new IllegalArgumentException();
    
    ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
    //构造CompletionService的实例,将this传入构造函数中,当作Executor,内部队列使用默认的LinkedBlockingQueue
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<>(this);
    
    try {
        ExecutorException ee = null;
        //设置截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        
        Iterator<? extends Callable<T>> it = tasks.iterator();
        //首先提交第一个任务。
        futures.add(ecs.submit(it.next()));
        --ntasks;
        //活跃任务计数
        int active = 1;
        
        for (;;) {
            // 从CompletionService中获取已完成任务。如果内部队列中没有任务已经完成,则直接返回null
            Future<T> f = ecs.poll();
            // 当前没有任务完成的状态
            if (f == null) {
                
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                // 没有提前返回的情况,并且active为0,表示没有再次的提交任务,表示发生了异常
                else if (active == 0) {
                    break;
                }
                // 设置超时的情况下,使用带有超时时间的poll方法。
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null) throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                // 否则使用阻塞线程的take方法。
                else {
                    f = ecs.take();
                }
            }
            // 有任务已经完成
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    // 当前任务抛出异常的话,异常并没有抛出去,而是保存了起来,去查看其他的任务的情况。如果后续的任务能够执行完成,则异常并不会返回给上层调用。
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }
        
        if (ee == null) {
            ee = new ExecutionException();
        }
        throw ee;
    }
    // finally块中的代码,会在return之前返回。正常返回的情况下,也需要将任务取消
    finally {
        for (int i = 0, size = futures.size(); i < size; i++) {
            futures.get(i).cancel(true);
        }
    }
}
Comments
Write a Comment