Reactor 之 publishOn 与 subscribeOn

一、概述

在 Spring Reactor 项目中,有两个出镜较少的方法:publishOn subscribeOn。这两个方法的作用是指定执行 Reactive Streaming 的 Scheduler(可理解为线程池)

为何需要指定执行 Scheduler 呢?一个显而易见的原因是:组成一个反应式流的代码有快有慢。

例如 NIO、BIO。如果将这些功能都放在一个线程里执行,快的就会被慢的影响,所以需要相互隔离。这是这两个方法应用的最典型的场景。

二、Scheduler

在介绍 publishOn subscribeOn 方法之前,需要先介绍 Scheduler 这个概念。在 Reactor 中,Scheduler 用来定义执行调度任务的抽象。可以简单理解为线程池,但其实际作用要更多。先简单介绍 Scheduler 的实现:

  • Schedulers.elastic(): 调度器会动态创建工作线程,线程数无上界,类似于 Execturos.newCachedThreadPool()
  • Schedulers.parallel(): 创建固定线程数的调度器,默认线程数等于 CPU 核心数。

三、publishOn 与 subscribeOn

接下来进入正题。先看两个例子(来自 https://github.com/reactor/lite-rx-api-hands-on

publishOn 的例子

Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}

subscribeOn 的例子

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
            .subscribeOn(Schedulers.elastic());
}

这里的 repository 的类型是 BlockingRepository,指的是会导致线程阻塞的数据库操作的集合,例如 JPA、MyBatis 等基于 JDBC 技术实现的 DAO。

  • 在第一个例子中,在执行了 publishOn(Schedulers.elastic()) 之后,repository::save 就会被Schedulers.elastic() 定义的线程池所执行。
  • 在第二个例子中,subscribeOn(Schedulers.elastic()) 的作用类似。它使得 repository.findAll()(也包括 Flux.fromIterable)的执行发生在 Schedulers.elastic() 所定义的线程池中。

从上面的描述看,publishOn subscribeOn 的作用类似,那两者的区别又是什么?

两者的区别

简单说,两者的区别在于影响范围。

publishOn 影响在其之后的 operator 执行的线程池,而 subscribeOn 则会从源头影响整个执行过程。

所以,publishOn 的影响范围和它的位置有关,而 subscribeOn 的影响范围则和位置无关。

看个 publishOnsubscribeOn 同时使用的例子

Flux
    .just("tom")
    .map(s -> {
        System.out.println("[map] Thread name: " + Thread.currentThread().getName());
        return s.concat("@mail.com");
    })
    .publishOn(Schedulers.newElastic("thread-publishOn"))
    .filter(s -> {
        System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
        return s.startsWith("t");
    })
    .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
    .subscribe(s -> {
        System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
        System.out.println(s);
    });

输出结果如下:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

从上面的例子可以看出,subscribeOn 定义在 publishOn 之后,但是却从源头开始生效。而在 publishOn 执行之后,线程池变更为 publishOn 所定义的。

实际用途

这里介绍 publishOnsubscribeOn 的一种实际用途,那就是反应式编程和传统的,会导致线程阻塞的编程技术混用的场景。其实开头两个例子已经解释了这个场景。

在第一个 publishOn 的例子中,repository::save 会导致线程阻塞,为了避免造成对其它反应式操作的影响,便使用 publishOn 改变其执行线程。

在第二个 subscribeOn 的例子中,repository.findAll() 会导致线程阻塞。但是其是源头的 publisher,因此不能使用 publishOn 改变其 执行线程。这时就需要使用 subscribeOn,在源头上修改其执行线程。

这样,通过 publishOnsubscribeOn 就在反应式编程中实现了线程池隔离的目的,一定程度上避免了会导致线程阻塞的程序执行影响到反应式编程的程序执行效率。

局限性

使用 publishOnsubscribeOn 只能在一定程度上避免反应式编程代码执行的效率被影响。因为用来隔离的线程池资源终归是有限的,比如当出现数据库资源不足、慢查询等问题时,对应的线程池资源如果被耗尽,还是会使整个反应式编程的执行效率受到影响。

