- java.lang.Object
-
- java.util.concurrent.Flow
-
public final class Flow extends Object
相互关联的接口和用于建立流量控制组件的静态方法,其中Publishers
产生由一个或多个消费的项目Subscribers
,各由一个管理Subscription
。这些接口对应于reactive-streams规范。 它们适用于并发和分布式异步设置:所有(七种)方法都以
void
“单向”消息样式定义。 通信依赖于简单形式的流控制(方法Flow.Subscription.request(long)
),其可用于避免在“基于推送”的系统中可能发生的资源管理问题。例子。
Flow.Publisher
通常定义自己的Flow.Subscription
实现; 在方法subscribe
构造一个并将其发布到调用Flow.Subscriber
。 它以异步方式向订户发布项目,通常使用Executor
。 例如,这是一个非常简单的发布者,只向单个订阅者发出(如果请求)单个TRUE
项目。 由于订阅者只接收单个项目,因此该类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher
)。class OneShotPublisher implements Publisher<Boolean> { private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based private boolean subscribed; // true after first subscribe public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { if (subscribed) subscriber.onError(new IllegalStateException()); // only one allowed else { subscribed = true; subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); } } static class OneShotSubscription implements Subscription { private final Subscriber<? super Boolean> subscriber; private final ExecutorService executor; private Future<?> future; // to allow cancellation private boolean completed; OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { this.subscriber = subscriber; this.executor = executor; } public synchronized void request(long n) { if (!completed) { completed = true; if (n <= 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { subscriber.onNext(Boolean.TRUE); subscriber.onComplete(); }); } } } public synchronized void cancel() { completed = true; if (future != null) future.cancel(false); } } }
Flow.Subscriber
安排要求和处理物品。 除非有要求,否则不会发出项目(Flow.Subscriber.onNext(T)
的调用),但可能会要求提供多个项目。 许多订阅者实现可以按照以下示例的样式进行排列,其中缓冲区大小为1个单步,较大的大小通常允许更有效的重叠处理和更少的通信; 例如,值为64,这将使未完成的请求总数保持在32到64之间。由于对给定Flow.Subscription
的订阅者方法调用是严格排序的,因此除非订阅者维护多个订阅,否则这些方法不需要使用锁定或易失性(在在哪种情况下,最好定义多个订阅者,每个订阅者都有自己的订阅)。class SampleSubscriber<T> implements Subscriber<T> { final Consumer<? super T> consumer; Subscription subscription; final long bufferSize; long count; SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { this.bufferSize = bufferSize; this.consumer = consumer; } public void onSubscribe(Subscription subscription) { long initialRequestSize = bufferSize; count = bufferSize - bufferSize / 2; // re-request when half consumed (this.subscription = subscription).request(initialRequestSize); } public void onNext(T item) { if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2); consumer.accept(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} }
默认值
defaultBufferSize()
可以提供一个有用的起点,用于根据预期的费率,资源和用途选择Flow组件中的请求大小和容量。 或者,当永远不需要流量控制时,订户最初可以请求有效无限数量的项目,如:class UnboundedSubscriber<T> implements Subscriber<T> { public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); // effectively unbounded } public void onNext(T item) { use(item); } public void onError(Throwable ex) { ex.printStackTrace(); } public void onComplete() {} void use(T item) { ... } }
- 从以下版本开始:
- 9
-
-
嵌套类汇总
嵌套类 变量和类型 类 描述 static interface
Flow.Processor<T,R>
充当订阅服务器和发布服务器的组件。static interface
Flow.Publisher<T>
订阅者收到的项目(和相关控制消息)的生产者。static interface
Flow.Subscriber<T>
消息的接收者。static interface
Flow.Subscription
消息控制链接Flow.Publisher
和Flow.Subscriber
。
-