致敬Reactor,JDK 9新特性。
JDK 9中也引入了这一潮流特性,不过和Reactor3.0不同,JDK 9只引入了一个类,其他全都需要自己编。
核心
Flow
Flow是响应式流编程的核心类,也就是说它既可以创建发送元素的生产者,还可以创建接收元素的消费者。
它提供了几个内部接口:
- Publisher。函数式编程接口,发布者,满足Reactive Steams编程规范。
- Subscriber。订阅者,接收消息,同样内置了onSubscribe()、onNext()、onError()和onComplete()方法。
- Subscription。订阅通道,发布者和订阅者沟通的桥梁。
- Processor。集发布者和订阅者于一身。
如果你看Reactor一文,上述角色很容易理解清楚。
JDK的目的是让我们自己造轮子,但是还怕我们造不明白,所以在注释中给了一个例子。
我的实现
Publisher
public class MyPublisher implements Flow.Publisher {
@Override
public void subscribe(Flow.Subscriber subscriber) {
MySubscription subscription = new MySubscription(subscriber);
subscriber.onSubscribe(subscription);
}
}
Subscription
public class MySubscription implements Flow.Subscription {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private Flow.Subscriber subscriber;
private AtomicBoolean isCanceled;
public MySubscription(Flow.Subscriber subscriber) {
this.subscriber = subscriber;
isCanceled = new AtomicBoolean(Boolean.FALSE);
}
@Override
public void request(long n) {
if (isCanceled.get()) {
return;
}
if (n <= 0) {
executorService.execute(() -> subscriber.onError(new IllegalArgumentException("请求元素个数错误")));
}
System.out.println("MySubscription请求元素个数: " + n);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (var i = 0; i < n; i++) {
executorService.execute(() -> subscriber.onNext(ThreadLocalRandom.current().nextInt(1000000)));
}
}
@Override
public void cancel() {
isCanceled.getAndSet(Boolean.TRUE);
executorService.shutdown();
subscriber.onComplete();
}
}
Subscriber
public class MySubscriber implements Flow.Subscriber {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(10);
}
@Override
public void onNext(Object item) {
if (item == null) {
onError(new IllegalArgumentException("上游传递的参数错误"));
}
System.out.println(item);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(ThreadLocalRandom.current().nextInt(10));
}
@Override
public void onError(Throwable throwable) {
System.err.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Flow完成");
}
public void request(long n) {
this.subscription.request(n);
}
}
Test
@Test
public void flowTest() {
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MyPublisher myPublisher = new MyPublisher();
MySubscriber mySubscriber = new MySubscriber();
myPublisher.subscribe(mySubscriber);
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
Output
MySubscription请求元素个数: 10
924171
MySubscription请求元素个数: 1
239805
MySubscription请求元素个数: 8
634654
MySubscription请求元素个数: 2
456666
MySubscription请求元素个数: 4
657112
MySubscription请求元素个数: 8
928735
MySubscription请求元素个数: 8
547112
MySubscription请求元素个数: 7
774338
MySubscription请求元素个数: 3
774838
MySubscription请求元素个数: 7
623276
MySubscription请求元素个数: 3
994421
MySubscription请求元素个数: 1
516930
MySubscription请求元素个数: 8
85220
MySubscription请求元素个数: 0
852366
MySubscription请求元素个数: 8
280966
MySubscription请求元素个数: 8
401967
MySubscription请求元素个数: 3
444703
MySubscription请求元素个数: 1
906651
MySubscription请求元素个数: 1
Processor
什么,不够尽兴?
继续撸一个Processor吧。
Processor
public class MyProcessor implements Flow.Processor {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(10);
}
@Override
public void onNext(Object item) {
if (item == null) {
onError(new IllegalArgumentException("上游传递的参数错误"));
}
System.out.println(item);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(ThreadLocalRandom.current().nextInt(10));
}
@Override
public void onError(Throwable throwable) {
System.err.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Flow完成");
}
@Override
public void subscribe(Flow.Subscriber subscriber) {
MyProcessorSubscription subscription = new MyProcessorSubscription(subscriber);
subscriber.onSubscribe(subscription);
}
public void request(long n) {
this.subscription.request(n);
}
}
Subscription
public class MyProcessorSubscription implements Flow.Subscription {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private Flow.Subscriber subscriber;
private AtomicBoolean isCanceled;
public MyProcessorSubscription(Flow.Subscriber subscriber) {
this.subscriber = subscriber;
isCanceled = new AtomicBoolean(Boolean.FALSE);
}
@Override
public void request(long n) {
if (isCanceled.get()) {
return;
}
if (n <= 0) {
executorService.execute(() -> subscriber.onError(new IllegalArgumentException("请求元素个数错误")));
}
System.out.println("MySubscription请求元素个数: " + n);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (var i = 0; i < n; i++) {
executorService.execute(() -> subscriber.onNext(ThreadLocalRandom.current().nextInt(1000000)));
}
}
@Override
public void cancel() {
isCanceled.getAndSet(Boolean.TRUE);
executorService.shutdown();
subscriber.onComplete();
}
}
Test
@Test
public void processorTest() {
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MyProcessor processor = new MyProcessor();
processor.subscribe(processor);
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
Output
MySubscription请求元素个数: 10
54823
MySubscription请求元素个数: 4
441003
MySubscription请求元素个数: 1
162068
MySubscription请求元素个数: 4
853934
MySubscription请求元素个数: 5
860951
MySubscription请求元素个数: 9
245367
MySubscription请求元素个数: 2
977941
MySubscription请求元素个数: 6
724145
MySubscription请求元素个数: 1
951115
MySubscription请求元素个数: 6
806481
MySubscription请求元素个数: 4
593328
MySubscription请求元素个数: 8
523902
MySubscription请求元素个数: 3
826676
MySubscription请求元素个数: 5
82376
MySubscription请求元素个数: 7
668254
MySubscription请求元素个数: 0
102719
MySubscription请求元素个数: 1
129887