目前,Redis、Mongo、Couchbase 等非关系型数据库均有相应的反应式编程的解决方案,但是关系型数据库却没有理想的方案。一个重要原因是 JDBC 本身就是一个阻塞式的 API,根本不可能让其适应反应式编程。因此需要一个新的方案。目前 Oracle 正在推动 ADBA (Asynchronous Database Access API),使得关系型数据库可以满足异步编程的需要。但是,因为是 Oracle 主导,大家都懂的,所以目前前景还不是很明朗。

另外一个技术方案是 Spring 推动的 R2DBC,从名字上来看就很像是 JDBC 在反应式编程领域的对应的解决方案。目前可以支持 PostgreSQL,支持 MySQL 目前还尚需时日。

Spring Reactor 入门

适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。

一、前言

最近几年,随着 Node.js、Golang 等新技术、新语言的出现,Java 的服务器端开发语言老大的地位受到了不小的挑战。虽然,Java 的市场份额依旧很大,短时间内也不会改变,但 Java 社区对于挑战也并没有无动于衷。相反,Java 社区积极应对这些挑战,不断提高自身应对高并发服务器端开发场景的能力。

为了应对高并发的服务器端开发,在2009年的时候,微软提出了一个更优雅地实现异步编程的方式 —— Reactive Programming,中文称反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引入了类似的异步编程方式。Java 社区也没有落后很多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架。

其实,在更早之前,像 Mina 和 Netty 这样的 NIO 框架其实也能搞定高并发的服务器端开发任务,但这样的技术相对来说只是少数高级开发人员手中的工具。对于更多的普通开发者来说,难度显得大了些,所以不容易普及。

很多年过去了,到了2017年,虽然已经有不少公司在实践反应式编程。但整体来说,应用范围依旧不大。原因在于缺少简单易用的技术将反应式编程推广普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合。

终于,在2017年9月28日,解决上面问题的利器浮出水面 —— Spring 5 正式发布。Spring 5 其最大的意义就是能将反应式编程技术的普及向前推进一大步。而作为在背后支持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。

本文接下来将会向大家介绍 Reactive Programming(反应式编程)、Reactor 的入门以及实践技巧等相关的内容。文章中的实践内容来自作者使用 Spring 5 和 Reactor 等技术改造实际项目的经历。

二、Reactor 简介

先介绍一下 Reactor 技术。Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。

Reactive Programming

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比便可了解其基本思想:

event Iterable (pull) Observable (push)
retrieve data T next() onNext(T)
discover error throws Exception onError(Exception)
complete !hasNext() onCompleted()

上面表格的中的 Observable 那一列便代表反应式编程的 API 使用方式。可见,它就是常见的观察者模式的一种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext 方法。异常和完成时触发另外两个方法。如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办。这就是 Backpressure 的由来,Reactive Programming 框架需要提供机制,使得 Subscriber 能够控制消费消息的速度。

在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。其主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 这三个方法。

对于 Reactive Streams,大家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

Imperative vs Reactive

对于上面表格里提到的 Iterable 和 Observale 两种风格,还有另一个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另一种表述,大家理解其中的思想即可。对于 Imperative,老外写的文章有时会用,直译就是指令式编程,其实就是我们大家平时用 Java、Python 等语言写代码的常见风格,代码执行顺序和编写顺序基本一致(这里不考虑 JVM 指令重排)

Reactor 的主要模块

Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。

Reactor 的主要类

在 Reactor 中,经常使用的类并不是很多,主要有以下两个:

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。

可能会使用到的类

  • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

Web Flux

001

Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式并没有同传统的基于 Servlet 的 Spring MVC 有什么大的不同。

▼ Web Flux 中 MVC 接口的示例

@RequestMapping("/demo")
@RestController
public class DemoController {
    @RequestMapping(value = "/foobar")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}

