模块  java.base
软件包  java.util.concurrent

Class CountedCompleter<T>

  • 实现的所有接口
    SerializableFuture<T>

    public abstract class CountedCompleter<T>
    extends ForkJoinTask<T>
    ForkJoinTask ,触发时执行完成操作,并且没有剩余的待处理操作。 与其他形式的ForkJoinTasks相比,CountedCompleters在子任务停顿和阻塞的情况下通常更强大,但编程不太直观。 CountedCompleter的使用类似于其他基于完成的组件(例如CompletionHandler )的使用,除了可能需要多个未决的完成来触发完成操作onCompletion(CountedCompleter) ,而不仅仅是一个。 除非另有初始化, pending count开始于零,但也可以是(原子),使用方法改变setPendingCount(int)addToPendingCount(int) ,和compareAndSetPendingCount(int, int) 在调用tryComplete() ,如果挂起的操作计数非零,则递减; 否则,执行完成动作,并且如果该完成者本身具有完成者,则该过程继续其完成者。 与相关同步组件(如PhaserSemaphore)的情况一样 ,这些方法仅影响内部计数; 他们没有建立任何进一步的内部簿记。 特别是,未维护待处理任务的身份。 如下所示,您可以创建在需要时记录部分或全部待处理任务或其结果的子类。 如下所示,还提供了支持完成遍历的定制的实用方法。 但是,由于CountedCompleters仅提供基本同步机制,因此创建进一步的抽象子类可能很有用,这些子类维护适用于一组相关用法的链接,字段和其他支持方法。

    具体的CountedCompleter类必须定义方法compute() ,在大多数情况下(如下图所示),在返回之前调用tryComplete() 该类还可以可选地覆盖方法onCompletion(CountedCompleter)以在正常完成时执行动作,并且方法onExceptionalCompletion(Throwable, CountedCompleter)可以在任何异常时执行动作。

    CountedCompleters通常不承担结果,在这种情况下,它们通常被声明为CountedCompleter<Void> ,并且将始终返回null作为结果值。 在其他情况下,您应覆盖方法getRawResult()以提供join(), invoke()和相关方法的结果。 通常,此方法应返回CountedCompleter对象的字段值(或一个或多个字段的函数),该对象在完成时保存结果。 默认情况下,方法setRawResult(T)在CountedCompleters中不起作用。 有可能(但很少适用)覆盖此方法以维护保存结果数据的其他对象或字段。

    一个不具有完成者的CountedCompleter (即, getCompleter()返回null )可以用作具有此附加功能的常规ForkJoinTask。 但是,任何具有另一个完成者的完成者仅用作其他计算的内部帮助者,因此其自身的任务状态(如Future.isDone()等方法中所报告的 )是任意的; 这种状况只有在明确调用改变complete(T)ForkJoinTask.cancel(boolean)ForkJoinTask.completeExceptionally(Throwable)或方法的特殊结束后compute 在任何异常完成时,异常可以被转发到任务的完成者(及其完成者,等等),如果存在并且尚未完成。 同样,取消内部的CountedCompleter只会对该完成者产生局部影响,因此通常不常用。

    示例用法。

    并行递归分解。 CountedCompleters可以安排在类似于RecursiveAction经常使用的树中,尽管设置它们的结构通常会有所不同。 这里,每个任务的完成者是计算树中的父。 尽管它们需要更多的簿记,但在对阵列或集合的每个元素应用可能耗时的操作(无法进一步细分)时,CountedCompleters可能是更好的选择; 特别是当某些元素的操作完成时间与其他元素完全不同时,或者由于内在的变化(例如I / O)或者诸如垃圾收集之类的辅助效果。 由于CountedCompleters提供了自己的延续,因此其他任务无需阻止等待执行它们。

    例如,这是一个实用程序方法的初始版本,它使用二分二递归分解将工作分成单个部分(叶子任务)。 即使工作被分成单独的调用,基于树的技术通常比直接分支叶子任务更可取,因为它们减少了线程间的通信并改善了负载平衡。 在递归的情况下,完成每对子任务的第二个触发其父级的完成(因为没有执行结果组合,方法onCompletion的默认无操作实现不被覆盖)。 实用程序方法设置root任务并调用它(这里隐式使用ForkJoinPool.commonPool() )。 始终将待处理计数设置为子任务数并在返回之前立即调用tryComplete()是简单且可靠的(但不是最佳的)。

       public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent); this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; // must set pending count before fork setPendingCount(2); new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).fork(); // left child } else if (hi > lo) action.accept(array[lo]); tryComplete(); } } new Task(null, 0, array.length).invoke(); } 
    通过注意在递归的情况下,任务在分支正确的任务后无关,因此可以在返回之前直接调用其左任务来改进此设计。 (这是尾递归删除的类比。)此外,当任务中的最后一个动作是分叉或调用子任务(“尾调用”)时,可以优化对tryComplete()的调用,但代价是待计数看起来“一个接一个”。
       public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; setPendingCount(1); // looks off by one, but correct! new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).compute(); // direct invoke } else { if (hi > lo) action.accept(array[lo]); tryComplete(); } } 
    作为进一步优化,请注意左任务甚至不需要存在。 我们可以继续使用原始任务,并为每个fork添加一个挂起计数,而不是创建一个新任务。 此外,由于此树中没有任务实现onCompletion(CountedCompleter)方法,因此tryComplete可以替换为propagateCompletion()
       public void compute() { int n = hi - lo; for (; n >= 2; n /= 2) { addToPendingCount(1); new Task(this, lo + n/2, lo + n).fork(); } if (n > 0) action.accept(array[lo]); propagateCompletion(); } 
    当可以预先计算挂起计数时,可以在构造函数中建立它们:
       public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); this.lo = lo; this.hi = hi; } public void compute() { for (int n = hi - lo; n >= 2; n /= 2) new Task(this, lo + n/2, lo + n).fork(); action.accept(array[lo]); propagateCompletion(); } } if (array.length > 0) new Task(null, 0, array.length).invoke(); } 
    对这些类进行额外的优化可能需要专门针对叶子步骤的类,通过比较细分四次,而不是每次迭代两次,并使用自适应阈值而不是总是细分为单个元素。

    搜索。 CountedCompleters树可以在数据结构的不同部分中搜索值或属性,并在找到结果后立即在AtomicReference报告结果。 其他人可以轮询结果以避免不必要的工作。 (你还可以另外cancel其他任务,但通常更简单,更高效,让他们注意到结果已设置,如果是这样,则跳过进一步处理。)再次使用完全分区的数组进行说明(再次,在实践中,叶任务将几乎总是处理多个元素):

       class Searcher<E> extends CountedCompleter<E> { final E[] array; final AtomicReference<E> result; final int lo, hi; Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) { super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; } public E getRawResult() { return result.get(); } public void compute() { // similar to ForEach version 3 int l = lo, h = hi; while (result.get() == null && h >= l) { if (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); new Searcher(this, array, result, mid, h).fork(); h = mid; } else { E x = array[l]; if (matches(x) && result.compareAndSet(null, x)) quietlyCompleteRoot(); // root task is now joinable break; } } tryComplete(); // normally complete whether or not found } boolean matches(E e) { ... } // return true if found public static <E> E search(E[] array) { return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); } } 
    在这个例子中,以及其他,其中将任务有除非没有其他影响compareAndSet一个共同的结果,后无条件调用tryComplete可以是有条件的( if (result.get() == null) tryComplete(); ),因为没有进一步的簿记需要一次根本任务完成管理落成。

    记录子任务。 将多个子任务的结果组合在一起的CountedCompleter任务通常需要在方法onCompletion(CountedCompleter)访问这些结果。 如下面的类所示(执行map-reduce的简化形式,其中映射和缩减都是E类型),在分而治之的设计中执行此操作的一种方法是将每个子任务记录为其兄弟,以便它可以可以通过方法onCompletion访问。 这种技术适用于左右结果组合顺序无关紧要的减少; 有序减少需要明确的左/右指定。 在上述示例中看到的其他流线型的变体也可以适用。

       class MyMapper<E> { E apply(E v) { ... } } class MyReducer<E> { E apply(E x, E y) { ... } } class MapReducer<E> extends CountedCompleter<E> { final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> sibling; E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; setPendingCount(1); // only right is pending right.fork(); left.compute(); // directly execute left } else { if (hi > lo) result = mapper.apply(array[lo]); tryComplete(); } } public void onCompletion(CountedCompleter<?> caller) { if (caller != this) { MapReducer<E> child = (MapReducer<E>)caller; MapReducer<E> sib = child.sibling; if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length).invoke(); } } 
    这里,方法onCompletion采用与组合结果的许多完成设计共同的形式。 这个回调式方法在每个任务中被触发一次,在两个不同的上下文中,其中挂起的计数是或变为零:(1)由任务本身,如果其挂起的计数在调用tryComplete为零,或者(2)当它们完成并将未决计数减少到零时,通过它的任何子任务。 caller参数区分了案例。 通常,当呼叫者是this ,不需要采取任何措施。 否则,可以使用调用者参数(通常通过强制转换)来提供要组合的值(和/或指向其他值的链接)。 假设正确使用挂起计数, onCompletion内的操作在完成任务及其子任务时发生(一次)。 此方法中不需要其他同步,以确保访问此任务的字段或其他已完成任务的线程安全性。

    完成遍历 如果使用onCompletion处理完成不适用或不方便,则可以使用方法firstComplete()nextComplete()创建自定义遍历。 例如,要定义仅以第三个ForEach示例的形式拆分右侧任务的MapReducer,完成必须协同地减少未使用的子任务链接,这可以按如下方式完成:

       class MapReducer<E> extends CountedCompleter<E> { // version 2 final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> forks, next; // record subtask forks in list E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; this.next = next; } public void compute() { int l = lo, h = hi; while (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); h = mid; } if (h > l) result = mapper.apply(array[l]); // process completions by reducing along and advancing subtask links for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) t.result = reducer.apply(t.result, s.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length, null).invoke(); } } 

    触发。 一些CountedCompleters本身从不分叉,而是在其他设计中用作管道; 包括完成一个或多个异步任务触发另一个异步任务的那些。 例如:

       class HeaderBuilder extends CountedCompleter<...> { ... } class BodyBuilder extends CountedCompleter<...> { ... } class PacketSender extends CountedCompleter<...> { PacketSender(...) { super(null, 1); ... } // trigger on second completion public void compute() { } // never called public void onCompletion(CountedCompleter<?> caller) { sendPacket(); } } // sample use: PacketSender p = new PacketSender(); new HeaderBuilder(p, ...).fork(); new BodyBuilder(p, ...).fork(); 
    从以下版本开始:
    1.8
    另请参见:
    Serialized Form
    • 构造方法详细信息

      • CountedCompleter

        protected CountedCompleter​(CountedCompleter<?> completer,
                                   int initialPendingCount)
        使用给定的完成者和初始挂起计数创建一个新的CountedCompleter。
        参数
        completer - 此任务的完成者,或 null如果没有)
        initialPendingCount - 初始待处理计数
      • CountedCompleter

        protected CountedCompleter​(CountedCompleter<?> completer)
        使用给定的完成符创建一个新的CountedCompleter,初始挂起计数为零。
        参数
        completer - 此任务的完成者,如果没有, null
      • CountedCompleter

        protected CountedCompleter()
        创建一个新的CountedCompleter,没有完成者,初始挂起计数为零。
    • 方法详细信息

      • compute

        public abstract void compute()
        该任务执行的主要计算。
      • onCompletion

        public void onCompletion​(CountedCompleter<?> caller)
        调用方法tryComplete()并且挂起计数为零时,或者调用无条件方法complete(T)时执行操作。 默认情况下,此方法不执行任何操作 您可以通过检查给定调用者参数的标识来区分个案。 如果不等于this ,那么它通常是一个子任务,可能包含要组合的结果(和/或其他结果的链接)。
        参数
        caller - 调用此方法的任务(可能是此任务本身)
      • onExceptionalCompletion

        public boolean onExceptionalCompletion​(Throwable ex,
                                               CountedCompleter<?> caller)
        当方法执行一个动作ForkJoinTask.completeExceptionally(Throwable)调用或方法compute()抛出一个异常,而这个任务尚未否则正常完成。 在进入此方法时,此任务ForkJoinTask.isCompletedAbnormally() 此方法的返回值控制进一步传播:如果true并且此任务具有尚未完成的完成true ,则该完成true也会异常完成,但与此完成true相同。 除了返回true之外,此方法的默认实现不执行任何true
        参数
        ex - 例外
        caller - 调用此方法的任务(可能是此任务本身)
        结果
        true如果此异常应传播到此任务的完成者(如果存在)
      • getCompleter

        public final CountedCompleter<?> getCompleter()
        返回在此任务的构造函数中建立的完成 null如果没有,则返回 null
        结果
        完成者
      • getPendingCount

        public final int getPendingCount()
        返回当前挂起的计数。
        结果
        当前待处理的计数
      • setPendingCount

        public final void setPendingCount​(int count)
        将挂起计数设置为给定值。
        参数
        count - 伯爵
      • addToPendingCount

        public final void addToPendingCount​(int delta)
        将(原子地)给定值添加到挂起计数中。
        参数
        delta - 要添加的值
      • compareAndSetPendingCount

        public final boolean compareAndSetPendingCount​(int expected,
                                                       int count)
        仅在当前保持给定预期值时,将待处理计数(原子地)设置为给定计数。
        参数
        expected - 预期值
        count - 新值
        结果
        true如果成功
      • decrementPendingCountUnlessZero

        public final int decrementPendingCountUnlessZero()
        如果挂起的计数非零,则(原子地)递减它。
        结果
        进入此方法时的初始(未减少)挂起计数
      • getRoot

        public final CountedCompleter<?> getRoot()
        返回当前计算的根; 即,如果没有完成者,则执行此任务,否则完成者的根。
        结果
        当前计算的根源
      • tryComplete

        public final void tryComplete()
        如果挂起的计数非零,则递减计数; 否则调用onCompletion(CountedCompleter) ,然后类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。
      • propagateCompletion

        public final void propagateCompletion()
        等效于tryComplete()但不沿完成路径调用onCompletion(CountedCompleter) :如果挂起计数非零,则递减计数; 否则,类似地尝试完成此任务的完成,如果存在,则将此任务标记为完成。 onCompletion不应该或不需要为每个完成onCompletion调用onCompletion情况下,该方法可能是有用的。
      • complete

        public void complete​(T rawResult)
        无论挂起计数如何,调用onCompletion(CountedCompleter) ,将此任务标记为完成,并在此任务的完成上进一步触发tryComplete() (如果存在)。 给定rawResult作为参数传递给setRawResult(T)调用之前onCompletion(CountedCompleter)或纪念这一任务已完成; 它的值仅对覆盖setRawResult类有意义。 此方法不会修改挂起计数。

        一旦获得任何一个(相对于所有)几个子任务结果,当强制完成时,该方法可能是有用的。 但是,在未覆盖setRawResult的常见(和推荐)情况下,可以使用quietlyCompleteRoot()更简单地获得此效果。

        重写:
        completeForkJoinTask<T>
        参数
        rawResult - 原始结果
      • firstComplete

        public final CountedCompleter<?> firstComplete()
        如果此任务的挂起计数为零,则返回此任务; 否则减少其待处理的计数并返回null 此方法旨在与完成遍历循环中的nextComplete()一起使用。
        结果
        此任务,如果挂起计数为零, null
      • nextComplete

        public final CountedCompleter<?> nextComplete()
        如果此任务没有完成者,则调用ForkJoinTask.quietlyComplete()并返回null 或者,如果完成者的待处理计数非零,则递减计数并返回null 否则,返回完成者。 此方法可用作同类任务层次结构的完成遍历循环的一部分:
           for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { // ... process c ... } 
        结果
        完成者,如果没有, null
      • quietlyCompleteRoot

        public final void quietlyCompleteRoot()
        相当于 getRoot().quietlyComplete()
      • helpComplete

        public final void helpComplete​(int maxTasks)
        如果此任务尚未完成,则尝试最多处理此任务在完成路径上的给定数量的其他未处理任务(如果已知存在)。
        参数
        maxTasks - 要处理的最大任务数。 如果小于或等于零,则不处理任务。
      • exec

        protected final boolean exec()
        实现CountedCompleters的执行约定。
        Specified by:
        exec在类 ForkJoinTask<T>
        结果
        true如果已知此任务已正常完成
      • getRawResult

        public T getRawResult()
        返回计算结果。 默认情况下,返回null ,这适用于Void操作,但在其他情况下应该被覆盖,几乎总是返回完成时保存结果的字段的字段或函数。
        Specified by:
        getRawResult在类 ForkJoinTask<T>
        结果
        计算的结果
      • setRawResult

        protected void setRawResult​(T t)
        结果承载CountedCompleters的方法可以选择用于帮助维护结果数据。 默认情况下,什么也不做。 不建议覆盖。 但是,如果重写此方法以更新现有对象或字段,则通常必须将其定义为线程安全的。
        Specified by:
        setRawResultForkJoinTask<T>
        参数
        t - 该值