标准 专业
多元 极客

Reactor实验室(2)——Java中的响应式编程

致敬ReactorJDK 9新特性。

JDK 9中也引入了这一潮流特性,不过和Reactor3.0不同,JDK 9只引入了一个类,其他全都需要自己编。

核心

Flow

Flow是响应式流编程的核心类,也就是说它既可以创建发送元素的生产者,还可以创建接收元素的消费者。

它提供了几个内部接口:

  • Publisher。函数式编程接口,发布者,满足Reactive Steams编程规范。
  • Subscriber。订阅者,接收消息,同样内置了onSubscribe()、onNext()、onError()和onComplete()方法。
  • Subscription。订阅通道,发布者和订阅者沟通的桥梁。
  • Processor。集发布者和订阅者于一身。

如果你看Reactor一文,上述角色很容易理解清楚。

JDK的目的是让我们自己造轮子,但是还怕我们造不明白,所以在注释中给了一个例子。

我的实现

Publisher

public class MyPublisher implements Flow.Publisher {

    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        MySubscription subscription = new MySubscription(subscriber);
        subscriber.onSubscribe(subscription);
    }
}

Subscription

public class MySubscription implements Flow.Subscription {

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private Flow.Subscriber subscriber;

    private AtomicBoolean isCanceled;

    public MySubscription(Flow.Subscriber subscriber) {
        this.subscriber = subscriber;
        isCanceled = new AtomicBoolean(Boolean.FALSE);
    }

    @Override
    public void request(long n) {
        if (isCanceled.get()) {
            return;
        }

        if (n <= 0) {
            executorService.execute(() -> subscriber.onError(new IllegalArgumentException("请求元素个数错误")));
        }
        System.out.println("MySubscription请求元素个数: " + n);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (var i = 0; i < n; i++) {
            executorService.execute(() -> subscriber.onNext(ThreadLocalRandom.current().nextInt(1000000)));
        }
    }

    @Override
    public void cancel() {
        isCanceled.getAndSet(Boolean.TRUE);
        executorService.shutdown();
        subscriber.onComplete();
    }
}

Subscriber

public class MySubscriber implements Flow.Subscriber {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(10);
    }

    @Override
    public void onNext(Object item) {
        if (item == null) {
            onError(new IllegalArgumentException("上游传递的参数错误"));
        }
        System.out.println(item);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        request(ThreadLocalRandom.current().nextInt(10));
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Flow完成");
    }

    public void request(long n) {
        this.subscription.request(n);
    }
}

Test

@Test
public void flowTest() {
    try {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MyPublisher myPublisher = new MyPublisher();
        MySubscriber mySubscriber = new MySubscriber();
        myPublisher.subscribe(mySubscriber);
        countDownLatch.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Output

MySubscription请求元素个数: 10
924171
MySubscription请求元素个数: 1
239805
MySubscription请求元素个数: 8
634654
MySubscription请求元素个数: 2
456666
MySubscription请求元素个数: 4
657112
MySubscription请求元素个数: 8
928735
MySubscription请求元素个数: 8
547112
MySubscription请求元素个数: 7
774338
MySubscription请求元素个数: 3
774838
MySubscription请求元素个数: 7
623276
MySubscription请求元素个数: 3
994421
MySubscription请求元素个数: 1
516930
MySubscription请求元素个数: 8
85220
MySubscription请求元素个数: 0
852366
MySubscription请求元素个数: 8
280966
MySubscription请求元素个数: 8
401967
MySubscription请求元素个数: 3
444703
MySubscription请求元素个数: 1
906651
MySubscription请求元素个数: 1

Processor

什么,不够尽兴?

继续撸一个Processor吧。

Processor

public class MyProcessor implements Flow.Processor {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(10);
    }

    @Override
    public void onNext(Object item) {
        if (item == null) {
            onError(new IllegalArgumentException("上游传递的参数错误"));
        }
        System.out.println(item);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        request(ThreadLocalRandom.current().nextInt(10));
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Flow完成");
    }

    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        MyProcessorSubscription subscription = new MyProcessorSubscription(subscriber);
        subscriber.onSubscribe(subscription);
    }

    public void request(long n) {
        this.subscription.request(n);
    }
}

Subscription

public class MyProcessorSubscription implements Flow.Subscription {

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private Flow.Subscriber subscriber;

    private AtomicBoolean isCanceled;

    public MyProcessorSubscription(Flow.Subscriber subscriber) {
        this.subscriber = subscriber;
        isCanceled = new AtomicBoolean(Boolean.FALSE);
    }

    @Override
    public void request(long n) {
        if (isCanceled.get()) {
            return;
        }

        if (n <= 0) {
            executorService.execute(() -> subscriber.onError(new IllegalArgumentException("请求元素个数错误")));
        }
        System.out.println("MySubscription请求元素个数: " + n);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (var i = 0; i < n; i++) {
            executorService.execute(() -> subscriber.onNext(ThreadLocalRandom.current().nextInt(1000000)));
        }
    }

    @Override
    public void cancel() {
        isCanceled.getAndSet(Boolean.TRUE);
        executorService.shutdown();
        subscriber.onComplete();
    }
}

Test

@Test
public void processorTest() {
    try {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MyProcessor processor = new MyProcessor();
        processor.subscribe(processor);
        countDownLatch.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Output

MySubscription请求元素个数: 10
54823
MySubscription请求元素个数: 4
441003
MySubscription请求元素个数: 1
162068
MySubscription请求元素个数: 4
853934
MySubscription请求元素个数: 5
860951
MySubscription请求元素个数: 9
245367
MySubscription请求元素个数: 2
977941
MySubscription请求元素个数: 6
724145
MySubscription请求元素个数: 1
951115
MySubscription请求元素个数: 6
806481
MySubscription请求元素个数: 4
593328
MySubscription请求元素个数: 8
523902
MySubscription请求元素个数: 3
826676
MySubscription请求元素个数: 5
82376
MySubscription请求元素个数: 7
668254
MySubscription请求元素个数: 0
102719
MySubscription请求元素个数: 1
129887

 

 

赞(2) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