最大的变化就是返回值从 Foobar 所表示的一个对象变为 Mono<Foobar> (或 Flux<T>

当然,实际的程序并不会像示例那样就一行代码。关于如何开发实际的应用,这些正是后面介绍 Reactor 的部分所要详细叙述的。

Reactive Streams、Reactor 和 Web Flux

上面介绍了反应式编程的一些概念,以及 Reactor 和 Web Flux。可能读者看到这里有些乱。这里介绍一下三者的关系。其实很简单:

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 MonoFlux。对于 SubscriberSubcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

比如,在 Web Flux,你的方法只需返回 MonoFlux 即可。你的代码基本也只和 MonoFlux 打交道。而 Web Flux 则会实现 SubscriberonNext 时将业务开发人员编写的 MonoFlux 转换为 HTTP Response 返回给客户端。

三、Reactor 入门

接下来介绍一下 Reactor 中 Mono 和 Flux 这两个类中的主要方法的使用。

如同 Java 8 所引入的 Stream 一样,Reactor 的使用方式基本上也是分三步:开始阶段的创建、中间阶段的处理和最终阶段的消费。只不过创建和消费可能是通过像 Spring 5 这样框架完成的(比如通过 Web Flux 中的 WebClient 调用 HTTP 接口,返回值便是一个 Mono)。但我们还是需要基本了解这些阶段的开发方式。

1. 创建 Mono 和 Flux(开始阶段)

使用 Reactor 编程的开始必然是先创建出 Mono 或 Flux。有些时候不需要我们自己创建,而是实现例如 WebFlux 中的 WebClient 或 Spring Data Reactive 得到一个 Mono 或 Flux。

▼ 使用 WebFlux WebClient 调用 HTTP 接口

WebClient webClient = WebClient.create("http://localhost:8080");

public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}

▼ 使用 ReactiveMongoRepository 查询 User

public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}

但有些时候,我们也需要主动地创建一个 Mono 或 Flux。

“普通”的创建方式

简单的创建方式是主要是使用像 just 这样的方法创建

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);

这样的创建方式在什么时候用呢?一般是用在当你在经过一系列非 IO 型的操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行高性能的 IO 操作时,可以用这种方式将你之前得到的对象转换为 Mono 或 Flux。

“文艺”的创建方式

上述是我们通过一个同步调用得到的结果创建出 MonoFlux,但有时我们需要从一个非 Reactive 的异步调用的结果创建出 Mono 或 Flux。那如何实现呢。

如果这个异步方法返回一个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建一个 Mono:

Mono.fromFuture(aCompletableFuture);

如果这个异步调用不会返回 CompletableFuture,是有自己的回调方法,那怎么创建 Mono 呢?我们可以使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }

        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }
    });
});

在使用 WebFlux 之后,AsyncRestTemplate 已经不推荐使用,这里只是做演示。

2. 处理 Mono 和 Flux(中间阶段)

中间阶段的 Mono 和 Flux 的方法主要有 filtermapflatMapthenzipreduce 等。这些方法使用方法和 Stream 中的方法类似。对于这些方法的介绍,将会放在下一节“Reactor 进阶”中,主要介绍这些方法不容易理解和使用容易出问题的点。

3. 消费 Mono 和 Flux(结束阶段)

直接消费的 Mono 或 Flux 的方式就是调用 subscribe 方法。如果在 Web Flux 接口中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出工作。

四、Reactor 进阶

接下来我将介绍一下我在使用 Reactor 开发实际项目时遇到的一些稍显复杂的问题,以及解决方法。

问题一:mapflatMapthen 分别在什么时候使用?

本段内容将涉及到如下类和方法:

  • 方法:Mono.map
  • 方法:Mono.flatMap
  • 方法:Mono.then
  • 类:Function

MonoFlux 中间环节处理的处理过程中,有三个有些类似的方法:mapflatMapthen。这三个方法可以说是 Reactor 中使用频率很高的方法。

▼ 传统的命令式编程

Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);

▼ 对应的反应式编程

Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));

从上面两段代码的对比就很容易看出来 flatMap 方法在其中起到的作用,mapthen 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono<V> other)

可见,最复杂的是 flatMap 方法,map 次之,then 最简单。从方法名字上看,flatMapmap 都是做映射之用。而 then 则是下一步的意思,最适合用于链式调用,但为什么上面的例子使用的是 flatMap 而不是 then

then 表面看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。这个语义同 ES6 Promise 中的 then 方法是不同的。从 then 方法的参数只是一个 Mono,无从接受上一步的执行结果。flatMapmap 的参数都是一个 Function,入参是上一步的执行结果。

flatMapmap 的区别在于,flatMap 中的入参 Function 的返回值要求是一个 Mono(不明白的复习一下 Function 接口的定义),而 map 的入参 Function 只要求返回一个普通对象。因为我们在业务处理中常需要调用 WebClientReactiveXxxRepository 中的方法,这些方法的返回值都是 Mono(或 Flux)。所以要将这些调用串联为一个整体链式调用,就必须使用 flatMap,而不是 map

所以,我们要正确理解 flatMapmapthen 这三个方法的用法和背后的含义,这样才能正确实践反应式编程。

问题二:如何实现并发执行

本段内容将涉及到如下类和方法:

  • 方法:Mono.zip
  • 类:Tuple2
  • 类:BiFunction

并发执行是常见的一个需求。Reactive Programming 虽然是一种异步编程方式,但是异步不代表就是并发并行的。

在传统的命令式开发方式中,并发执行是通过线程池加 Future 的方式实现的。

Future<Result1> result1Future = doStep1(params);
Future<Result2> result2Future = doStep2(params);
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;

因为上面的代码虽然有一些异步效果在里面,但 Future.get() 方法是阻塞的。所以,当我们使用 Reactor 开发有并发执行场景的反应式代码时,肯定不能用上面的方式。这时,需要使用到 MonoFlux 中的 zip 方法。这里我们以 Mono 为例演示。代码如下:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);

上述代码中,产生 item1Monoitem2Mono 的过程是并行的。比如,调用一个 HTTP 接口的同时,执行一个数据库查询操作。这样就可以加快程序的执行。

但上述代码存在一个问题,就是 zip 方法需要做强制类型转换。而强制类型转换是不安全的。所以我们需要更优雅的方式。

好在 zip 方法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);

对于不超过7个元素的合并操作,都有类型安全的 zip 方法可选。

以两个元素的合并为例,介绍一下使用方法:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});

上述代码中,map 方法的参数是一个 Tuple2,表示一个二元数组,相应的还有 Tuple3Tuple4 等。

另外,对于两个元素的并发执行,也可以通过 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接将结果合并。方法是传递 BiFunction 实现合并算法。

问题三:集合循环之后的汇聚

本段内容将涉及到如下类和方法:

  • 方法:Flux.fromIterable
  • 方法:Flux.reduce
  • 类:BiFunction

另外一个稍微复杂的场景是对一个对象中的一个类型为集合类的(List、Set)进行处理之后,再对原本的对象进行处理。使用 Imperative 风格的代码很容易编写:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data

是不是简单到无以复加的地步了。但当我们要用 Reactive 风格的代码实现上述逻辑时,就不是那么简单了。

要在 Reactive 风格的代码中实现上述逻辑,我们主要是要用到 Fluxreduce 方法。我们先来看 reduce 方法的签名:

<A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

从方法签名我们可以看出 reduce 方法的功能就是讲一个 Flux 聚合成一个 Mono。参数中第一个参数是返回值 Mono 中元素的初始值。

第二个参数是一个 BiFunction,用来实现聚合操作的逻辑。泛型参数 <A, ? super T, A> 中,第一个 A 表示每次聚合操作(因为需要对集合中每个元素进行操作)之后的结果的类型,它作为 BiFunction.apply 方法的第一个入参 ;? super T 表示集合中的每个元素,它作为 BiFunction.apply 方法的第二个入参;最后一个 A 表示聚合操作的结果,它作为 BiFunction.apply 方法的返回值。

接下来看一下示例:

Data initData = ...;
List<SubData> aList = ...;
Flux.fromIterable(aList)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });

上面的示例代码中,initDatadata 的类型相同,我们,但是命名不能重复。执行完上述代码之后, reduce 方法会返回 Mono<Data>

 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |