1. About the Documentation

This section provides a brief overview of Reactor reference documentation. You do not need to read this guide in a linear fashion. Each piece stands on its own, though they often refer to other pieces.

The Reactor reference guide is available as HTML documents. The latest copy is available at https://projectreactor.io/docs/core/release/reference/index.html

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

1.2. Contributing to the Documentation

The reference guide is written in Asciidoc, and you can find its sources at https://github.com/reactor/reactor-core/tree/master/docs/asciidoc.

If you have an improvement or a suggestion, we will be happy to get a pull request from you!

We recommend that you check out a local copy of the repository so that you can generate the documentation by running the asciidoctor gradle task and checking the rendering. Some of the sections rely on included files, so GitHub rendering is not always complete.

To facilitate documentation edits, most sections have a link at the end that opens an edit UI directly on GitHub for the main source file for that section. These links are only present in the HTML5 version of this reference guide. They look like the following: Suggest Edit to About the Documentation.

1.3. Getting Help

You can reach out for help in several ways with Reactor:

  • Get in touch with the community on Gitter.

  • Ask a question on stackoverflow.com at project-reactor.

  • Report bugs in Github issues. We closely monitor the following repositories: reactor-core (which covers the essential features) and reactor-addons (which covers reactor-test and adapters issues).

All of Reactor is open source, including this documentation. If you find problems with the docs or if you want to improve them, please get involved.

1.4. Where to Go from Here

2. Getting Started

This chapter contains information that should help you get going with Reactor. It includes the following sections:

2.1. Introducing Reactor

Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing "`backpressure`"). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs (Flux for [N] elements and Mono for [0|1] elements) and extensively implements the Reactive Streams specification.

Reactor also supports non-blocking inter-process communication with the `reactor-netty` project. Suited for microservices architecture, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Reactive encoding and decoding are fully supported.

2.2. Prerequisites

Reactor Core runs on Java 8 and above.

It has a transitive dependency on org.reactivestreams:reactive-streams:1.0.3.

Android Support
  • Reactor 3 does not officially support or target Android (consider using RxJava 2 if such support is a strong requirement).

  • However, it should work fine with Android SDK 26 (Android O) and above.

  • We are open to evaluating changes that benefit Android support in a best-effort fashion. However, we cannot make guarantees. Each decision must be made on a case-by-case basis.

2.3. Understanding the BOM

Reactor 3 uses a BOM (Bill of Materials) model (since reactor-core 3.0.4, with the Aluminium release train). This curated list groups artifacts that are meant to work well together, providing the relevant versions despite potentially divergent versioning schemes in these artifacts.

The BOM is itself versioned, using a release train scheme with a codename followed by a qualifier. The following list shows a few examples:

Aluminium-RELEASE
Californium-BUILD-SNAPSHOT
Aluminium-SR1
Bismuth-RELEASE
Californium-SR32

The codenames represent what would traditionally be the MAJOR.MINOR number. They (mostly) come from the Periodic Table of Elements, in increasing alphabetical order.

The qualifiers are (in chronological order):

  • BUILD-SNAPSHOT: Builds for development and testing.

  • M1..N: Milestones or developer previews.

  • RELEASE: The first GA (General Availability) release in a codename series.

  • SR1..N: The subsequent GA releases in a codename series — equivalent to a PATCH number. (SR stands for “Service Release”).

2.4. Getting Reactor

As mentioned earlier, the easiest way to use Reactor in your project is to use the BOM and add the relevant dependencies. Note that, when you add such a dependency, you must omit the version so that the version gets picked up from the BOM.

However, if you want to force the use of a specific artifact’s version, you can specify it when adding your dependency, as you usually would. You can also forgo the BOM entirely and specify dependencies by their artifact versions.

2.4.1. Maven Installation

Maven natively supports the BOM concept. First, you need to import the BOM by adding the following snippet to your pom.xml:

<dependencyManagement> (1)
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1 Notice the dependencyManagement tag. This is in addition to the regular dependencies section.

If the top section (dependencyManagement) already exists in your pom, add only the contents.

Next, add your dependencies to the relevant reactor projects, as usual, except without a <version>, as follows:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> (1)
        (2)
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> (3)
        <scope>test</scope>
    </dependency>
</dependencies>
1 Dependency on the core library.
2 No version tag here.
3 reactor-test provides facilities to unit test reactive streams.

2.4.2. Gradle Installation

Prior to version 5.0, Gradle had no core support for Maven BOMs, but you can use Spring’s gradle-dependency-management plugin.

First, apply the plugin from the Gradle Plugin Portal, as follows:

plugins {
    id "io.spring.dependency-management" version "1.0.7.RELEASE" (1)
}
1 As of this writing, 1.0.7.RELEASE is the latest version of the plugin. Check for updates.

Then use it to import the BOM, as follows:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE"
     }
}

Finally, add a dependency to your project, without a version number, as follows:

dependencies {
     implementation 'io.projectreactor:reactor-core' (1)
}
1 There is no third : separated section for the version. It is taken from the BOM.

Since Gradle 5.0, you can use the native Gradle support for BOMs:

dependencies {
     implementation platform('io.projectreactor:reactor-bom:Bismuth-RELEASE')
     implementation 'io.projectreactor:reactor-core' (1)
}
1 There is no third : separated section for the version. It is taken from the BOM.

2.4.3. Milestones and Snapshots

Milestones and developer previews are distributed through the Spring Milestones repository rather than Maven Central. To add it to your build configuration file, use the following snippet:

Example 1. Milestones in Maven
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>

For Gradle, use the following snippet:

Example 2. Milestones in Gradle
repositories {
  maven { url 'https://repo.spring.io/milestone' }
  mavenCentral()
}

Similarly, snapshots are also available in a separate dedicated repository, as the following examples show:

Example 3. BUILD-SNAPSHOTs in Maven
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>
Example 4. BUILD-SNAPSHOTs in Gradle
repositories {
  maven { url 'https://repo.spring.io/snapshot' }
  mavenCentral()
}

3. Introduction to Reactive Programming

Reactor is an implementation of the reactive programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language.
— https://en.wikipedia.org/wiki/Reactive_programming

As a first step in the direction of reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. Then RxJava implemented reactive programming on the JVM. As time went on, a standardization for Java emerged through the Reactive Streams effort, a specification that defines a set of interfaces and interaction rules for reactive libraries on the JVM. Its interfaces have been integrated into Java 9 under the Flow class.

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. You can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality to the Iterable-Iterator pair in all of these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based.

Using an iterator is an imperative programming pattern, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: The programmer expresses the logic of the computation rather than describing its exact control flow.

In addition to pushing values, the error-handling and completion aspects are also covered in a well defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence. This can be summed up as follows:

onNext x 0..N [onError | onComplete]
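
As noted earlier, these interfaces were integrated into Java 9 under the Flow class, so the signal contract can be sketched with the JDK alone. The following is a minimal illustration (not Reactor code) that records each signal a Subscriber receives:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SignalContractDemo {

    // Subscribes to a publisher of two values and records every signal, in order.
    static List<String> signals() {
        List<String> received = new ArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { received.add("onNext:" + item); }   // 0..N values
                public void onError(Throwable t) { received.add("onError"); terminated.countDown(); }
                public void onComplete() { received.add("onComplete"); terminated.countDown(); }
            });
            publisher.submit("a");
            publisher.submit("b");
        } // close() signals completion after the buffered values are delivered
        try {
            terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(signals()); // [onNext:a, onNext:b, onComplete]
    }
}
```

The recorded sequence matches the contract above: zero or more onNext calls followed by exactly one terminal signal.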

This approach is very flexible. The pattern supports use cases where there is no value, one value, or n values (including an infinite sequence of values, such as the continuing ticks of a clock).

But why do we need such an asynchronous reactive library in the first place?

3.1. Blocking Can Be Wasteful

Modern applications can reach huge numbers of concurrent users, and, even though the capabilities of modern hardware have continued to improve, performance of modern software is still a key concern.

There are, broadly, two ways one can improve a program’s performance:

  • parallelize to use more threads and more hardware resources.

  • seek more efficiency in how current resources are used.

Usually, Java developers write programs by using blocking code. This practice is fine until there is a performance bottleneck. Then it is time to introduce additional threads, running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.

Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.

So the parallelization approach is not a silver bullet. It is necessary to access the full power of the hardware, but it is also complex to reason about and susceptible to resource wasting.

3.2. Asynchronicity to the Rescue?

The second approach mentioned earlier, seeking more efficiency, can be a solution to the resource wasting problem. By writing asynchronous, non-blocking code, you let the execution switch to another active task that uses the same underlying resources and later comes back to the current process when the asynchronous processing has finished.

But how can you produce asynchronous code on the JVM? Java offers two models of asynchronous programming:

  • Callbacks: Asynchronous methods do not have a return value but take an extra callback parameter (a lambda or anonymous class) that gets called when the result is available. A well known example is Swing’s EventListener hierarchy.

  • Futures: Asynchronous methods immediately return a Future<T>. The asynchronous process computes a T value, but the Future object wraps access to it. The value is not immediately available, and the object can be polled until the value is available. For instance, an ExecutorService running Callable<T> tasks uses Future objects.
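
The Future flavor can be sketched with the JDK alone; the task and its value here are purely illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {

    // Submits a Callable<Integer> task and blocks on its Future for the result.
    static int blockingResult() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The asynchronous process starts computing right away: no lazy computation.
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(100); // simulated latency
                return 42;
            });
            // get() parks the calling thread until the value is available,
            // which is exactly the kind of blocking situation discussed below.
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingResult()); // 42
    }
}
```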

Are these techniques good enough? Not for every use case, and both approaches have limitations.

Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as “Callback Hell”).

Consider an example: showing the top five favorites from a user on the UI or suggestions if she does not have a favorite. This goes through three services (one gives favorite IDs, the second fetches favorite details, and the third offers suggestions with details), as follows:

Example 5. Example of Callback Hell
userService.getFavorites(userId, new Callback<List<String>>() { (1)
  public void onSuccess(List<String> list) { (2)
    if (list.isEmpty()) { (3)
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { (4)
          UiUtils.submitOnUiThread(() -> { (5)
            list.stream()
                .limit(5)
                .forEach(uiList::show); (6)
            });
        }

        public void onError(Throwable error) { (7)
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() (8)
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, (9)
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1 We have callback-based services: a Callback interface with a method invoked when the asynchronous process succeeds and one invoked when an error occurs.
2 The first service invokes its callback with the list of favorite IDs.
3 If the list is empty, we must go to the suggestionService.
4 The suggestionService gives a List<Favorite> to a second callback.
5 Since we deal with a UI, we need to ensure our consuming code runs in the UI thread.
6 We use a Java 8 Stream to limit the number of suggestions processed to five, and we show them in a graphical list in the UI.
7 At each level, we deal with errors the same way: We show them in a popup.
8 Back to the favorite IDs level. If the service returned a full list, we need to go to the favoriteService to get detailed Favorite objects. Since we need only five, we first stream the list of IDs to limit it to five.
9 Once again, a callback. This time we get a fully-fledged Favorite object, which we push to the UI inside the UI thread.

That is a lot of code, and it is a bit hard to follow and has repetitive parts. Consider its equivalent in Reactor:

Example 6. Example of Reactor code equivalent to callback code
userService.getFavorites(userId) (1)
           .flatMap(favoriteService::getDetails) (2)
           .switchIfEmpty(suggestionService.getSuggestions()) (3)
           .take(5) (4)
           .publishOn(UiUtils.uiThreadScheduler()) (5)
           .subscribe(uiList::show, UiUtils::errorPopup); (6)
1 We start with a flow of favorite IDs.
2 We asynchronously transform these into detailed Favorite objects (flatMap). We now have a flow of Favorite.
3 If the flow of Favorite is empty, we switch to a fallback through the suggestionService.
4 We are interested in, at most, five elements from the resulting flow.
5 At the end, we want to process each piece of data in the UI thread.
6 We trigger the flow by describing what to do with the final form of the data (show it in a UI list) and what to do in case of an error (show a popup).

What if you want to ensure the favorite IDs are retrieved in less than 800ms or, if it takes longer, get them from a cache? In the callback-based code, that is a complicated task. In Reactor it becomes as easy as adding a timeout operator in the chain, as follows:

Example 7. Example of Reactor code with timeout and fallback
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) (1)
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) (2)
           .flatMap(favoriteService::getDetails) (3)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1 If the part above emits nothing within 800ms, propagate an error.
2 In case of an error, fall back to the cacheService.
3 The rest of the chain is similar to the preceding example.

Future objects are a bit better than callbacks, but they still do not do well at composition, despite the improvements brought in Java 8 by CompletableFuture. Orchestrating multiple Future objects together is doable but not easy. Also, Future has other problems:

  • It is easy to end up with another blocking situation with Future objects by calling the get() method.

  • They do not support lazy computation.

  • They lack support for multiple values and advanced error handling.

Consider another example: We get a list of IDs from which we want to fetch a name and a statistic and combine these pair-wise, all of it asynchronously. The following example does so with a list of type CompletableFuture:

Example 8. Example of CompletableFuture combination
CompletableFuture<List<String>> ids = ifhIds(); (1)

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { (2)
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { (3)
				CompletableFuture<String> nameTask = ifhName(i); (4)
				CompletableFuture<Integer> statTask = ifhStat(i); (5)

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); (6)
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); (7)
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); (8)
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) (9)
			.collect(Collectors.toList()));
});

List<String> results = result.join(); (10)
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1 We start off with a future that gives us a list of id values to process.
2 We want to start some deeper asynchronous processing once we get the list.
3 For each element in the list:
4 Asynchronously get the associated name.
5 Asynchronously get the associated statistic.
6 Combine both results.
7 We now have a list of futures that represent all the combination tasks. To execute these tasks, we need to convert the list to an array.
8 Pass the array to CompletableFuture.allOf, which outputs a CompletableFuture that completes when all the tasks have completed.
9 The tricky bit is that allOf returns CompletableFuture<Void>, so we reiterate over the list of futures, collecting their results by using join() (which, here, does not block, since allOf ensures the futures are all done).
10 Once the whole asynchronous pipeline has been triggered, we wait for it to be processed and return the list of results that we can assert.

Since Reactor has more combination operators out of the box, this process can be simplified, as follows:

Example 9. Example of Reactor code equivalent to future code
Flux<String> ids = ifhrIds(); (1)

Flux<String> combinations =
		ids.flatMap(id -> { (2)
			Mono<String> nameTask = ifhrName(id); (3)
			Mono<Integer> statTask = ifhrStat(id); (4)

			return nameTask.zipWith(statTask, (5)
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); (6)

List<String> results = result.block(); (7)
assertThat(results).containsExactly( (8)
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1 This time, we start from an asynchronously provided sequence of ids (a Flux<String>).
2 For each element in the sequence, we asynchronously process it twice (inside the function passed to flatMap).
3 Get the associated name.
4 Get the associated statistic.
5 Asynchronously combine the two values.
6 Aggregate the values into a List as they become available.
7 In production, we would continue working with the Flux asynchronously by further combining it or subscribing to it. Most probably, we would return the result Mono. Since we are in a test, we instead block, waiting for the processing to finish, and then directly return the aggregated list of values.
8 Assert the result.

The perils of using callbacks and Future objects are similar and are what reactive programming addresses with the Publisher-Subscriber pair.

3.3. From Imperative to Reactive Programming

Reactive libraries, such as Reactor, aim to address these drawbacks of “classic” asynchronous approaches on the JVM while also focusing on a few additional aspects:

  • Composability and readability

  • Data as a flow manipulated with a rich vocabulary of operators

  • Nothing happens until you subscribe

  • Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high

  • High level but high value abstraction that is concurrency-agnostic

3.3.1. Composability and Readability

By “composability”, we mean the ability to orchestrate multiple asynchronous tasks, in which we use results from previous tasks to feed input to subsequent ones. Alternatively, we can run several tasks in a fork-join style. In addition, we can reuse asynchronous tasks as discrete components in a higher-level system.

The ability to orchestrate tasks is tightly coupled to the readability and maintainability of code. As the layers of asynchronous processes increase in both number and complexity, being able to compose and read code becomes increasingly difficult. As we saw, the callback model is simple, but one of its main drawbacks is that, for complex processes, you need to have a callback executed from a callback, itself nested inside another callback, and so on. That mess is known as “Callback Hell”. As you can guess (or know from experience), such code is pretty hard to go back to and reason about.

Reactor offers rich composition options, wherein code mirrors the organization of the abstract process, and everything is generally kept at the same level (nesting is minimized).

3.3.2. The Assembly Line Analogy

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).

The raw material can go through various transformations and other intermediary steps or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.

3.3.3. Operators

In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we see shortly.

Understanding that operators create new instances can help you avoid a common mistake that would lead you to believe that an operator you used in your chain is not being applied. See this item in the FAQ.

While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.

3.3.4. Nothing Happens Until You subscribe()

In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which can help with reusability and composition).

By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, all the way back to the source Publisher.
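
A minimal sketch of this laziness, assuming reactor-core is on the classpath (the flag is purely illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

public class LazinessDemo {

    // Builds a chain, checks that the source has not been touched,
    // then subscribes and checks that data actually flowed.
    static boolean[] beforeAndAfterSubscribe() {
        AtomicBoolean sourceTouched = new AtomicBoolean(false);
        Flux<Integer> chain = Flux.range(1, 3)
                                  .doOnNext(i -> sourceTouched.set(true))
                                  .map(i -> i * 10);

        boolean before = sourceTouched.get(); // false: the chain is only a description
        chain.subscribe();                    // subscribing triggers the flow end-to-end
        boolean after = sourceTouched.get();  // true: data has been pushed through
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] result = beforeAndAfterSubscribe();
        System.out.println(result[0] + " -> " + result[1]); // false -> true
    }
}
```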

3.3.5. Backpressure

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and are beneficial if producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.
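
Assuming reactor-core is on the classpath, the request side of this hybrid can be sketched with BaseSubscriber, which exposes explicit demand control through its hooks:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {

    // Consumes a range one element at a time, requesting the next element
    // only once the previous one has been processed.
    static List<Integer> consumeOneByOne() {
        List<Integer> seen = new ArrayList<>();
        Flux.range(1, 4).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // initial demand: at most one element
            }

            @Override
            protected void hookOnNext(Integer value) {
                seen.add(value);
                request(1); // signal readiness for one more
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne()); // [1, 2, 3, 4]
    }
}
```

The source never emits more than the outstanding demand, which is how a slow consumer keeps a fast producer in check.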

3.3.6. Hot vs Cold

The Rx family of reactive libraries distinguishes two broad categories of reactive sequences: hot and cold. This distinction mainly has to do with how the reactive stream reacts to subscribers:

  • A Cold sequence starts anew for each Subscriber, including at the source of data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription.

  • A Hot sequence does not start from scratch for each Subscriber. Rather, late subscribers receive signals emitted after they subscribed. Note, however, that some hot reactive streams can cache or replay the history of emissions totally or partially. From a general perspective, a hot sequence can even emit when no subscriber is listening (an exception to the “nothing happens before you subscribe” rule).
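
The cold case can be sketched, assuming reactor-core is on the classpath, with Flux.defer and a counter standing in for a per-subscription source such as an HTTP call:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdDemo {

    // defer re-evaluates its supplier for every subscriber, the way a cold
    // HTTP-backed source would issue a new request per subscription.
    static int subscriptionsAfterTwoSubscribes() {
        AtomicInteger calls = new AtomicInteger();
        Flux<String> cold = Flux.defer(() ->
                Flux.just("call #" + calls.incrementAndGet()));

        cold.subscribe(); // the source starts anew: "call #1"
        cold.subscribe(); // and again for this subscriber: "call #2"
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(subscriptionsAfterTwoSubscribes()); // 2
    }
}
```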

For more information on hot vs cold in the context of Reactor, see this reactor-specific section.

4. Reactor Core Features

The Reactor project main artifact is reactor-core, a reactive library that focuses on the Reactive Streams specification and targets Java 8.

Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono. A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result.

This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing. For instance, an HTTP request produces only one response, so there is not much sense in doing a count operation. Expressing the result of such an HTTP call as a Mono<HttpResponse> thus makes more sense than expressing it as a Flux<HttpResponse>, as it offers only operators that are relevant to a context of zero items or one item.

Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but it returns a Mono<Long>.
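
For example, assuming reactor-core is on the classpath:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CardinalityDemo {

    // count exists on Flux, but since its result has a cardinality of exactly
    // one value, it returns a Mono<Long> rather than a Flux.
    static Long countOfThree() {
        Mono<Long> count = Flux.just("a", "b", "c").count();
        return count.block();
    }

    public static void main(String[] args) {
        System.out.println(countOfThree()); // 3
    }
}
```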

4.1. Flux, an Asynchronous Sequence of 0-N Items

The following image shows how a Flux transforms items:

Flux

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
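
Assuming reactor-core is on the classpath, both edge cases can be demonstrated, with take(3) making the infinite ticks finite for the sake of the example:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxEdgeCasesDemo {

    // An empty finite sequence: no onNext, only an onComplete signal.
    static Long emptyCount() {
        return Flux.empty().count().block();
    }

    // Flux.interval is infinite; take(3) cuts it down so block() can return.
    static List<Long> threeTicks() {
        return Flux.interval(Duration.ofMillis(10))
                   .take(3)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(emptyCount()); // 0
        System.out.println(threeTicks()); // [0, 1, 2]
    }
}
```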

4.2. Mono, an Asynchronous 0-1 Result

The following image shows how a Mono transforms an item:

Mono

A Mono<T> is a specialized Publisher<T> that emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal.

It offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.
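
A short sketch of both behaviors:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoOperatorsExample {
    public static void main(String[] args) {
        // Combining a Mono with another Publisher can raise the cardinality,
        // so the resulting type switches to Flux
        Flux<String> both = Mono.just("foo").concatWith(Mono.just("bar"));
        System.out.println(both.collectList().block()); // prints [foo, bar]

        // then(Mono) still emits at most one item, so the result stays a Mono
        Mono<String> then = Mono.just("foo").then(Mono.just("bar"));
        System.out.println(then.block()); // prints bar
    }
}
```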

Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.
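
For instance (a sketch), Mono.fromRunnable produces such a completion-only Mono:

```java
import reactor.core.publisher.Mono;

public class NoValueExample {
    public static void main(String[] args) {
        // A Mono<Void> only signals completion, similar to a Runnable
        Mono<Void> done = Mono.fromRunnable(() -> System.out.println("side effect ran"));
        done.block(); // triggers the Runnable; no value is ever emitted
    }
}
```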

4.3. Simple Ways to Create a Flux or Mono and Subscribe to It

The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.

For instance, to create a sequence of String, you can either enumerate them or put them in a collection and create the Flux from it, as follows:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Other examples of factory methods include the following:

Mono<String> noData = Mono.empty(); (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1 Notice the factory method honors the generic type even though it has no value.
2 The first parameter is the start of the range, while the second parameter is the number of items to produce.

When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You have a wide choice of .subscribe() variants that take lambdas for different combinations of callbacks, as shown in the following method signatures:

Example 10. Lambda-based subscribe variants for Flux
subscribe(); (1)

subscribe(Consumer<? super T> consumer); (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1 Subscribe and trigger the sequence.
2 Do something with each produced value.
3 Deal with values but also react to an error.
4 Deal with values and errors but also run some code when the sequence successfully completes.
5 Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.
These variants return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel-and-clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

4.3.1. subscribe Method Examples

This section contains minimal examples of each of the five signatures for the subscribe method. The following code shows an example of the basic method with no arguments:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe in the simplest way.

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1 Set up a Flux that produces three values when a subscriber attaches.
2 Subscribe with a subscriber that will print the values.

The preceding code produces the following output:

1
2
3

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1 Set up a Flux that produces four values when a subscriber attaches.
2 We need a map so that we can handle some values differently.
3 For most values, return the value.
4 For one value, force an error.
5 Subscribe with a subscriber that includes an error handler.

We now have two lambda expressions: one for the content we expect and one for errors. The preceding code produces the following output:

1
2
3
Error: java.lang.RuntimeException: Got to 4

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
1 Set up a Flux that produces four values when a subscriber attaches.
2 Subscribe with a subscriber that includes a handler for completion events.

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback has no input, as represented by an empty pair of parentheses: It matches the run method in the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done

The last signature of the subscribe method includes a Consumer<Subscription>.

That variant requires you to do something with the Subscription (perform a request(long) on it or cancel() it). Otherwise the Flux hangs.

The following example shows the last signature of the subscribe method:

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10)); (1)
1 When we subscribe, we receive a Subscription. Signal that we want up to 10 elements from the source (which will actually emit 4 elements and complete).

4.3.2. Cancelling a subscribe() with Its Disposable

All these lambda-based variants of subscribe() have a Disposable return type. In this case, the Disposable interface represents the fact that the subscription can be cancelled, by calling its dispose() method.

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is NOT guaranteed to be immediate: Some sources might produce elements so fast that they could complete even before receiving the cancel instruction.

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks on a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.
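
A minimal sketch of the swap pattern (the interval sources stand in for the requests being replaced):

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

public class SwapExample {
    public static void main(String[] args) {
        Disposable.Swap swap = Disposables.swap();

        // Each update atomically disposes the previous subscription and stores the new one
        swap.update(Flux.interval(Duration.ofMillis(100)).subscribe());
        swap.update(Flux.interval(Duration.ofMillis(100)).subscribe()); // cancels the first

        // Disposing the wrapper cancels the current value and all future replacements
        swap.dispose();
        System.out.println(swap.isDisposed()); // prints true
    }
}
```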

Another interesting utility is Disposables.composite(…​). This composite lets you collect several Disposable — for instance, multiple in-flight requests associated with a service call — and dispose all of them at once later on. Once the composite’s dispose() method has been called, any attempt to add another Disposable immediately disposes it.
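
A sketch of the composite pattern (again using interval subscriptions as stand-ins for in-flight requests):

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

public class CompositeExample {
    public static void main(String[] args) {
        Disposable.Composite inFlight = Disposables.composite();

        // Collect several subscriptions, e.g. multiple in-flight requests for one service call
        inFlight.add(Flux.interval(Duration.ofMillis(100)).subscribe());
        inFlight.add(Flux.interval(Duration.ofMillis(100)).subscribe());

        inFlight.dispose(); // cancels both at once; any later add() is disposed immediately
        System.out.println(inFlight.isDisposed()); // prints true
    }
}
```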

4.3.3. An Alternative to Lambdas: BaseSubscriber

There is an additional subscribe method that is more generic and takes a full-blown Subscriber rather than composing one out of lambdas. In order to help with writing such a Subscriber, we provide an extendable class called BaseSubscriber.

Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

Now we can implement one of these. We call it a SampleSubscriber. The following example shows how it would be attached to a Flux:

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");},
    s -> s.request(10));
ints.subscribe(ss);

The following example shows what SampleSubscriber could look like, as a minimalistic implementation of a BaseSubscriber:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract class for user-defined Subscribers in Reactor. The class offers hooks that can be overridden to tune the subscriber’s behavior. By default, it triggers an unbounded request and behaves exactly as subscribe(). However, extending BaseSubscriber is much more useful when you want a custom request amount.

For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method prints a statement to standard out and makes the first request. Then the hookOnNext method prints a statement and performs additional requests, one request at a time.

The SampleSubscriber class produces the following output:

Subscribed
1
2
3
4

BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.

It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter).

You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs bounded requests.

4.3.4. On Backpressure and Ways to Reshape Requests

When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator. The sum of current requests is sometimes referred to as the current “demand”, or “pending request”. Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as fast as you can” — basically disabling backpressure).

The first request comes from the final subscriber at subscription time, yet the most direct ways of subscribing all immediately trigger an unbounded request of Long.MAX_VALUE:

  • subscribe() and most of its lambda-based variants (with the exception of the one that has a Consumer<Subscription>)

  • block(), blockFirst() and blockLast()

  • iterating over a toIterable() or toStream()

The simplest way of customizing the original request is to subscribe with a BaseSubscriber with the hookOnSubscribe method overridden, as the following example shows:

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

The preceding snippet prints out the following:

request of 1
Cancelling after having received 1
When manipulating a request, you must be careful to produce enough demand for the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually call request at least once.
Operators that Change the Demand from Downstream

One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain. A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted as a demand for two full buffers. As a consequence, since buffers need N elements to be considered full, the buffer operator reshapes the request to 2 x N.
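
This reshaping can be observed with doOnRequest (a sketch; here N is 4, so the downstream request(1) reaches the source as a request of 4):

```java
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BufferDemandExample {
    public static void main(String[] args) {
        Flux.range(1, 20)
            .doOnRequest(r -> System.out.println("upstream request of " + r))
            .buffer(4) // a downstream request(1) is reshaped into an upstream request(4)
            .subscribe(new BaseSubscriber<List<Integer>>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // ask for a single buffer
                }

                @Override
                protected void hookOnNext(List<Integer> buffer) {
                    System.out.println("received " + buffer);
                    cancel();
                }
            });
    }
}
```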

You might also have noticed that some operators have variants that take an int input parameter called prefetch. This is another category of operators that modify the downstream request. These are usually operators that deal with inner sequences, deriving a Publisher from each incoming element (like flatMap).

Prefetch is a way to tune the initial request made on these inner sequences. If unspecified, most of these operators start with a demand of 32.

These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.

Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.

limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches. For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10 being propagated to the upstream. Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.
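
The batching can be observed with doOnRequest (a sketch): subscribe() makes an unbounded request, but the source only ever sees batches, starting with 10 and then replenishing requests once roughly 75% is consumed.

```java
import reactor.core.publisher.Flux;

public class LimitRateExample {
    public static void main(String[] args) {
        Flux.range(1, 100)
            .doOnRequest(r -> System.out.println("upstream sees request of " + r))
            .limitRate(10) // the unbounded downstream request is propagated in batches
            .subscribe(); // subscribe() requests Long.MAX_VALUE downstream
    }
}
```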

The operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide). Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches further reworked by the replenishing strategy.

limitRequest(N), on the other hand, caps the downstream request to a maximum total demand. It adds up requests up to N. If a single request does not make the total demand overflow over N, that particular request is wholly propagated upstream. After that amount has been emitted by the source, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.
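
For example (a sketch), even against a source of 100 elements, limitRequest(5) emits five elements and then completes:

```java
import reactor.core.publisher.Flux;

public class LimitRequestExample {
    public static void main(String[] args) {
        Flux.range(1, 100)
            .limitRequest(5) // caps the total demand at 5
            .subscribe(
                i -> System.out.println(i),
                e -> System.err.println("Error: " + e),
                () -> System.out.println("Done"));
        // prints 1 through 5, then Done: limitRequest completes and cancels the source
    }
}
```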

4.4. Programmatically creating a sequence

In this section, we introduce the creation of a Flux or a Mono by programmatically defining its associated events (onNext, onError, and onComplete). All these methods share the fact that they expose an API to trigger the events that we call a sink. There are actually a few sink variants, which we’ll get to shortly.

4.4.1. Synchronous generate

The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function.

This is for synchronous and one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can only be called at most once per callback invocation. You can then additionally call error(Throwable) or complete(), but this is optional.

The most useful variant is probably the one that also lets you keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide a Supplier<S> for the initial state, and your generator function now returns a new state on each round.

For instance, you could use an int as the state:

Example 11. Example of state-based generate
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1 We supply the initial state value of 0.
2 We use the state to choose what to emit (a row in the multiplication table of 3).
3 We also use it to choose when to stop.
4 We return a new state that we use in the next invocation (unless the sequence terminated in this one).

The preceding code generates the table of 3, as the following sequence:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

You can also use a mutable <S>. The example above could for instance be rewritten using a single AtomicLong as the state, mutating it on each round:

Example 12. Mutable state variant
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1 This time, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
If your state object needs to clean up some resources, use the generate(Supplier<S>, BiFunction, Consumer<S>) variant to clean up the last state instance.

The following example uses the generate method that includes a Consumer:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { (1)
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    }, (state) -> System.out.println("state: " + state)); (4)
1 Again, we generate a mutable object as the state.
2 We mutate the state here.
3 We return the same instance as the new state.
4 We see the last state value (11) as the output of this Consumer lambda.

In the case of the state containing a database connection or other resource that needs to be handled at the end of the process, the Consumer lambda could close the connection or otherwise handle any tasks that should be done at the end of the process.

4.4.2. Asynchronous and Multi-threaded: create

create is a more advanced form of programmatic creation of a Flux which is suitable for multiple emissions per round, even from multiple threads.

It exposes a FluxSink, with its next, error, and complete methods. Contrary to generate, it doesn’t have a state-based variant. On the other hand, it can trigger multi-threaded events in the callback.

create can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners.
create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.

Imagine that you use a listener-based API. It processes data by chunks and has two events: (1) a chunk of data is ready and (2) the processing is complete (terminal event), as represented in the MyEventListener interface:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

You can use create to bridge this into a Flux<T>:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1 Bridge to the MyEventListener API.
2 Each element in a chunk becomes an element in the Flux.
3 The processComplete event is translated to onComplete.
4 All of this is done asynchronously whenever the myEventProcessor executes.

Additionally, since create can bridge asynchronous APIs and manages backpressure, you can refine how to behave backpressure-wise, by indicating an OverflowStrategy:

  • IGNORE to Completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.

  • ERROR to signal an IllegalStateException when the downstream can’t keep up.

  • DROP to drop the incoming signal if the downstream is not ready to receive it.

  • LATEST to let downstream only get the latest signals from upstream.

  • BUFFER (the default) to buffer all signals if the downstream can’t keep up. (this does unbounded buffering and may lead to OutOfMemoryError).

Mono also has a create generator. The MonoSink of Mono’s create doesn’t allow several emissions. It will drop all signals after the first one.
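
A sketch of bridging an asynchronous API with Mono.create (a CompletableFuture stands in for the async source here):

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class MonoCreateExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");

        // MonoSink accepts at most one success(value); any later signal is dropped
        Mono<String> bridge = Mono.create(sink ->
            future.whenComplete((value, error) -> {
                if (error != null) {
                    sink.error(error);
                } else {
                    sink.success(value);
                }
            }));

        System.out.println(bridge.block()); // prints hello
    }
}
```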

4.4.3. Asynchronous but single-threaded: push

push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }

        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1 Bridge to the SingleThreadEventListener API.
2 Events are pushed to the sink using next from a single listener thread.
3 complete events generated from the same listener thread.
4 error events also generated from the same listener thread.
A hybrid push/pull model

Most Reactor operators, like create, follow a hybrid push/pull model. What we mean by that is that despite most of the processing being asynchronous (suggesting a push approach), there is a small pull component to it: the request.

The consumer pulls data from the source in the sense that it won’t emit anything until first requested. The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.

Note that push() and create() both allow to set up an onRequest consumer in order to manage the request amount and to ensure that data is pushed through the sink only when there is pending request.

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1 Poll for messages when requests are made.
2 If messages are available immediately, push them into the sink.
3 The remaining messages that arrive asynchronously later are also delivered.
Cleaning up after push() or create()

Two callbacks, onDispose and onCancel, perform any cleanup on cancellation or termination. onDispose can be used to perform cleanup when the Flux completes, errors out, or is cancelled. onCancel can be used to perform any action specific to cancellation prior to cleanup with onDispose.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close())  (2)
    });
1 onCancel is invoked first, for the cancel signal only.
2 onDispose is invoked for complete, error, or cancel signals.

4.4.4. Handle

The handle method is a bit different: it is an instance method, meaning that it is chained on an existing source (as are the common operators). It is present in both Mono and Flux.

It is close to generate, in the sense that it uses a SynchronousSink and only allows one-by-one emissions. However, handle can be used to generate an arbitrary value out of each source element, possibly skipping some elements. In this way, it can serve as a combination of map and filter. The signature of handle is as follows:

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

Let’s consider an example. The reactive streams specification disallows null values in a sequence. What if you want to perform a map but you want to use a preexisting method as the map function, and that method sometimes returns null?

For instance, the following method can be applied safely to a source of integers:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

We can then use handle to remove any nulls:

Using handle for a "map and eliminate nulls" scenario

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); (1)
        if (letter != null) (2)
            sink.next(letter); (3)
    });

alphabet.subscribe(System.out::println);
1 Map to a letter.
2 If the "map function" returns null…
3 …filter it out by not calling sink.next.

Which will print out:

M
I
T

4.5. Threading and Schedulers

Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); (1)

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> (2)
          System.out.println(v + Thread.currentThread().getName()) (3)
      )
  );
  t.start();
  t.join();

}
1 The Mono<String> is assembled in thread main.
2 However, it is subscribed to in thread Thread-0.
3 As a consequence, both the map and the onNext callback actually run in Thread-0.

The preceding code produces the following output:

hello thread Thread-0

In Reactor, the execution model and where the execution happens is determined by the Scheduler that is used. A Scheduler has scheduling responsibilities similar to an ExecutorService, but having a dedicated abstraction lets it do more, notably acting as a clock and enabling a wider range of implementations (virtual time for tests, trampolining or immediate scheduling, and so on).

The Schedulers class has static methods that give access to the following execution contexts:

  • No execution context (Schedulers.immediate()): at processing time, the submitted Runnable will be directly executed, effectively running them on the current Thread (can be seen as a "null object" or no-op Scheduler).

  • A single, reusable thread (Schedulers.single()). Note that this method reuses the same thread for all callers, until the Scheduler is disposed. If you want a per-call dedicated thread, use Schedulers.newSingle() for each call.

  • An unbounded elastic thread pool (Schedulers.elastic()). This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).

  • A bounded elastic thread pool (Schedulers.boundedElastic()). Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its elastic() predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available (when scheduling with a delay, the delay starts when the thread becomes available). This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources (see How Do I Wrap a Synchronous, Blocking Call?) but does not pressure the system too much with new threads.

  • A fixed pool of workers that is tuned for parallel work (Schedulers.parallel()). It creates as many workers as you have CPU cores.

Additionally, you can create a Scheduler out of any pre-existing ExecutorService by using Schedulers.fromExecutorService(ExecutorService). (You can also create one from an Executor, although doing so is discouraged.)

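For instance, a fixed pool that already exists elsewhere in an application could be handed over this way. The sketch below builds such an ExecutorService in plain Java; the Schedulers.fromExecutorService call is shown only as a comment, since it assumes reactor-core on the classpath:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorBacked {
    // A pre-existing pool, such as one already shared with blocking legacy code.
    static String runOnPool() throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "legacy-pool"));
        try {
            // With reactor-core on the classpath, the same pool could be wrapped:
            //   Scheduler s = Schedulers.fromExecutorService(pool);
            return pool.submit(() -> "ran on " + Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnPool()); // ran on legacy-pool
    }
}
```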
You can also create new instances of the various scheduler types by using the newXXX methods. For example, Schedulers.newParallel(yourScheduleName) creates a new parallel scheduler named yourScheduleName.

While boundedElastic is made to help with legacy blocking code if it cannot be avoided, single and parallel are not. As a consequence, the use of Reactor blocking APIs (block(), blockFirst(), blockLast() (as well as iterating over toIterable() or toStream()) inside the default single and parallel schedulers) results in an IllegalStateException being thrown.

Custom Schedulers can also be marked as "non blocking only" by creating instances of Thread that implement the NonBlocking marker interface.

Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel(). The following line changes the Scheduler to a new instance similar to Schedulers.single():

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.

In Reactor, when you chain operators, you can wrap as many Flux and Mono implementations inside one another as you need. Once you subscribe, a chain of Subscriber objects is created, backward (up the chain) to the first publisher. This is effectively hidden from you. All you can see is the outer layer of Flux (or Mono) and Subscription, but these intermediate operator-specific subscribers are where the real work happens.

With that knowledge, we can have a closer look at the publishOn and subscribeOn operators:

4.5.1. The publishOn Method

publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:

  • Changes the execution context to one Thread picked by the Scheduler

  • as per the specification, onNext calls happen in sequence, so this uses up a single thread

  • unless they work on a specific Scheduler, operators after publishOn continue execution on that same thread

The following example uses the publishOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .publishOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println)).start();  (5)
1 Creates a new Scheduler backed by four Thread instances.
2 The first map runs on the anonymous thread in <5>.
3 The publishOn switches the whole sequence onto a Thread picked from <1>.
4 The second map runs on the Thread from <1>.
5 This anonymous Thread is the one where the subscription happens. The print happens on the latest execution context, which is the one from publishOn.
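The “switch, then stay on that thread” behavior can be approximated without Reactor by handing work to a single-thread executor, as in this illustrative sketch (the thread name is arbitrary):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PublishOnAnalog {
    // Once work is handed to the executor's single thread, subsequent
    // steps keep running there, like operators placed after publishOn.
    static String transformOnWorker(int i) throws Exception {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            return worker.submit(() -> {
                int shifted = 10 + i; // first step, on worker-1
                // second step, still on the same thread
                return "value " + shifted + " on " + Thread.currentThread().getName();
            }).get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transformOnWorker(1)); // value 11 on worker-1
    }
}
```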

4.5.2. The subscribeOn Method

subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn — they still switch the execution context for the part of the chain after them.

  • Changes the Thread from which the whole chain of operators subscribes

  • Picks one thread from the Scheduler

Only the earliest subscribeOn call in the chain is actually taken into account.

The following example uses the subscribeOn method:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .subscribeOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println)).start();  (5)
1 Creates a new Scheduler backed by four Threads.
2 The first map runs on one of these four threads…
3 …because subscribeOn switches the whole sequence right from subscription time (<5>).
4 The second map also runs on the same thread.
5 This anonymous Thread is the one where the subscription initially happens, but subscribeOn immediately shifts it to one of the four scheduler threads.

4.6. Handling Errors

For a quick look at the available operators for error handling, see the relevant operator decision tree.

In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.

Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For this reason, the subscriber’s onError method should always be defined.

If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.

Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.

Now we can consider each means of error handling one-by-one. When relevant, we make a parallel with imperative programming’s try patterns.

4.6.1. Error Handling Operators

You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:

  • Catch and return a static default value.

  • Catch and execute an alternative path with a fallback method.

  • Catch and dynamically compute a fallback value.

  • Catch, wrap to a BusinessException, and re-throw.

  • Catch, log an error-specific message, and re-throw.

  • Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.

All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.

When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1 A transformation that can throw an exception is performed.
2 If everything went well, a second transformation is performed.
3 Each successfully transformed value is printed.
4 In case of an error, the sequence terminates and an error message is displayed.

The preceding example is conceptually similar to the following try-catch block:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1 If an exception is thrown here…​
2 …​the rest of the loop is skipped…​
3 …​ and the execution goes straight to here.

Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.

Static Fallback Value

The equivalent of “Catch and return a static default value” is onErrorReturn. The following example shows how to use it:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

The following example shows the Reactor equivalent:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1 Recover only if the message of the exception is "boom10"
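The predicate variant behaves like the following plain-Java sketch (names hypothetical): recovery happens only when the message matches, and any other error keeps propagating:

```java
public class PredicateRecover {
    static String attempt(String failure) {
        try {
            throw new RuntimeException(failure); // the dangerous operation fails
        } catch (RuntimeException e) {
            if (e.getMessage().equals("boom10")) {
                return "recovered10"; // the predicate matched: recover
            }
            throw e; // otherwise, propagate as-is
        }
    }

    public static void main(String[] args) {
        System.out.println(attempt("boom10")); // recovered10
        try {
            attempt("boom42");
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```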
Fallback Method

If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.

For example, if your nominal process is fetching data from an external and unreliable service but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

The following example shows the Reactor equivalent:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1 For each key, asynchronously call the external service.
2 If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is.

Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1 This function dynamically chooses how to continue.
2 If the source times out, hit the local cache.
3 If the source says the key is unknown, create a new entry.
4 In all other cases, “re-throw”.
Dynamic Fallback Value

Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.

For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think Future.complete(T success) versus Future.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass the exception.

An imperative example would look like the following:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}
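As an aside, the JDK's own CompletableFuture illustrates this “value-or-error holder” shape (no Reactor involved):

```java
import java.util.concurrent.CompletableFuture;

public class HolderSketch {
    static CompletableFuture<String> successHolder() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("value"); // value-holding variant
        return f;
    }

    static CompletableFuture<String> errorHolder() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom")); // error-holding variant
        return f;
    }

    public static void main(String[] args) {
        System.out.println(successHolder().isCompletedExceptionally()); // false
        System.out.println(errorHolder().isCompletedExceptionally());   // true
    }
}
```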

You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:

erroringFlux.onErrorResume(error -> Mono.just( (1)
        MyWrapper.fromError(error) (2)
));
1 Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that.
2 We need to compute the value out of the exception. Here, we achieve that by wrapping the exception with the relevant MyWrapper factory method.
Catch and Rethrow

"Catch, wrap to a BusinessException, and re-throw" looks like the following in the imperative world:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

However, there is a more straightforward way of achieving the same effect with onErrorMap:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
Log or React on the Side

For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

The doOnError operator, as well as all operators prefixed with doOn, are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.

Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) (1)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); (2)
        })
        (3)
    );
1 The external service call that can fail…
2 …is decorated with a logging and stats side-effect…
3 …after which it still terminates with an error, unless we use an error-recovery operator here.

We can also imagine we have statistic counters to increment as a second error side-effect.

Using Resources and the Finally Block

The last parallel to draw with imperative programming is the cleaning up that can be done either by using a “Use of the finally block to clean up resources” or by using a “Java 7 try-with-resource construct”, both shown below:

Example 13. Imperative use of finally
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
Example 14. Imperative use of try-with-resource
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

Both have their Reactor equivalents: doFinally and using.

doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:

Reactive finally: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { (1)
        stats.stopTimerAndRecordTiming();(2)
        if (type == SignalType.CANCEL) (3)
          statsCancel.increment();
    })
    .take(1); (4)
1 doFinally consumes a SignalType for the type of termination.
2 Similarly to finally blocks, we always record the timing.
3 Here we also increment statistics in case of cancellation only.
4 take(1) cancels after one item is emitted.

On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:

Example 15. The Disposable resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:

Example 16. Reactive try-with-resource: using()
Flux<String> flux =
Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
);
1 The first lambda generates the resource. Here, we return our mock Disposable.
2 The second lambda processes the resource, returning a Flux<T>.
3 The third lambda is called when the Flux from <2> terminates or is cancelled, to clean up resources.
4 After subscription and execution of the sequence, the isDisposed atomic boolean becomes true.
Demonstrating the Terminal Aspect of onError

In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1 Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced.

The preceding example prints out one line every 250ms, as follows:

tick 0
tick 1
tick 2
Uh oh

Even with one extra second of runtime, no more tick comes in from the interval. The sequence was indeed terminated by the error.

Retrying

There is another operator of interest with regards to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.

The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)

Thread.sleep(2100); (3)
1 elapsed associates each value with the duration since previous value was emitted.
2 We also want to see when there is an onError.
3 Ensure we have enough time for our 4x2 ticks.

The preceding example produces the following output:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1 A new interval started, from tick 0. The additional 250ms duration is coming from the fourth tick, the one that causes the exception and subsequent retry.

As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.

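Stripped of the reactive machinery, retry(n) behaves like the following plain-Java loop (illustrative): each failure re-runs the whole operation from scratch, and after n retries the last error is propagated:

```java
public class RetryLoop {
    static int attempts = 0;

    static String failing() {
        attempts++;
        // the operation fails on every run, like the example's fourth tick
        throw new IllegalArgumentException("boom " + attempts);
    }

    static String runWithRetries(int retries) {
        for (int i = 0; ; i++) {
            try {
                return failing();
            } catch (RuntimeException e) {
                if (i >= retries) {
                    throw e; // give up: propagate the last error downstream
                }
                // otherwise fall through and re-run ("re-subscribe") from scratch
            }
        }
    }

    public static void main(String[] args) {
        try {
            runWithRetries(1); // mirrors retry(1): one original run plus one retry
        } catch (RuntimeException e) {
            System.out.println("terminated with: " + e.getMessage());
        }
        System.out.println("total attempts: " + attempts);
    }
}
```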
There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.

The companion Flux is a Flux<Throwable> that gets passed to a Function, the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. Retry cycles go as follows:

  1. Each time an error happens (giving potential for a retry), the error is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird's-eye view of all the attempts so far.

  2. If the companion Flux emits a value, a retry happens.

  3. If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.

  4. If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.

The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(companion -> companion.take(3)); (3)
1 This continuously produces errors, calling for retry attempts.
2 doOnError before the retry lets us log and see all failures.
3 Here, we consider the first three errors as retry-able (take(3)) and then give up.

In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).

Getting to the same behavior involves a few additional tricks:

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), (1)
          (error, index) -> { (2)
            if (index < 4) return index; (3)
            else throw Exceptions.propagate(error); (4)
          })
    );
1 Trick one: use zip and a range of "number of acceptable retries + 1".
2 The zip function lets you count the retries while keeping track of the original error.
3 To allow for three retries, indexes before 4 return a value to emit.
4 In order to terminate the sequence in error, we throw the original exception after these three retries.

You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ.

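The backoff delays themselves are simple to compute. A minimal sketch of the doubling schedule (the base value and attempt count here are illustrative, not Reactor defaults):

```java
public class BackoffDelays {
    // Doubling delay for successive attempts: base, 2*base, 4*base, ...
    static long delayForAttempt(int attempt, long baseMillis) {
        return baseMillis * (1L << (attempt - 1));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println(delayForAttempt(attempt, 100)); // 100, 200, 400, 800
        }
    }
}
```

A real implementation would also cap the delay and usually add jitter, as the FAQ's version does.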
4.6.2. Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially triggers an exception or calls to a user-defined callback that can similarly fail, so they all contain some form of error handling.

As a rule of thumb, an unchecked exception is always propagated through onError. For instance, throwing a RuntimeException inside a map function translates to an onError event, as the following code shows:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));

The preceding code prints out the following:

ERROR: java.lang.IllegalArgumentException: foo
You can tune the Exception before it is passed to onError, through the use of a hook.

Reactor, however, defines a set of exceptions (such as OutOfMemoryError) that are always deemed to be fatal. See the Exceptions.throwIfFatal method. These errors mean that Reactor cannot keep operating and are thrown rather than propagated.

Internally, there are also cases where an unchecked exception still cannot be propagated (most notably during the subscribe and request phases), due to concurrency races that could lead to double onError or onComplete conditions. When these races happen, the error that cannot be propagated is “dropped”. These cases can still be managed to some extent by using customizable hooks. See Dropping Hooks.

You may ask: “What about checked exceptions?”

If, for example, you need to call some method that declares it throws exceptions, you still have to deal with those exceptions in a try-catch block. You have several options, though:

  1. Catch the exception and recover from it. The sequence continues normally.

  2. Catch the exception, wrap it into an unchecked exception, and then throw it (interrupting the sequence). The Exceptions utility class can help you with that (we get to that next).

  3. If you need to return a Flux (for example, you are in a flatMap), wrap the exception in an error-producing Flux, as follows: return Flux.error(checkedException). (The sequence also terminates.)
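The third option can be sketched as follows. This is a minimal, illustrative example; convert is a hypothetical helper that declares a checked exception, not part of any real API:

```java
import java.io.IOException;
import reactor.core.publisher.Flux;

public class CheckedInFlatMap {
    // Hypothetical helper that declares a checked exception, for illustration only
    static String convert(String s) throws IOException {
        if (s.isEmpty()) {
            throw new IOException("empty input");
        }
        return s.toUpperCase();
    }

    public static void main(String[] args) {
        Flux.just("foo", "")
            .flatMap(s -> {
                try {
                    return Flux.just(convert(s));
                } catch (IOException e) {
                    // option 3: wrap the checked exception in an error-producing Flux
                    return Flux.error(e);
                }
            })
            .subscribe(
                v -> System.out.println("VALUE: " + v),
                e -> System.out.println("ERROR: " + e.getMessage()));
    }
}
```

The first element converts successfully, while the empty string makes the inner publisher terminate the whole sequence with an onError.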

Reactor has an Exceptions utility class that you can use to ensure that exceptions are wrapped only if they are checked exceptions:

  • Use the Exceptions.propagate method to wrap exceptions, if necessary. It also calls throwIfFatal first and does not wrap RuntimeException.

  • Use the Exceptions.unwrap method to get the original unwrapped exception (going back to the root cause of a hierarchy of reactor-specific exceptions).

Consider the following example of a map that uses a conversion method that can throw an IOException:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

Now imagine that you want to use that method in a map. You must now explicitly catch the exception, and your map function cannot re-throw it. So you can propagate it, wrapped in a RuntimeException, to the map’s onError handling, as follows:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

Later on, when subscribing to the preceding Flux and reacting to errors (such as in the UI), you could revert back to the original exception if you want to do something special for IOExceptions. The following example shows how to do so:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

4.7. Processors

Processors are a special kind of Publisher that are also a Subscriber. That means that you can subscribe to a Processor (generally, they implement Flux), but you can also call methods to manually inject data into the sequence or terminate it.

There are several kinds of Processors, each with a few particular semantics, but before you start looking into these, you need to ask yourself the following question:

4.7.1. Do I Need a Processor?

Most of the time, you should try to avoid using a Processor. They are harder to use correctly and prone to some corner cases.

If you think a Processor could be a good match for your use case, ask yourself if you have tried these two alternatives:

  1. Could an operator or combination of operators fit the bill? (See Which operator do I need?.)

  2. Could a “generator” operator work instead? (Generally, these operators are made to bridge APIs that are not reactive, providing a “sink” that is similar in concept to a Processor, in the sense that it lets you manually populate the sequence with data or terminate it).

If, after exploring the above alternatives, you still think you need a Processor, read the Overview of Available Processors section to learn about the different implementations.

4.7.2. Safely Produce from Multiple Threads by Using the Sink Facade

Rather than directly using Reactor Processors, it is a good practice to obtain a Sink for the Processor by calling sink() once.

FluxProcessor sinks safely gate multi-threaded producers and can be used by applications that generate data from multiple threads concurrently. For example, you can create a thread-safe serialized sink for UnicastProcessor by doing the following:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);

Multiple producer threads can then concurrently generate data on the serialized sink:

sink.next(n);

Despite the FluxSink being adapted for multi-threaded manual feeding of the Processor, it is not possible to mix the subscriber approach with the sink approach: You have to either subscribe your FluxProcessor to a source Publisher or feed it manually through its FluxSink.

Overflow from next behaves in two possible ways, depending on the Processor and its configuration:

  • An unbounded processor handles the overflow itself by dropping or buffering.

  • A bounded processor blocks or “spins” on the IGNORE strategy or applies the overflowStrategy behavior specified for the sink.
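The multi-threaded production pattern above can be sketched as follows. This is an illustrative example, assuming reactor-core 3.3.x, where an unbounded UnicastProcessor buffers everything until its single subscriber arrives:

```java
import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;

public class MultiThreadedSink {
    public static void main(String[] args) throws InterruptedException {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        // sink(...) returns a serialized facade that safely gates concurrent producers
        FluxSink<Integer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

        int threads = 4;
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            int base = t * 100;
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    sink.next(base + i); // safe to call concurrently
                }
                done.countDown();
            }).start();
        }
        done.await();
        sink.complete();

        // The unbounded processor buffered all 40 elements for its single subscriber
        System.out.println("count: " + processor.count().block());
    }
}
```

All 40 elements survive the concurrent pushes because the serialized sink arbitrates between the producer threads.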

4.7.3. Overview of Available Processors

Reactor Core comes with several flavors of Processor. Not all processors have the same semantics, but they are roughly split into two categories. The following list briefly describes the two kinds of processors:

  • direct (DirectProcessor and UnicastProcessor): These processors can push data only through direct user action (calling the methods of their Sink directly).

  • synchronous (EmitterProcessor and ReplayProcessor): These processors can either push data through user interaction or by subscribing to an upstream Publisher and synchronously draining it.

One way of publishing events onto different threads is to use the EmitterProcessor combined with publishOn(Scheduler). This can, for example, replace the former TopicProcessor, which was using Unsafe operations and has been moved to reactor-extra in 3.3.0.

Direct Processor

A direct Processor is a processor that can dispatch signals to zero or more Subscribers. It is the simplest one to instantiate, with a single DirectProcessor#create() static factory method. On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.

Once the Processor has terminated (usually through its sink’s error(Throwable) or complete() methods being called), it lets more subscribers subscribe but replays the termination signal to them immediately.

Unicast Processor

A unicast Processor can deal with backpressure by using an internal buffer. The trade-off is that it can have at most one Subscriber.

A UnicastProcessor has a few more options than a direct processor, reflected by the existence of a few create static factory methods. For instance, by default, it is unbounded: If you push any amount of data through it while its Subscriber has not yet requested data, it buffers all of the data.

You can change this by providing a custom Queue implementation for the internal buffering in the create factory method. If that queue is bounded, the processor could reject the push of a value when the buffer is full and not enough requests from downstream have been received.

In that bounded case, you can also build the processor with a callback that is invoked on each rejected element, allowing for cleanup of these rejected elements.
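The bounded case with a rejection callback can be sketched as follows. This is an illustrative example, assuming the reactor-core 3.3.x UnicastProcessor.create(Queue, Consumer, Disposable) factory:

```java
import java.util.concurrent.ArrayBlockingQueue;
import reactor.core.publisher.UnicastProcessor;

public class BoundedUnicast {
    public static void main(String[] args) {
        // Bounded internal queue of 4 elements; the callback sees each rejected element
        UnicastProcessor<Integer> processor = UnicastProcessor.create(
                new ArrayBlockingQueue<>(4),
                rejected -> System.out.println("rejected: " + rejected),
                () -> {}); // onTerminate Disposable, a no-op here

        // No subscriber yet: the first 4 pushes are buffered, the 5th overflows
        for (int i = 1; i <= 5; i++) {
            processor.onNext(i);
        }

        processor.subscribe(
                v -> System.out.println("got: " + v),
                e -> System.out.println("error: " + e.getMessage()));
    }
}
```

The subscriber still receives the buffered elements; the overflow then terminates the sequence with an error, and the rejected element has been handed to the cleanup callback.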

Emitter Processor

An emitter Processor can emit to several subscribers while honoring backpressure for each of its subscribers. It can also subscribe to a Publisher and relay its signals synchronously.

Initially, when it has no subscriber, it can still accept a few data pushes up to a configurable bufferSize. After that point, if no Subscriber has come in and consumed the data, calls to onNext block until the processor is drained (which can happen only concurrently by then).

Thus, the first Subscriber to subscribe receives up to bufferSize elements upon subscribing. However, after that, the processor stops replaying signals to additional subscribers. These subsequent subscribers instead receive only the signals pushed through the processor after they have subscribed. The internal buffer is still used for backpressure purposes.

By default, if all of its subscribers are cancelled (which basically means they have all un-subscribed), it clears its internal buffer and stops accepting new subscribers. You can tune this by using the autoCancel parameter in the create static factory methods.

Replay Processor

A replay Processor caches elements that are either pushed directly through its sink() or elements from an upstream Publisher and replays them to late subscribers.

It can be created in multiple configurations:

  • Caching a single element (cacheLast).

  • Caching a limited history (create(int)) or an unbounded history (create()).

  • Caching a time-based replay window (createTimeout(Duration)).

  • Caching a combination of history size and time window (createSizeOrTimeout(int, Duration)).
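The replay-to-late-subscribers behavior can be sketched as follows, here with a limited history of two elements (an illustrative example, assuming reactor-core 3.3.x):

```java
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

public class ReplayLateSubscriber {
    public static void main(String[] args) {
        // Cache only the 2 most recent elements
        ReplayProcessor<String> processor = ReplayProcessor.create(2);
        FluxSink<String> sink = processor.sink();

        sink.next("one");
        sink.next("two");
        sink.next("three");
        sink.complete();

        // A subscriber arriving after completion still receives the cached history
        processor.subscribe(v -> System.out.println("replayed: " + v));
    }
}
```

Only the last two elements are replayed, because "one" fell out of the bounded history before the subscriber arrived.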

5. Kotlin support

Kotlin is a statically-typed language targeting the JVM (and other platforms), which allows writing concise and elegant code while providing very good interoperability with existing libraries written in Java.

This section describes Reactor’s support for Kotlin.

5.1. Requirements

Reactor supports Kotlin 1.1+ and requires kotlin-stdlib (or one of its kotlin-stdlib-jre7 or kotlin-stdlib-jre8 variants).

5.2. Extensions

As of Dysprosium-M1 (i.e., reactor-core 3.3.0.M1), Kotlin extensions have been moved to a dedicated reactor-kotlin-extensions module, with new package names that start with reactor.kotlin instead of simply reactor.

As a consequence, Kotlin extensions in reactor-core module are deprecated. The new dependency’s groupId and artifactId are:

io.projectreactor.kotlin:reactor-kotlin-extensions

Thanks to its great Java interoperability and to Kotlin extensions, Reactor Kotlin APIs leverage regular Java APIs and are additionally enhanced by a few Kotlin-specific APIs that are available out of the box within Reactor artifacts.

Keep in mind that Kotlin extensions need to be imported to be used. This means, for example, that the Throwable.toFlux Kotlin extension is available only if reactor.kotlin.core.publisher.toFlux is imported. That said, similar to static imports, an IDE should automatically suggest the import in most cases.

For example, Kotlin reified type parameters provide a workaround for JVM generics type erasure, and Reactor provides some extensions to take advantage of this feature.

The following table compares Reactor with Java against Reactor with Kotlin and extensions:

Java                                       | Kotlin with extensions
-------------------------------------------|----------------------------------------------
Mono.just("foo")                           | "foo".toMono()
Flux.fromIterable(list)                    | list.toFlux()
Mono.error(new RuntimeException())         | RuntimeException().toMono()
Flux.error(new RuntimeException())         | RuntimeException().toFlux()
flux.ofType(Foo.class)                     | flux.ofType<Foo>() or flux.ofType(Foo::class)
StepVerifier.create(flux).verifyComplete() | flux.test().verifyComplete()

The Reactor KDoc API lists and documents all the available Kotlin extensions.

5.3. Null Safety

One of Kotlin’s key features is null safety, which cleanly deals with null values at compile time rather than bumping into the famous NullPointerException at runtime. This makes applications safer through nullability declarations and expressive “value or no value” semantics without paying the cost of wrappers such as Optional. (Kotlin allows using functional constructs with nullable values. See this comprehensive guide to Kotlin null-safety.)

Although Java does not let one express null safety in its type-system, Reactor now provides null safety of the whole Reactor API through tooling-friendly annotations declared in the reactor.util.annotation package. By default, types from Java APIs used in Kotlin are recognized as platform types for which null-checks are relaxed. Kotlin support for JSR 305 annotations and Reactor nullability annotations provide null-safety for the whole Reactor API to Kotlin developers, with the advantage of dealing with null-related issues at compile time.

You can configure the JSR 305 checks by adding the -Xjsr305 compiler flag with the following options: -Xjsr305={strict|warn|ignore}.

For Kotlin versions 1.1.50+, the default behavior is the same as -Xjsr305=warn. The strict value is required for the Reactor API's full null-safety to be taken into account, but it should be considered experimental, since the Reactor API nullability declarations could evolve even between minor releases, as more checks may be added in the future.

Nullability for generic type arguments, variable arguments, and array elements is not supported yet, but it should be in an upcoming release. See this discussion for up-to-date information.

6. Testing

Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.

Reactor comes with a few elements dedicated to testing, gathered into their own artifact: reactor-test. You can find that project on Github, inside of the reactor-core repository.

To use it in your tests, you must add it as a test dependency. The following example shows how to add reactor-test as a dependency in Maven:

Example 17. reactor-test in Maven, in <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    (1)
</dependency>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-test as a dependency in Gradle:

Example 18. reactor-test in Gradle, amend the dependencies block
dependencies {
   testCompile 'io.projectreactor:reactor-test'
}

The three main uses of reactor-test are as follows:

  • Testing that a sequence follows a given scenario, step-by-step, with StepVerifier.

  • Producing data in order to test the behavior of downstream operators (including your own operators) with TestPublisher.

  • In sequences that can go through several alternative Publishers (for example, a chain that uses switchIfEmpty), probing such a Publisher to ensure it was used (that is, subscribed to).

6.1. Testing a Scenario with StepVerifier

The most common case for testing a Reactor sequence is to have a Flux or a Mono defined in your code (for example, it might be returned by a method) and to want to test how it behaves when subscribed to.

This situation translates well to defining a “test scenario,” where you define your expectations in terms of events, step-by-step. You can ask and answer questions such as the following:

  • What is the next expected event?

  • Do you expect the Flux to emit a particular value?

  • Or maybe to do nothing for the next 300ms?

You can express all of that through the StepVerifier API.

For instance, you could have the following utility method in your codebase that decorates a Flux:

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}

In order to test it, you want to verify the following scenario:

I expect this Flux to first emit thing1, then emit thing2, and then produce an error with the message, boom. Subscribe and verify these expectations.

In the StepVerifier API, this translates to the following test:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2"); (1)

  StepVerifier.create( (2)
    appendBoomError(source)) (3)
    .expectNext("thing1") (4)
    .expectNext("thing2")
    .expectErrorMessage("boom") (5)
    .verify(); (6)
}
1 Since our method needs a source Flux, define a simple one for testing purposes.
2 Create a StepVerifier builder that wraps and verifies a Flux.
3 Pass the Flux to be tested (the result of calling our utility method).
4 The first signal we expect to happen upon subscription is an onNext, with a value of thing1.
5 The last signal we expect to happen is a termination of the sequence with an onError. The exception should have boom as a message.
6 It is important to trigger the test by calling verify().

The API is a builder. You start by creating a StepVerifier and passing the sequence to be tested. This offers a choice of methods that let you:

  • Express expectations about the next signals to occur. If any other signal is received (or the content of the signal does not match the expectation), the whole test fails with a meaningful AssertionError. For example, you might use expectNext(T…​) and expectNextCount(long).

  • Consume the next signal. This is used when you want to skip part of the sequence or when you want to apply a custom assertion on the content of the signal (for example, to check that there is an onNext event and assert that the emitted item is a list of size 5). For example, you might use consumeNextWith(Consumer<T>).

  • Take miscellaneous actions such as pausing or running arbitrary code. For example, if you want to manipulate a test-specific state or context. To that effect, you might use thenAwait(Duration) and then(Runnable).

For terminal events, the corresponding expectation methods (expectComplete() and expectError() and all their variants) switch to an API where you cannot express expectations anymore. In that last step, all you can do is perform some additional configuration on the StepVerifier and then trigger the verification, often with verify() or one of its variants.

What happens at this point is that the StepVerifier subscribes to the tested Flux or Mono and plays the sequence, comparing each new signal with the next step in the scenario. As long as these match, the test is considered a success. As soon as there is a discrepancy, an AssertionError is thrown.

Remember the verify() step, which triggers the verification. To help, the API includes a few shortcut methods that combine the terminal expectations with a call to verify(): verifyComplete(), verifyError(), verifyErrorMessage(String), and others.

Note that, if one of the lambda-based expectations throws an AssertionError, it is reported as is, failing the test. This is useful for custom assertions.

By default, the verify() method and derived shortcut methods (verifyThenAssertThat, verifyComplete(), and so on) have no timeout. They can block indefinitely. You can use StepVerifier.setDefaultTimeout(Duration) to globally set a timeout for these methods, or specify one on a per-call basis with verify(Duration).
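The per-call timeout variant can be sketched as follows (an illustrative example, assuming reactor-test 3.3.x):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class VerifyWithTimeout {
    public static void main(String[] args) {
        // verify(Duration) caps how long the verification may block and
        // returns the real time the whole verification took
        Duration took = StepVerifier
                .create(Flux.just("a", "b").delayElements(Duration.ofMillis(50)))
                .expectNext("a", "b")
                .expectComplete()
                .verify(Duration.ofSeconds(5));

        // Two elements delayed by 50ms each means the run took at least ~100ms
        System.out.println("took at least 100ms: " + (took.toMillis() >= 100));
    }
}
```

If the sequence had hung, verify would have failed with an AssertionError after five seconds instead of blocking indefinitely.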

6.1.1. Better Identifying Test Failures

StepVerifier provides two options to better identify exactly which expectation step caused a test to fail:

  • as(String): Used after most expect* methods to give a description to the preceding expectation. If the expectation fails, its error message contains the description. Terminal expectations and verify cannot be described that way.

  • StepVerifierOptions.create().scenarioName(String): By using StepVerifierOptions to create your StepVerifier, you can use the scenarioName method to give the whole scenario a name, which is also used in assertion error messages.

Note that, in both cases, the use of the description or name in messages is only guaranteed for StepVerifier methods that produce their own AssertionError (for example, throwing an exception manually or through an assertion library in assertNext does not add the description or name to the error’s message).
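Both identification options can be sketched together, here with an expectation that deliberately fails so the enriched message is visible (an illustrative example, assuming reactor-test 3.3.x):

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;

public class DescribedExpectations {
    public static void main(String[] args) {
        try {
            StepVerifier.create(Flux.just("foo"),
                        StepVerifierOptions.create().scenarioName("check first element"))
                    .expectNext("bar").as("first element is bar") // describe the step
                    .verifyComplete();
        } catch (AssertionError e) {
            // The failure message carries the step description (and scenario name)
            System.out.println(e.getMessage());
        }
    }
}
```

The printed AssertionError message identifies the failing step by its description rather than by its position in the scenario.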

6.2. Manipulating Time

You can use StepVerifier with time-based operators to avoid long run times for corresponding tests. You can do so through the StepVerifier.withVirtualTime builder.

It looks like the following example:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here

This virtual time feature plugs in a custom Scheduler in Reactor’s Schedulers factory. Since these timed operators usually use the default Schedulers.parallel() scheduler, replacing it with a VirtualTimeScheduler does the trick. However, an important prerequisite is that the operator be instantiated after the virtual time scheduler has been activated.

To increase the chances that this happens correctly, the StepVerifier does not take a simple Flux as input. withVirtualTime takes a Supplier, which guides you into lazily creating the instance of the tested flux after having done the scheduler set up.

Take extra care to ensure the Supplier<Publisher<T>> can be used in a lazy fashion. Otherwise, virtual time is not guaranteed. Especially avoid instantiating the Flux earlier in the test code and having the Supplier return that variable. Instead, always instantiate the Flux inside the lambda.

There are two expectation methods that deal with time, and they are both valid with or without virtual time:

  • thenAwait(Duration): Pauses the evaluation of steps (allowing a few signals to occur or delays to run out).

  • expectNoEvent(Duration): Also lets the sequence play out for a given duration but fails the test if any signal occurs during that time.

Both methods pause the thread for the given duration in classic mode and advance the virtual clock instead in virtual mode.

expectNoEvent also considers the subscription as an event. If you use it as a first step, it usually fails because the subscription signal is detected. Use expectSubscription().expectNoEvent(duration) instead.

In order to quickly evaluate the behavior of our Mono.delay above, we can finish writing our code as follows:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1 See the preceding tip.
2 Expect nothing to happen for a full day.
3 Then expect a delay that emits 0.
4 Then expect completion (and trigger the verification).

We could have used thenAwait(Duration.ofDays(1)) above, but expectNoEvent has the benefit of guaranteeing that nothing happened earlier than it should have.

Note that verify() returns a Duration value. This is the real-time duration of the entire test.

Virtual time is not a silver bullet. All Schedulers are replaced with the same VirtualTimeScheduler. In some cases, you can lock the verification process because the virtual clock has not moved forward before an expectation is expressed, resulting in the expectation waiting on data that can only be produced by advancing time. In most cases, you need to advance the virtual clock for sequences to emit. Virtual time also gets very limited with infinite sequences, which might hog the thread on which both the sequence and its verification run.

6.3. Performing Post-execution Assertions with StepVerifier

After having described the final expectation of your scenario, you can switch to a complementary assertion API instead of triggering verify(). To do so, use verifyThenAssertThat() instead.

verifyThenAssertThat() returns a StepVerifier.Assertions object, which you can use to assert a few elements of state once the whole scenario has played out successfully (because it also calls verify()). Typical (albeit advanced) usage is to capture elements that have been dropped by some operator and assert them (see the section on Hooks).

6.4. Testing the Context

For more information about the Context, see Adding a Context to a Reactive Sequence.

StepVerifier comes with a couple of expectations around the propagation of a Context:

  • expectAccessibleContext: Returns a ContextExpectations object that you can use to set up expectations on the propagated Context. Be sure to call then() to return to the set of sequence expectations.

  • expectNoAccessibleContext: Sets up an expectation that NO Context can be propagated up the chain of operators under test. This most likely occurs when the Publisher under test is not a Reactor one or does not have any operator that can propagate the Context (for example, a generator source).

Additionally, you can associate a test-specific initial Context to a StepVerifier by using StepVerifierOptions to create the verifier.

These features are demonstrated in the following snippet:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
        StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
    .expectAccessibleContext() (2)
    .contains("thing1", "thing2") (3)
    .then() (4)
    .expectNext(11)
    .verifyComplete(); (5)
1 Create the StepVerifier by using StepVerifierOptions and pass in an initial Context
2 Start setting up expectations about Context propagation. This alone ensures that a Context was propagated.
3 An example of a Context-specific expectation. It must contain value "thing2" for key "thing1".
4 We then() switch back to setting up normal expectations on the data.
5 Let us not forget to verify() the whole set of expectations.

6.5. Manually Emitting with TestPublisher

For more advanced test cases, it might be useful to have complete mastery over the source of data, to trigger finely chosen signals that closely match the particular situation you want to test.

Another situation is when you have implemented your own operator and you want to verify how it behaves with regards to the Reactive Streams specification, especially if its source is not well behaved.

For both cases, reactor-test offers the TestPublisher class. This is a Publisher<T> that lets you programmatically trigger various signals:

  • next(T) and next(T, T…​) triggers 1-n onNext signals.

  • emit(T…​) triggers 1-n onNext signals and does complete().

  • complete() terminates with an onComplete signal.

  • error(Throwable) terminates with an onError signal.

You can get a well behaved TestPublisher through the create factory method. Also, you can create a misbehaving TestPublisher by using the createNonCompliant factory method. The latter takes a value or multiple values from the TestPublisher.Violation enum. The values define which parts of the specification the publisher can overlook. These enum values include:

  • REQUEST_OVERFLOW: Allows next calls to be made despite an insufficient request, without triggering an IllegalStateException.

  • ALLOW_NULL: Allows next calls to be made with a null value without triggering a NullPointerException.

  • CLEANUP_ON_TERMINATE: Allows termination signals to be sent several times in a row. This includes complete(), error(), and emit().

  • DEFER_CANCELLATION: Allows the TestPublisher to ignore cancellation signals and continue emitting signals as if the cancellation lost the race against said signals.

Finally, the TestPublisher keeps track of internal state after subscription, which can be asserted through its various assert* methods.

You can use it as a Flux or Mono by using the conversion methods, flux() and mono().
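A TestPublisher driving an operator under test can be sketched as follows (an illustrative example, assuming reactor-test 3.3.x; the map is a stand-in for whatever operator you want to exercise):

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class TestPublisherDemo {
    public static void main(String[] args) {
        TestPublisher<String> publisher = TestPublisher.create();

        // Drive the operator under test (a simple map here) from the TestPublisher
        StepVerifier.create(publisher.flux().map(String::toUpperCase))
                .then(() -> publisher.emit("hello", "world")) // next(...) then complete()
                .expectNext("HELLO", "WORLD")
                .verifyComplete();

        // The publisher's internal state can be asserted after the scenario
        publisher.assertWasSubscribed();
        System.out.println("test publisher scenario passed");
    }
}
```

The then(...) step injects the signals only once the StepVerifier has subscribed, which keeps the scenario deterministic.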

6.6. Checking the Execution Path with PublisherProbe

When building complex chains of operators, you could come across cases where there are several possible execution paths, materialized by distinct sub-sequences.

Most of the time, these sub-sequences produce a specific-enough onNext signal that you can assert that it was executed by looking at the end result.

For instance, consider the following method, which builds a chain of operators from a source and uses a switchIfEmpty to fall back to a particular alternative if the source is empty:

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}

You can test which logical branch of the switchIfEmpty was used, as follows:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}

However, think about an example where the method produces a Mono<Void> instead. It waits for the source to complete, performs an additional task, and completes. If the source is empty, a fallback Runnable-like task must be performed instead. The following example shows such a case:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) (1)
            .switchIfEmpty(doWhenEmpty); (2)
}
1 then() forgets about the command result. It cares only that it was completed.
2 How to distinguish between two cases that are both empty sequences?

To verify that your processOrFallback method does indeed go through the doWhenEmpty path, you need to write a bit of boilerplate. Namely you need a Mono<Void> that:

  • Captures the fact that it has been subscribed to.

  • Lets you assert that fact after the whole process has terminated.

Before version 3.1, you would need to manually maintain one AtomicBoolean per state you wanted to assert and attach a corresponding doOn* callback to the publisher you wanted to evaluate. This could be a lot of boilerplate when having to apply this pattern regularly. Fortunately, 3.1.0 introduced an alternative with PublisherProbe. The following example shows how to use it:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
                .verifyComplete();

    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
}
1 Create a probe that translates to an empty sequence.
2 Use the probe in place of Mono<Void> by calling probe.mono().
3 After completion of the sequence, the probe lets you assert that it was used. You can check that it was subscribed to…​
4 …​as well as actually requested data…​
5 …​and whether or not it was cancelled.

You can also use the probe in place of a Flux<T> by calling .flux() instead of .mono(). For cases where you need to probe an execution path but also need the probe to emit data, you can wrap any Publisher<T> by using PublisherProbe.of(Publisher).

7. Debugging Reactor

Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.

In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?

7.1. The Typical Reactor Stack Trace

When you shift to asynchronous code, things can get much more complicated.


Consider the following stack trace:


A typical Reactor stack trace


java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
	at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
	at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)

There is a lot going on there. We get an IndexOutOfBoundsException, which tells us that a source emitted more than one item.


We can probably quickly come to assume that this source is a Flux or a Mono, as confirmed by the next line, which mentions MonoSingle. So it appears to be some sort of complaint from a single operator.


Referring to the Javadoc for the Mono#single operator, we see that single has a contract: The source must emit exactly one element. It appears we had a source that emitted more than one and thus violated that contract.


Can we dig deeper and identify that source? The following rows are not very helpful. They take us through the internals of what seems to be a reactive chain, through multiple calls to subscribe and request.


By skimming over these rows, we can at least start to form a picture of the kind of chain that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange (each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single() chain maybe?


But what if we use that pattern a lot in our application? This still does not tell us much, and simply searching for single is not going to find the problem. Then the last line refers to some of our code. Finally, we are getting close.


Hold on, though. When we go to the source file, all we see is that a pre-existing Flux is subscribed to, as follows:


toDebug.subscribe(System.out::println, Throwable::printStackTrace);

All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:


public Mono<String> toDebug; //please overlook the public class attribute

The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.


This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error.

What we want to find out more easily is where the operator was added into the chain - that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.


7.2. Activating Debug Mode - aka tracebacks

This section describes the easiest but also the slowest way to enable debugging capabilities, due to the fact that it captures the stacktrace on every operator. See The checkpoint() Alternative for a more fine-grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option.

Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.


Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.


This is done by customizing the Hooks.onOperator hook at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:


Hooks.onOperatorDebug();

This starts instrumenting the calls to the Flux (and Mono) operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.

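As a sketch, the kind of chain discussed earlier could be instrumented as follows (the doubling flatMap is only a stand-in for whatever makes the source emit more than one item):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class DebugModeExample {

    public static void main(String[] args) {
        // Must run before the chain below is assembled.
        Hooks.onOperatorDebug();

        try {
            Flux.range(1, 10)
                .flatMap(i -> Flux.just(i, i)) // emits more than one element
                .single()                      // contract: exactly one element
                .block();
        } catch (Exception e) {
            // With the hook active, a suppressed OnAssemblyException
            // pinpoints where single() was assembled.
            e.printStackTrace();
        } finally {
            Hooks.resetOnOperatorDebug(); // undo the global hook
        }
    }
}
```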

Later on, if an exception occurs, the failing operator is able to refer to that capture and append it to the stack trace. We call this captured assembly information a traceback.


In the next section, we see how the stack trace differs and how to interpret that new information.


7.3. Reading a Stack Trace in Debug Mode

When we reuse our initial example but activate the operatorStacktrace debug feature, the stack trace is as follows:


java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:375) (1)
...
(2)
...
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
	at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1000)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: (3)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (4)
	reactor.core.publisher.Flux.single(Flux.java:6676)
	reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949)
	reactor.guide.GuideTests.populateDebug(GuideTests.java:962)
	org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): (5)
	|_	Flux.single ⇢ reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949) (6)
1 We see that the wrapping operator captures the stack.
2 Apart from that, the first section of the stack trace is still the same for the most part, showing a bit of the operator's internals (so we removed a bit of the snippet here).
3 This is where the traceback starts to appear.
4 First, we get some details about where the operator was assembled.
5 Second, we get a notion of where the error propagated through the operator chain, from first to last (error site to subscription site).
6 Each operator that saw the error is mentioned along with the user class and line where it was used.

The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are two parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was created in the scatterAndGather method, itself called from a populateDebug method that got executed through JUnit.


Now that we are armed with enough information to find the culprit, we can have a meaningful look at that scatterAndGather method:


private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}
1 Sure enough, here is our single.

Now we can see that the root cause of the error was a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out he meant to use the less restrictive take(1) instead.


We have solved our problem.


Now consider the following line in the stack trace:


Error has been observed by the following operator(s):

That second part of the debug stack trace was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it more clear:


FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();

Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following final traceback:


Error has been observed by the following operator(s):
	|_	Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
	|_	Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
	|_	Flux.filter ⇢ reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
	|_	Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
	|_	Flux.elapsed ⇢ reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
	|_	Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:41)

This corresponds to the section of the chain of operators that gets notified of the error:

  1. The exception originates in the first map.

  2. It is seen by a second map (both in fact correspond to the findAllUserByName method).

  3. It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).

  4. Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.


As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions. Such exceptions can be created directly via Exceptions.multiple(Throwable…​), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List. If that is somehow not desirable, tracebacks can be identified thanks to Exceptions.isTraceback(Throwable) check, and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.
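A minimal sketch of telling tracebacks apart from genuine components of a composite (the two exceptions are illustrative):

```java
import java.util.List;

import reactor.core.Exceptions;

public class TracebackUnwrapExample {

    public static void main(String[] args) {
        // Build a composite exception directly.
        Throwable composite = Exceptions.multiple(
                new IllegalStateException("first failure"),
                new IllegalArgumentException("second failure"));

        // Unwrap every component; if a traceback were suppressed on the
        // composite, it would show up in this List as well.
        List<Throwable> all = Exceptions.unwrapMultiple(composite);

        // Unwrap while filtering out any traceback components.
        List<Throwable> withoutTracebacks =
                Exceptions.unwrapMultipleExcludingTracebacks(composite);

        for (Throwable t : all) {
            System.out.println(t + " traceback? " + Exceptions.isTraceback(t));
        }
        System.out.println(withoutTracebacks.size() + " non-traceback causes");
    }
}
```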

We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.


7.3.1. The checkpoint() Alternative

The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.


As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.


In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.


If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.


You can chain this operator into a method chain. The checkpoint operator works like the hook version but only for its link of that particular chain.


There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.


checkpoint(String) includes “light” in its output (which can be handy when searching), as shown in the following example:


...
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.ParallelSource] is identified by light checkpoint [light checkpoint identifier].

Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:

最后但并非最不重要的一点是,如果您想向检查点添加更通用的描述,但仍依靠堆栈跟踪机制来标识组装站点,则可以通过使用checkpoint("description", true)版本来强制执行该行为。 现在,我们返回到用于追溯的初始消息,并以进行了扩展description,如以下示例所示:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed by the following operator(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1 descriptionCorrelation1234 is the description provided in the checkpoint.

The description could be a static identifier or user-readable description or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).
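Putting the three variants side by side, a hypothetical chain could be annotated as follows (the values and descriptions are illustrative):

```java
import reactor.core.publisher.Flux;

public class CheckpointExample {

    public static void main(String[] args) {
        Flux.just(1, 2)
            .map(i -> i * 2)
            .checkpoint()                    // captures an assembly stack trace here
            .filter(i -> i > 2)
            .checkpoint("after-filter")      // light: description only, no stack trace
            .map(i -> i + 1)
            .checkpoint("order-12345", true) // description plus a stack trace
            .subscribe(System.out::println);
    }
}
```

Should an error propagate through this chain, each checkpoint adds its own entry to the traceback, so the failing link can be located without enabling the global hook.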

When both global debugging and local checkpoint() are enabled, checkpointed snapshot stacks are appended as suppressed error output after the observing operator graph and following the same declarative order.

7.4. Production-ready Global Debugging

Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.


To use it in your app, you must add it as a dependency.


The following example shows how to add reactor-tools as a dependency in Maven:


Example 19. reactor-tools in Maven, in <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
1 If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-tools as a dependency in Gradle:

Example 20. reactor-tools in Gradle, amend the dependencies block
dependencies {
   compile 'io.projectreactor:reactor-tools'
}

It also needs to be explicitly initialized with:


ReactorDebugAgent.init();
Since the implementation will instrument your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

You may also re-process existing classes if you cannot run the init eagerly (e.g. in the tests):


ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented.

7.4.1. Limitations

ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform the self-attach. Self-attach may not work on some JVMs; refer to ByteBuddy’s documentation for more details.


7.5. Logging a Sequence

In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.


The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).

A note on logging implementation

The log operator uses the Loggers utility class, which picks up common logging frameworks such as Log4J and Logback through SLF4J and defaults to logging to the console if SLF4J is unavailable.


The console fallback uses System.err for the WARN and ERROR log levels and System.out for everything else.


If you prefer a JDK java.util.logging fallback, as in 3.0.x, you can get it by setting the reactor.logging.fallback system property to JDK.


In all cases, when logging in production you should take care to configure the underlying logging framework to use its most asynchronous and non-blocking approach — for instance, an AsyncAppender in Logback or AsyncLogger in Log4j 2.


For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:


Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();

This prints out the following (through the logger’s console appender):


10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(unbounded) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)

Here, in addition to the logger’s own formatter (time, thread, level, message), the log() operator outputs a few things in its own format:

1 reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator’s events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscriber, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion.
2 On the second line, we can see that an unbounded request was propagated up from downstream.
3 Then the range sends three values in a row.
4 On the last line, we see cancel().

The last line, (4), is the most interesting. We can see the take in action there. It operates by cutting the sequence short after it has seen enough elements emitted. In short, take() causes the source to cancel() once it has emitted the user-requested amount.

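As mentioned in the first callout, the automatic category can be overridden, and one variant also lets you restrict which signals get logged. A hedged sketch (the category name is illustrative):

```java
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class LogCategoryExample {

    public static void main(String[] args) {
        Flux.range(1, 10)
            // Custom category, INFO level, and only the listed signal types.
            .log("my.category", Level.INFO,
                 SignalType.ON_NEXT, SignalType.CANCEL)
            .take(3)
            .subscribe();
    }
}
```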

8. Exposing Reactor metrics

Project Reactor is a library designed for performance and better utilization of resources. But to truly understand the performance of a system, it is best to be able to monitor its various components.

This is why Reactor provides a built-in integration with Micrometer.

If Micrometer is not on the classpath, metrics will be a no-op.

8.1. Scheduler metrics

Every async operation in Reactor is done via the Scheduler abstraction described in Threading and Schedulers. This is why it is important to monitor your schedulers and watch out for key metrics that start to look suspicious, so that you can react accordingly.

To enable scheduler metrics, you will need to use the following method:

Schedulers.enableMetrics();
The instrumentation is performed when a scheduler is created. It is recommended to call this method as early as possible.
If you’re using Spring Boot, it is a good idea to place the invocation before the SpringApplication.run(Application.class, args) call.

Once scheduler metrics are enabled, and provided Micrometer is on the classpath, Reactor will use Micrometer’s support for instrumenting the executors that back most schedulers.

Please refer to Micrometer’s documentation for the exposed metrics, such as:

  • executor_active_threads

  • executor_completed_tasks_total

  • executor_pool_size_threads

  • executor_queued_tasks

  • executor_seconds_{count, max, sum}

Since one scheduler may have multiple executors, every executor metric has a reactor_scheduler_id tag.

Grafana + Prometheus users can use a pre-built dashboard which includes panels for threads, completed tasks, task queues and other handy metrics.

8.2. Publisher metrics

Sometimes it is useful to be able to record metrics at some stage in your reactive pipeline.

One way to do it would be to manually push the values to your metrics backend of choice. Another option would be to use Reactor’s built-in metrics integration for Flux/Mono and interpret them.

Consider the following pipeline:

listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();

To enable the metrics for this source Flux (returned from listenToEvents()), we need to give it a name and turn on the metrics collecting:

listenToEvents()
    .name("events") (1)
    .metrics() (2)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Every metric at this stage will be identified as "events".
2 Flux#metrics operator enables the reporting of metrics and uses the last known name up in the pipeline.

Just adding these two operators will expose a whole bunch of useful metrics!

metric name type description

reactor.subscribed

Counter

Counts how many Reactor sequences have been subscribed to

reactor.malformed.source

Counter

Counts the number of events received from a malformed source (i.e., an onNext after an onComplete)

reactor.requested

DistributionSummary

Counts the amount requested to a named Flux by all subscribers, until at least one requests an unbounded amount

reactor.onNext.delay

Timer

Measures delays between onNext signals (or between onSubscribe and first onNext)

reactor.flow.duration

Timer

Times the duration elapsed between a subscription and the termination or cancellation of the sequence. A status tag is added to specify what event caused the timer to end (onComplete, onError, cancel).

Want to know how many times your event processing has restarted due to some error? Read reactor.subscribed, because the retry() operator re-subscribes to the source publisher on error.

Interested in an "events per second" metric? Measure the rate of reactor.onNext.delay's count.

Want to be alerted when the listener throws an error? reactor.flow.duration with status=error tag is your friend.
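Assuming Micrometer is on the classpath, these metrics land in Micrometer's global registry by default, so such questions can also be answered programmatically. A hedged sketch (the flow name matches the earlier "events" example, here fed by a plain range):

```java
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;

import reactor.core.publisher.Flux;

public class MetricsQueryExample {

    public static void main(String[] args) {
        // A named, metered sequence standing in for listenToEvents().
        Flux.range(1, 5)
            .name("events")
            .metrics()
            .blockLast();

        // Look up the flow-duration timer for the "events" flow.
        Timer timer = Metrics.globalRegistry
                .find("reactor.flow.duration")
                .tag("flow", "events")
                .timer();

        if (timer != null) {
            System.out.println("flows observed: " + timer.count());
        }
    }
}
```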

8.2.1. Common tags

Every metric will have the following tags in common:

tag name description example

type

Publisher’s type

"Mono"

flow

current flow’s name, set by .name() operator

"events"

8.2.2. Custom tags

Users are allowed to add custom tags to their reactive chains:

listenToEvents()
    .tag("source", "kafka") (1)
    .name("events")
    .metrics() (2)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1 Set a custom tag "source" to value "kafka".
2 All reported metrics will have source=kafka tag assigned in addition to the common tags described above.

9. Advanced Features and Concepts

This chapter covers advanced features and concepts of Reactor, including the following:


9.1. Mutualizing Operator Usage

From a clean-code perspective, code reuse is generally a good thing. Reactor offers a few patterns that can help you reuse and mutualize code, notably for operators or combinations of operators that you might want to apply regularly in your codebase. If you think of a chain of operators as a recipe, you can create a “cookbook” of operator recipes.


9.1.1. Using the transform Operator

The transform operator lets you encapsulate a piece of an operator chain into a function. That function is applied to an original operator chain at assembly time to augment it with the encapsulated operators. Doing so applies the same operations to all the subscribers of a sequence and is basically equivalent to chaining the operators directly. The following code shows an example:


Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
	.doOnNext(System.out::println)
	.transform(filterAndMap)
	.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

The following image shows how the transform operator encapsulates flows:

Transform Operator : encapsulate flows

The preceding example produces the following output:


blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE

9.1.2. Using the transformDeferred Operator

The transformDeferred operator is similar to transform and also lets you encapsulate operators in a function. The major difference is that this function is applied to the original sequence on a per-subscriber basis. It means that the function can actually produce a different operator chain for each subscription (by maintaining some state). The following code shows an example:


AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
	if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
	}
	return f.filter(color -> !color.equals("purple"))
	        .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .transformDeferred(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));

The following image shows how the transformDeferred operator works with per-subscriber transformations:

Compose Operator : Per Subscriber transformation

The preceding example produces the following output:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple

9.2. Hot Versus Cold

So far, we have considered that all Flux (and Mono) are the same: They all represent an asynchronous sequence of data, and nothing happens before you subscribe.


Really, though, there are two broad families of publishers: hot and cold.


The earlier description applies to the cold family of publishers. They generate data anew for each subscription. If no subscription is created, data never gets generated.


Think of an HTTP request: Each new subscriber triggers an HTTP call, but no call is made if no one is interested in the result.

Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case, the subscriber would see only new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.

One example of the few hot operators in Reactor is just: It directly captures the value at assembly time and replays it to anybody subscribing to it later. To re-use the HTTP call analogy, if the captured data is the result of an HTTP call, then only one network call is made, when instantiating just.

To transform just into a cold publisher, you can use defer. It defers the HTTP request in our example to subscription time (and would result in a separate network call for each new subscription).

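The difference can be sketched with a counter standing in for the side effect of the network call (the AtomicInteger here is purely illustrative): just evaluates its value once, at assembly time, while defer re-evaluates it for every new subscriber.

```java
AtomicInteger calls = new AtomicInteger(); // stands in for an expensive HTTP call

Mono<Integer> captured = Mono.just(calls.incrementAndGet()); // evaluated once, right now
Mono<Integer> deferred = Mono.defer(() -> Mono.just(calls.incrementAndGet())); // evaluated per subscription

captured.subscribe(v -> System.out.println("just, first subscribe: " + v));   // 1
captured.subscribe(v -> System.out.println("just, second subscribe: " + v));  // 1 again
deferred.subscribe(v -> System.out.println("defer, first subscribe: " + v));  // 2
deferred.subscribe(v -> System.out.println("defer, second subscribe: " + v)); // 3
```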

Most other hot publishers in Reactor extend Processor.

Consider two other examples. The following code shows the first example:

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));

This first example produces the following output:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

The following image shows the replay behavior:

Replaying behavior

Both subscribers catch all four colors, because each subscriber causes the process defined by the operators on the Flux to run.

Compare the first example to the second example, shown in the following code:

DirectProcessor<String> hotSource = DirectProcessor.create();

Flux<String> hotFlux = hotSource.map(String::toUpperCase);


hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();

The second example produces the following output:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

The following image shows how a subscription is broadcast:

Broadcasting a subscription

Subscriber 1 catches all four colors. Subscriber 2, having been created after the first two colors were produced, catches only the last two colors. This difference accounts for the doubling of ORANGE and PURPLE in the output. The process described by the operators on this Flux runs regardless of when subscriptions have been attached.

9.3. Broadcasting to Multiple Subscribers with ConnectableFlux

Sometimes, instead of deferring some processing to the subscription time of a single subscriber, you may want several subscribers to rendezvous and only then trigger the subscription and data generation.

This is what ConnectableFlux is made for. The Flux API covers two main patterns that return a ConnectableFlux: publish and replay.

  • publish dynamically tries to respect the demand from its various subscribers, in terms of backpressure, by forwarding these requests to the source. Most notably, if any subscriber has a pending demand of 0, publish pauses its requesting to the source.

  • replay buffers data seen through the first subscription, up to configurable limits (in time and buffer size). It replays the data to subsequent subscribers.

A ConnectableFlux offers additional methods to manage subscriptions downstream versus subscriptions to the original source. These additional methods include the following:

  • connect() can be called manually once you reach enough subscriptions to the Flux. That triggers the subscription to the upstream source.

  • autoConnect(n) can do the same job automatically once n subscriptions have been made.

  • refCount(n) not only automatically tracks incoming subscriptions but also detects when these subscriptions are cancelled. If not enough subscribers are tracked, the source is “disconnected”, causing a new subscription to the source later if additional subscribers appear.

  • refCount(int, Duration) adds a “grace period.” Once the number of tracked subscribers becomes too low, it waits for the Duration before disconnecting the source, potentially allowing for enough new subscribers to come in and cross the connection threshold again.

Consider the following example:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();

The preceding code produces the following output:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3

The following code uses autoConnect:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});

The preceding code produces the following output:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3
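refCount can be illustrated in the same style. In the following sketch, the connection to the source happens only once the second subscriber arrives; with a synchronous source such as range, both subscribers then receive all the values:

```java
Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> refCounted = source.publish().refCount(2);

refCounted.subscribe(i -> System.out.println("Subscriber 1: " + i)); // below the threshold: nothing happens yet
System.out.println("subscribed first");
refCounted.subscribe(i -> System.out.println("Subscriber 2: " + i)); // threshold reached: connects to the source
```

If enough subscribers later cancelled, the source would be disconnected, and a subsequent set of subscribers crossing the threshold would trigger a fresh subscription to the source.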

9.4. Three Sorts of Batching

When you have lots of elements and you want to separate them into batches, you have three broad solutions in Reactor: grouping, windowing, and buffering. These three are conceptually close, because they redistribute a Flux<T> into an aggregate. Grouping and windowing create a Flux<Flux<T>>, while buffering aggregates into a Collection<T>.

9.4.1. Grouping with Flux<GroupedFlux<T>>

Grouping is the act of splitting the source Flux<T> into multiple batches, each of which matches a key.

The associated operator is groupBy.

Each group is represented as a GroupedFlux<T>, which lets you retrieve the key by calling its key() method.

There is no necessary continuity in the content of the groups. Once a source element produces a new key, the group for this key is opened and elements that match the key end up in the group (several groups could be open at the same time).

This means that groups:

  1. Are always disjoint (a source element belongs to one and only one group).

  2. Can contain elements from different places in the original sequence.

  3. Are never empty.

The following example groups values by whether they are even or odd:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();
Grouping is best suited for when you have a medium to low number of groups. The groups must also imperatively be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.
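To sketch that constraint, the following (hypothetical) snippet consumes each group eagerly through flatMap, with a concurrency at least equal to the expected number of groups:

```java
Flux<String> groupCounts = Flux.range(1, 100)
    .groupBy(i -> i % 10) // up to 10 groups can be open at the same time
    .flatMap(group -> group.count()
                           .map(count -> "group " + group.key() + ": " + count),
             10); // concurrency >= number of open groups, so no group is left unconsumed
```

With a concurrency lower than the number of simultaneously open groups (for instance, flatMap(…, 2) here), some groups would never be subscribed to, and the pipeline could hang.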

9.4.2. Windowing with Flux<Flux<T>>

Windowing is the act of splitting the source Flux<T> into windows, by criteria of size, time, boundary-defining predicates, or boundary-defining Publisher.

The associated operators are window, windowTimeout, windowUntil, windowWhile, and windowWhen.

Contrary to groupBy, which randomly overlaps according to incoming keys, windows are (most of the time) opened sequentially.

Some variants can still overlap, though. For instance, in window(int maxSize, int skip) the maxSize parameter is the number of elements after which a window closes, and the skip parameter is the number of elements in the source after which a new window is opened. So if maxSize > skip, a new window opens before the previous one closes and the two windows overlap.

The following example shows overlapping windows:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();
With the reverse configuration (maxSize < skip), some elements from the source are dropped and are not part of any window.

In the case of predicate-based windowing through windowUntil and windowWhile, having subsequent source elements that do not match the predicate can also lead to empty windows, as demonstrated in the following example:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
		.expectNext(2, 4, 6) // triggered by 11
		.expectNext(12) // triggered by 13
		// however, no empty completion window is emitted (would contain extra matching elements)
		.verifyComplete();

9.4.3. Buffering with Flux<List<T>>

Buffering is similar to windowing, with the following twist: Instead of emitting windows (each of which is a Flux<T>), it emits buffers (which are Collection<T> — by default, List<T>).

The operators for buffering mirror those for windowing: buffer, bufferTimeout, bufferUntil, bufferWhile, and bufferWhen.

Where the corresponding windowing operator opens a window, a buffering operator creates a new collection and starts adding elements to it. Where a window closes, the buffering operator emits the collection.

Buffering can also lead to dropping source elements or having overlapping buffers, as the following example shows:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(5, 3) //overlapping buffers
	)
		.expectNext(Arrays.asList(1, 2, 3, 4, 5))
		.expectNext(Arrays.asList(4, 5, 6, 7, 8))
		.expectNext(Arrays.asList(7, 8, 9, 10))
		.expectNext(Collections.singletonList(10))
		.verifyComplete();

Unlike in windowing, bufferUntil and bufferWhile do not emit an empty buffer, as the following example shows:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferWhile(i -> i % 2 == 0)
	)
	.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
	.expectNext(Collections.singletonList(12)) // triggered by 13
	.verifyComplete();

9.5. Parallelizing Work with ParallelFlux

With multi-core architectures being a commodity nowadays, being able to easily parallelize work is important. Reactor helps with that by providing a special type, ParallelFlux, that exposes operators that are optimized for parallelized work.

To obtain a ParallelFlux, you can use the parallel() operator on any Flux. By itself, this method does not parallelize the work. Rather, it divides the workload into “rails” (by default, as many rails as there are CPU cores).

In order to tell the resulting ParallelFlux where to run each rail (and, by extension, to run rails in parallel) you have to use runOn(Scheduler). Note that there is a recommended dedicated Scheduler for parallel work: Schedulers.parallel().

Compare the next two examples:

Flux.range(1, 10)
    .parallel(2) (1)
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
1 We force a number of rails instead of relying on the number of CPU cores.
Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

The first example produces the following output:

main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10

The second correctly parallelizes on two threads, as shown in the following output:

parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10

If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.

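For instance, the following sketch doubles values on two parallel rails and then merges the rails back into a regular Flux:

```java
Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .map(i -> i * 2)
    .sequential() // back to a plain Flux; subsequent operators run sequentially
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
```

Note that the ordering across rails is not guaranteed after the merge.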

Note that sequential() is implicitly applied if you subscribe to the ParallelFlux with a Subscriber but not when using the lambda-based variants of subscribe.

Note also that subscribe(Subscriber<T>) merges all the rails, while subscribe(Consumer<T>) runs all the rails. If the subscribe() method has a lambda, each lambda is executed as many times as there are rails.

You can also access individual rails or “groups” as a Flux<GroupedFlux<T>> through the groups() method and apply additional operators to them through the composeGroup() method.

9.6. Replacing Default Schedulers

As we described in the Threading and Schedulers section, Reactor Core comes with several Scheduler implementations. While you can always create new instances through the new* factory methods, each Scheduler flavor also has a default singleton instance that is accessible through the direct factory method (such as Schedulers.boundedElastic() versus Schedulers.newBoundedElastic(…​)).

These default instances are the ones used by operators that need a Scheduler to work when you do not explicitly specify one. For example, Flux#delayElements(Duration) uses the Schedulers.parallel() instance.

In some cases, however, you might need to change these default instances with something else in a cross-cutting way, without having to make sure every single operator you call has your specific Scheduler as a parameter. An example is measuring the time every single scheduled task takes by wrapping the real schedulers, for instrumentation purposes. In other words, you might want to change the default Schedulers.

Changing the default schedulers is possible through the Schedulers.Factory class. By default, a Factory creates all the standard Scheduler through similarly named methods. You can override each of these with your custom implementation.

Additionally, the factory exposes one additional customization method: decorateExecutorService. It is invoked during the creation of every Reactor Core Scheduler that is backed by a ScheduledExecutorService (even non-default instances, such as those created by calls to Schedulers.newParallel()).

This lets you tune the ScheduledExecutorService to be used: The default one is exposed as a Supplier and, depending on the type of Scheduler being configured, you can choose to entirely bypass that supplier and return your own instance or you can get() the default instance and wrap it.

Once you create a Factory that fits your needs, you must install it by calling Schedulers.setFactory(Factory).

Finally, there is a last customizable hook in Schedulers: onHandleError. This hook is invoked whenever a Runnable task submitted to a Scheduler throws an Exception (note that if there is an UncaughtExceptionHandler set for the Thread that ran the task, both the handler and the hook are invoked).

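A minimal sketch of installing that hook (the handler body is illustrative):

```java
Schedulers.onHandleError((thread, error) ->
        System.err.println("Task on " + thread.getName() + " failed: " + error));

Schedulers.parallel().schedule(() -> {
    throw new IllegalStateException("boom"); // ends up in the hook above
});

// later, to restore the default behavior:
// Schedulers.resetOnHandleError();
```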

9.7. Using Global Hooks

Reactor has another category of configurable callbacks that are invoked by Reactor operators in various situations. They are all set in the Hooks class, and they fall into three categories:

9.7.1. Dropping Hooks

Dropping hooks are invoked when the source of an operator does not comply with the Reactive Streams specification. These kind of errors are outside of the normal execution path (that is, they cannot be propagated through onError).

Typically, a Publisher calls onNext on the operator despite having already called onComplete on it previously. In that case, the onNext value is dropped. The same is true for an extraneous onError signal.

The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global Consumer for these drops. For example, you can use it to log the drop and clean up resources associated with a value if needed (as it never makes it to the rest of the reactive chain).

Setting the hooks twice in a row is additive: every consumer you provide is invoked. The hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.

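As a sketch, the following deliberately misbehaving Publisher emits onNext after onComplete, so the extra value reaches the onNextDropped hook rather than the subscriber (the hook body is illustrative):

```java
Hooks.onNextDropped(value -> System.out.println("dropped: " + value));

Publisher<String> badSource = subscriber -> {
    subscriber.onSubscribe(Operators.emptySubscription());
    subscriber.onComplete();
    subscriber.onNext("orphan"); // violates the spec: emitted after completion
};

Flux.from(badSource).subscribe(v -> System.out.println("received: " + v));

Hooks.resetOnNextDropped();
```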

9.7.2. Internal Error Hook

One hook, onOperatorError, is invoked by operators when an unexpected Exception is thrown during the execution of their onNext, onError, and onComplete methods.

Unlike the previous category, this is still within the normal execution path. A typical example is the map operator with a map function that throws an Exception (such as division by zero). It is still possible at this point to go through the usual channel of onError, and that is what the operator does.

First, it passes the Exception through onOperatorError. The hook lets you inspect the error (and the incriminating value, if relevant) and change the Exception. Of course, you can also do something on the side, such as log and return the original Exception.

Note that you can set the onOperatorError hook multiple times. You can provide a String identifier for a particular BiFunction, and subsequent calls with different keys concatenate the functions, which are all executed. On the other hand, reusing the same key twice lets you replace a function you previously set.

As a consequence, the default hook behavior can be both fully reset (by using Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using Hooks.resetOnOperatorError(String)).

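As a sketch, the following hook (registered under a hypothetical key) wraps any operator error; here it is triggered by a division by zero inside map:

```java
Hooks.onOperatorError("myWrapper", (error, value) ->
        new IllegalStateException("error while processing " + value, error));

Flux.just(0)
    .map(i -> 1 / i) // throws ArithmeticException, which the hook rewrites
    .subscribe(
            v -> System.out.println("value: " + v),
            e -> System.out.println("caught: " + e));

Hooks.resetOnOperatorError("myWrapper");
```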

9.7.3. Assembly Hooks

These hooks tie in the lifecycle of operators. They are invoked when a chain of operators is assembled (that is, instantiated). onEachOperator lets you dynamically change each operator as it is assembled in the chain, by returning a different Publisher. onLastOperator is similar, except that it is invoked only on the last operator in the chain before the subscribe call.

If you want to decorate all operators with a cross-cutting Subscriber implementation, you can look into the Operators#lift* methods to help you deal with the various types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and ConnectableFlux), as well as their Fuseable versions.

Like onOperatorError, these hooks are cumulative and can be identified with a key. They can also be reset partially or totally.

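As a sketch, the following assembly hook (under a hypothetical key) merely counts how many operators get assembled, returning each Publisher unchanged:

```java
AtomicInteger assembled = new AtomicInteger();

Hooks.onEachOperator("countingHook", publisher -> {
    assembled.incrementAndGet(); // invoked once per operator, at assembly time
    return publisher;            // no decoration: pass the operator through as-is
});

Flux.just(1, 2, 3)
    .map(i -> i * 2)
    .filter(i -> i > 2); // merely assembling this chain triggers the hook

System.out.println("operators assembled: " + assembled.get());

Hooks.resetOnEachOperator("countingHook");
```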
9.7.4. Hook Presets

The Hooks utility class provides two preset hooks. These are alternatives to the default behaviors that you can use by calling their corresponding method, rather than coming up with the hook yourself:

  • onNextDroppedFail(): onNextDropped used to throw an Exceptions.failWithCancel() exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, use onNextDroppedFail().

  • onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError hook, so calling resetOnOperatorError() also resets it. You can independently reset it by using resetOnOperatorDebug(), as it uses a specific key internally.

9.8. Adding a Context to a Reactive Sequence

One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.

Contrary to what you might be used to, in reactive programming, you can use a Thread to process several asynchronous sequences that run at roughly the same time (actually, in non-blocking locksteps). The execution can also easily and often jump from one thread to another.

This arrangement is especially hard for developers that use features dependent on the threading model being more “stable,” such as ThreadLocal. As it lets you associate data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.

The usual workaround for ThreadLocal usage is to move the contextual data, C, alongside your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.

Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.

As an illustration of what it looks like, the following example both reads from and writes to the Context:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                                   .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();

In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.

This is an advanced feature that is more targeted at library developers. It requires good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.

9.8.1. The Context API

Context is an interface reminiscent of Map. It stores key-value pairs and lets you fetch a value you stored by its key. More specifically:

  • Both key and values are of type Object, so a Context instance can contain any number of highly divergent values from different libraries and sources.

  • A Context is immutable.

  • Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(Context).

  • You can check whether the key is present with hasKey(Object key).

  • Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.

  • Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).

  • Use delete(Object key) to remove the value associated to a key, returning a new Context.

You can create pre-valued Context instances with up to five key-value pairs by using the static Context.of methods. They take 2, 4, 6, 8, or 10 Object instances, each couple of Object instances being a key-value pair to add to the Context.

Alternatively you can also create an empty Context by using Context.empty().

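A brief sketch of the immutability of that API (the key and value names are arbitrary):

```java
Context ctx = Context.of("key1", "value1", "key2", "value2");

Context extended = ctx.put("key3", "value3"); // returns a NEW Context

boolean inOriginal = ctx.hasKey("key3");      // false: the original is untouched
boolean inExtended = extended.hasKey("key3"); // true

String fallback = extended.getOrDefault("key4", "fallback"); // "fallback"
```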

9.8.2. Tying a Context to a Flux and Writing

To make a Context be useful, it must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.

Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.

In order to populate the Context, which can only be done at subscription time, you need to use the subscriberContext operator.

subscriberContext(Context) merges the Context you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a new Context for upstream.

You can also use the more advanced subscriberContext(Function<Context, Context>). It receives the state of the Context from downstream, lets you put or delete values as you see fit, and returns the new Context to use. You can even decide to return a completely different instance, although it is really not recommended (doing so might impact third-party libraries that depend on the Context).

9.8.3. Reading a Context

Once you have populated a Context, you can retrieve that data. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the client code.

The tool for reading data from the context is the static Mono.subscriberContext() method.

9.8.4. Simple Context Examples

The examples in this section are meant as ways to better understand some of the caveats of using a Context.

We first look back at our simple example from the introduction in a bit more detail, as the following example shows:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext() (2)
                                   .map( ctx -> s + " " + ctx.get(key))) (3)
                .subscriberContext(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello World") (4)
            .verifyComplete();
1 The chain of operators ends with a call to subscriberContext(Function) that puts "World" into the Context under a key of "message".
2 We flatMap on the source element, materializing the Context with Mono.subscriberContext().
3 We then use map to extract the data associated to "message" and concatenate that with the original word.
4 The resulting Mono<String> emits "Hello World".
The numbering above versus the actual line order is not a mistake. It represents the execution order. Even though subscriberContext is the last piece of the chain, it is the one that gets executed first (due to its subscription-time nature and the fact that the subscription signal flows from bottom to top).
In your chain of operators, the relative positions of where you write to the Context and where you read from it matters. The Context is immutable and its content can only be seen by operators above it, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
                     .subscriberContext(ctx -> ctx.put(key, "World")) (1)
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));  (2)

StepVerifier.create(r)
            .expectNext("Hello Stranger") (3)
            .verifyComplete();
1 The Context is written to too high in the chain.
2 As a result, in the flatMap, there is no value associated with our key. A default value is used instead.
3 The resulting Mono<String> thus emits "Hello Stranger".

The following example also demonstrates the immutable nature of the Context, and how Mono.subscriberContext() always returns the Context set by subscriberContext calls:

String key = "message";

Mono<String> r = Mono.subscriberContext() (1)
	.map( ctx -> ctx.put(key, "Hello")) (2)
	.flatMap( ctx -> Mono.subscriberContext()) (3)
	.map( ctx -> ctx.getOrDefault(key,"Default")); (4)

StepVerifier.create(r)
	.expectNext("Default") (5)
	.verifyComplete();
1 We materialize the Context.
2 In a map we attempt to mutate it.
3 We re-materialize the Context in a flatMap.
4 We read the attempted key in the Context.
5 The key was never set to "Hello".

Similarly, in the case of several attempts to write the same key to the Context, the relative order of the writes matters, too. Operators that read the Context see the value that was set closest to being under them, as demonstrated in the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                                   .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor")) (1)
                .subscriberContext(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello Reactor") (3)
            .verifyComplete();
1 A write attempt on key "message".
2 Another write attempt on key "message".
3 The map only saw the value set closest to it (and below it): "Reactor".

In the preceding example, the Context is populated with "World" during subscription. Then the subscription signal moves upstream and another write happens. This produces a second immutable Context with a value of "Reactor". After that, data starts flowing. The flatMap sees the Context closest to it, which is our second Context with the "Reactor" value.

You might wonder if the Context is propagated along with the data signal. If that was the case, putting another flatMap between these two writes would use the value from the top Context. But this is not the case, as demonstrated by the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.get(key))) (3)
                     .subscriberContext(ctx -> ctx.put(key, "Reactor")) (2)
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.get(key))) (4)
                     .subscriberContext(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello Reactor World") (5)
            .verifyComplete();
1 This is the first write to happen.
2 This is the second write to happen.
3 The first flatMap sees the second write.
4 The second flatMap concatenates the result from the first one with the value from the first write.
5 The Mono emits "Hello Reactor World".

The reason is that the Context is associated to the Subscriber and each operator accesses the Context by requesting it from its downstream Subscriber.

One last interesting propagation case is the one where the Context is also written to inside a flatMap, as in the following example:

String key = "message";
Mono<String> r =
        Mono.just("Hello")
            .flatMap( s -> Mono.subscriberContext()
                               .map( ctx -> s + " " + ctx.get(key))
            )
            .flatMap( s -> Mono.subscriberContext()
                               .map( ctx -> s + " " + ctx.get(key))
                               .subscriberContext(ctx -> ctx.put(key, "Reactor")) (1)
            )
            .subscriberContext(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
1 This subscriberContext does not impact anything outside of its flatMap.
2 This subscriberContext impacts the main sequence’s Context.

In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello Reactor World", because the subscriberContext that writes "Reactor" does so as part of the inner sequence of the second flatMap. As a consequence, it is not visible or propagated through the main sequence and the first flatMap does not see it. Propagation and immutability isolate the Context in operators that create intermediate inner sequences such as flatMap.

9.8.5. Full Example

Now we can consider a more real life example of a library reading information from the Context: a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but also looks for a particular Context key to add a correlation ID to the request’s headers.

From the user perspective, it is called as follows:

doPut("www.example.com", Mono.just("Walter"))

In order to propagate a correlation ID, it would be called as follows:

doPut("www.example.com", Mono.just("Walter"))
	.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))

As the preceding snippets show, the user code uses subscriberContext to populate a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of the operator is a Mono<Tuple2<Integer, String>> (a simplistic representation of an HTTP response) returned by the HTTP client library. So it effectively passes information from the user code to the library code.

The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
	Mono<Tuple2<String, Optional<Object>>> dataAndContext =
			data.zipWith(Mono.subscriberContext() (1)
			                 .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))); (2)

	return dataAndContext
			.<String>handle((dac, sink) -> {
				if (dac.getT2().isPresent()) { (3)
					sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
				}
				else {
					sink.next("PUT <" + dac.getT1() + "> sent to " + url);
				}
				sink.complete();
			})
			.map(msg -> Tuples.of(200, msg));
}
1 Materialize the Context through Mono.subscriberContext().
2 Extract a value for the correlation ID key, as an Optional.
3 If the key was present in the context, use the correlation ID as a header.

The library snippet zips the data Mono with Mono.subscriberContext(). This gives the library a Tuple2<String, Context>, and that context contains the HTTP_CORRELATION_ID entry from downstream (as it is on the direct path to the subscriber).

The library code then uses map to extract an Optional<String> for that key, and, if the entry is present, it uses the passed correlation ID as a X-Correlation-ID header. That last part is simulated by the handle.

The whole test that validates the library code used the correlation ID can be written as follows:

@Test
public void contextForLibraryReactivePut() {
	Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
			.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
			.filter(t -> t.getT1() < 300)
			.map(Tuple2::getT2);

	StepVerifier.create(put)
	            .expectNext("PUT <Walter> sent to www.example.com with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
	            .verifyComplete();
}

9.9. Dealing with Objects that Need Cleanup

In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use. This is an advanced scenario: for example, when you have reference-counted objects or when you deal with off-heap objects. Netty’s ByteBuf is a prime example of both.

In order to ensure proper cleanup of such objects, you need to account for it on a Flux-by-Flux basis, as well as in several of the global hooks (see Using Global Hooks):

  • The doOnDiscard Flux/Mono operator

  • The onOperatorError hook

  • The onNextDropped hook

  • Operator-specific handlers

This is needed because each hook is made with a specific subset of cleanup in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError.

Note that some operators are less adapted to dealing with objects that need cleanup. For example, bufferWhen can introduce overlapping buffers, and that means that the discard “local hook” we used earlier might see a first buffer as being discarded and clean up an element in it that is in a second buffer, where it is still valid.

For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT. They might on some occasions get applied several times to the same object. Unlike the doOnDiscard operator, which performs a class-level instanceOf check, the global hooks are also dealing with instances that can be any Object. It is up to the user’s implementation to distinguish between which instances need cleanup and which do not.
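To illustrate idempotency, here is a minimal, self-contained sketch of the kind of helper such a hook could delegate to. PooledBuffer is a made-up stand-in for a reference-counted type such as Netty’s ByteBuf; the refCnt() guard is what makes repeated invocations on the same object harmless:

```java
// PooledBuffer is a hypothetical reference-counted type, used here only to
// demonstrate the idempotency guard. Real code would target e.g. Netty's ByteBuf.
class PooledBuffer {
    private int refCnt = 1;
    int refCnt() { return refCnt; }
    void release() { refCnt--; }
}

class CleanupHooks {
    // Safe to call several times on the same object, and on arbitrary Objects,
    // which is exactly what the global hooks require.
    static void safeRelease(Object maybeBuffer) {
        if (maybeBuffer instanceof PooledBuffer) {
            PooledBuffer buf = (PooledBuffer) maybeBuffer;
            if (buf.refCnt() > 0) { // idempotency guard
                buf.release();
            }
        }
        // any other instance is left untouched
    }
}
```

The same guard-then-act shape works for any cleanup hook that may be invoked more than once for the same instance.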

9.9.1. The doOnDiscard Operator or Local Hook

This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code. It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped).

It is local, in the sense that it is activated through an operator and applies only to a given Flux or Mono.

Obvious cases include operators that filter elements from upstream. These elements never reach the next operator (or final subscriber), but this is part of the normal path of execution. As such, they are passed to the doOnDiscard hook. Examples of when you might use the doOnDiscard hook include the following:

  • filter: Items that do not match the filter are considered to be “discarded.”

  • skip: Skipped items are discarded.

  • buffer(maxSize, skip) with maxSize < skip: A “dropping buffer” — items in between buffers are discarded.

But doOnDiscard is not limited to filtering operators, and is also used by operators that internally queue data for backpressure purposes. More specifically, most of the time, this is important during cancellation. An operator that prefetches data from its source and later drains to its subscriber upon demand could have un-emitted data when it gets cancelled. Such operators use the doOnDiscard hook during cancellation to clear up their internal backpressure Queue.

Each call to doOnDiscard(Class, Consumer) is additive with the others, to the extent that it is visible and used by only operators upstream of it.
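As a hedged sketch (it assumes reactor-core is on the classpath), the following shows the hook catching elements rejected by a filter upstream of it:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

List<Integer> discarded = new ArrayList<>();

Flux.range(1, 6)
    .filter(i -> i % 2 == 0)                    // 1, 3, and 5 never reach downstream
    .doOnDiscard(Integer.class, discarded::add) // the hook is visible to the filter above it
    .subscribe();

// discarded now contains the filtered-out odd numbers
```

Note that the hook is placed below the filter: it is propagated upstream through the subscription context, which is why it sees what the filter rejects.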

9.9.2. The onOperatorError hook

The onOperatorError hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).

When the error happens during the processing of an onNext signal, the element that was being emitted is passed to onOperatorError.

If that type of element needs cleanup, you need to implement it in the onOperatorError hook, possibly on top of error-rewriting code.
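A hedged sketch of such a hook follows (reactor-core assumed on the classpath; safeRelease is a hypothetical, idempotent cleanup helper that you would supply for your own types):

```java
import reactor.core.publisher.Hooks;

// safeRelease is hypothetical: an idempotent cleanup helper that must
// tolerate arbitrary Objects, since the hook can receive anything.
Hooks.onOperatorError("cleanupOnError", (error, dataSignal) -> {
    if (dataSignal != null) {
        safeRelease(dataSignal); // the element being processed when the error occurred
    }
    return error; // no error rewriting in this sketch; add it here if needed
});
```

The String key keeps this hook additive with other onOperatorError hooks, and lets you remove it later with Hooks.resetOnOperatorError("cleanupOnError").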

9.9.3. The onNextDropped Hook

With malformed Publishers, there could be cases where an operator receives an element when it expected none (typically, after having received the onError or onComplete signals). In such cases, the unexpected element is “dropped” — that is, passed to the onNextDropped hook. If you have types that need cleanup, you must detect these in the onNextDropped hook and implement cleanup code there as well.

9.9.4. Operator-specific Handlers

Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.

For example, distinct has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not. By default, the collection is a HashSet, and the cleanup callback is a HashSet::clear. However, if you deal with reference-counted objects, you might want to change that to a more involved handler that would release each element in the set before calling clear() on it.

9.10. Null Safety

Although Java does not allow expressing null-safety with its type system, Reactor now provides annotations to declare nullability of APIs, similar to those provided by Spring Framework 5.

Reactor uses these annotations, but they can also be used in any Reactor-based Java project to declare null-safe APIs. Nullability of the types used inside method bodies is outside of the scope of this feature.

These annotations are meta-annotated with JSR 305 annotations (a dormant JSR that is supported by tools such as IntelliJ IDEA) to provide useful warnings to Java developers related to null-safety in order to avoid NullPointerException at runtime. JSR 305 meta-annotations let tooling vendors provide null safety support in a generic way, without having to hard-code support for Reactor annotations.

It is not necessary nor recommended with Kotlin 1.1.5+ to have a dependency on JSR 305 in your project classpath.

They are also used by Kotlin, which natively supports null safety. See this dedicated section for more details.

The following annotations are provided in the reactor.util.annotation package:

  • @NonNull: Indicates that a specific parameter, return value, or field cannot be null. (It is not needed on parameters and return values where @NonNullApi applies.)

  • @Nullable: Indicates that a parameter, return value, or field can be null.

  • @NonNullApi: Package-level annotation that indicates non-null is the default behavior for parameters and return values.

Nullability for generic type arguments, variable arguments, and array elements is not yet supported. See issue #878 for up-to-date information.
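As a sketch of how the package-level default is applied, a non-null-by-default package is typically declared once in a package-info.java file (the package name below is hypothetical):

```java
// package-info.java (hypothetical package name)
@NonNullApi
package com.example.myreactivelib;

import reactor.util.annotation.NonNullApi;
```

With this in place, only the exceptions need marking: individual parameters, return values, or fields that may be null get @Nullable.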

Appendix A: Which operator do I need?

In this section, if an operator is specific to Flux or Mono, it is prefixed accordingly. Common operators have no prefix. When a specific use case is covered by a combination of operators, it is presented as a method call, with leading dot and parameters in parentheses, as follows: .methodCall(parameter).

I want to deal with:

A.1. Creating a New Sequence…​

  • that emits a T, and I already have: just

    • …​from an Optional<T>: Mono#justOrEmpty(Optional<T>)

    • …​from a potentially null T: Mono#justOrEmpty(T)

  • that emits a T returned by a method: just as well

    • …​but lazily captured: use Mono#fromSupplier or wrap just inside defer

  • that emits several T I can explicitly enumerate: Flux#just(T…​)

  • that iterates over:

    • an array: Flux#fromArray

    • a collection or iterable: Flux#fromIterable

    • a range of integers: Flux#range

    • a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>)

  • that emits from various single-valued sources such as:

    • a Supplier<T>: Mono#fromSupplier

    • a task: Mono#fromCallable, Mono#fromRunnable

    • a CompletableFuture<T>: Mono#fromFuture

  • that completes: empty

  • that errors immediately: error

    • …​but lazily build the Throwable: error(Supplier<Throwable>)

  • that never does anything: never

  • that is decided at subscription: defer

  • that depends on a disposable resource: using

  • that generates events programmatically (can use state):

    • synchronously and one-by-one: Flux#generate

    • asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)

A.2. Transforming an Existing Sequence

  • I want to transform existing data:

    • on a 1-to-1 basis (eg. strings to their length): map

      • …​by just casting it: cast

      • …​in order to materialize each source value’s index: Flux#index

    • on a 1-to-n basis (eg. strings to their characters): flatMap + use a factory method

    • on a 1-to-n basis with programmatic behavior for each source element and/or state: handle

    • running an asynchronous task for each source item (eg. urls to http request): flatMap + an async Publisher-returning method

      • …​ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …​retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …​where the async task can return multiple values, from a Mono source: Mono#flatMapMany

  • I want to add pre-set elements to an existing sequence:

    • at the start: Flux#startWith(T…​)

    • at the end: Flux#concatWith(T…​)

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

    • into a List: collectList, collectSortedList

    • into a Map: collectMap, collectMultiMap

    • into an arbitrary container: collect

    • into the size of the sequence: count

    • by applying a function between each element (eg. running sum): reduce

      • …​but emitting each intermediary value: scan

    • into a boolean value from a predicate:

      • applied to all values (AND): all

      • applied to at least one value (OR): any

      • testing the presence of any value: hasElements

      • testing the presence of a specific value: hasElement

  • I want to combine publishers…​

    • in sequential order: Flux#concat or .concatWith(other)

      • …​but delaying any error until remaining publishers have been emitted: Flux#concatDelayError

      • …​but eagerly subscribing to subsequent publishers: Flux#mergeSequential

    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other)

      • …​with different types (transforming merge): Flux#zip / Flux#zipWith

    • by pairing values:

      • from 2 Monos into a Tuple2: Mono#zipWith

      • from n Monos when they all completed: Mono#zip

    • by coordinating their termination:

      • from 1 Mono and any source into a Mono<Void>: Mono#and

      • from n sources when they all completed: Mono#when

    • into an arbitrary container type:

      • each time all sides have emitted: Flux#zip (up to the smallest cardinality)

      • each time a new value arrives at either side: Flux#combineLatest

    • only considering the sequence that emits first: Flux#first, Mono#first, mono.or(otherMono).or(thirdMono), flux.or(otherFlux).or(thirdFlux)

    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)

    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext

  • I want to repeat an existing sequence: repeat

    • …​but at time intervals: Flux.interval(duration).flatMap(tick -> myExistingPublisher)

  • I have an empty sequence but…​

    • I want a value instead: defaultIfEmpty

    • I want another sequence instead: switchIfEmpty

  • I have a sequence but I am not interested in values: ignoreElements

    • …​and I want the completion represented as a Mono: then

    • …​and I want to wait for another task to complete at the end: thenEmpty

    • …​and I want to switch to another Mono at the end: Mono#then(mono)

    • …​and I want to emit a single value at the end: Mono#thenReturn(T)

    • …​and I want to switch to a Flux at the end: thenMany

  • I have a Mono for which I want to defer completion…​

    • …​until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function)

  • I want to expand elements recursively into a graph of sequences and emit the combination…​

    • …​expanding the graph breadth first: expand(Function)

    • …​depth first: expandDeep(Function)

A.3. Peeking into a Sequence

  • Without modifying the final sequence, I want to:

    • get notified of / execute additional behavior (sometimes called “side-effects”) on:

      • emissions: doOnNext

      • completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)

      • error termination: doOnError

      • cancellation: doOnCancel

      • "start" of the sequence: doFirst

        • this is tied to Publisher#subscribe(Subscriber)

      • post-subscription: doOnSubscribe

        • as in Subscription acknowledgement after subscribe

        • this is tied to Subscriber#onSubscribe(Subscription)

      • request: doOnRequest

      • completion or error: doOnTerminate (the Mono version includes the result, if any)

        • but after it has been propagated downstream: doAfterTerminate

      • any type of signal, represented as a Signal: Flux#doOnEach

      • any terminating condition (complete, error, cancel): doFinally

    • log what happens internally: log

  • I want to know of all events:

    • each represented as a Signal object:

      • in a callback outside the sequence: doOnEach

      • instead of the original onNext emissions: materialize

        • …​and get back to the onNexts: dematerialize

    • as a line in the logs: log

A.4. Filtering a Sequence

  • I want to filter a sequence:

    • based on arbitrary criteria: filter

      • …​that are asynchronously computed: filterWhen

    • restricting on the type of the emitted objects: ofType

    • by ignoring the values altogether: ignoreElements

    • by ignoring duplicates:

      • in the whole sequence (logical set): Flux#distinct

      • between subsequently emitted items (deduplication): Flux#distinctUntilChanged

  • I want to keep only a subset of the sequence:

    • by taking N elements:

      • at the beginning of the sequence: Flux#take(long)

        • …​based on a duration: Flux#take(Duration)

        • …​only the first element, as a Mono: Flux#next()

        • …​using request(N) rather than cancelling: Flux#limitRequest(long)

      • at the end of the sequence: Flux#takeLast

      • until a criterion is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)

      • while a criterion is met (exclusive): Flux#takeWhile

    • by taking at most 1 element:

      • at a specific position: Flux#elementAt

      • at the end: .takeLast(1)

        • …​and emit an error if empty: Flux#last()

        • …​and emit a default value if empty: Flux#last(T)

    • by skipping elements:

      • at the beginning of the sequence: Flux#skip(long)

        • …​based on a duration: Flux#skip(Duration)

      • at the end of the sequence: Flux#skipLast

      • until a criterion is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)

      • while a criterion is met (exclusive): Flux#skipWhile

    • by sampling items:

      • by duration: Flux#sample(Duration)

        • but keeping the first element in the sampling window instead of the last: sampleFirst

      • by a publisher-based window: Flux#sample(Publisher)

      • based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)

  • I expect at most 1 element (an error is raised if there is more than one)…​

    • and I want an error if the sequence is empty: Flux#single()

    • and I want a default value if the sequence is empty: Flux#single(T)

    • and I accept an empty sequence as well: Flux#singleOrEmpty

A.5. Handling Errors

  • I want to create an erroring sequence: error…​

    • …​to replace the completion of a successful Flux: .concat(Flux.error(e))

    • …​to replace the emission of a successful Mono: .then(Mono.error(e))

    • …​if the time between onNexts elapses: timeout

    • …​lazily building the Throwable: error(Supplier<Throwable>)

  • I want the try/catch equivalent of:

    • throwing: error

    • catching an exception:

      • and falling back to a default value: onErrorReturn

      • and falling back to another Flux or Mono: onErrorResume

      • and wrapping and re-throwing: .onErrorMap(t -> new RuntimeException(t))

    • the finally block: doFinally

    • the using pattern from Java 7: using factory method

  • I want to recover from errors…​

    • by falling back:

      • to a value: onErrorReturn

      • to a Publisher or Mono, possibly a different one depending on the error: Flux#onErrorResume and Mono#onErrorResume

    • by retrying: retry

      • …​from a simple policy (max number of attempts): retry(), retry(long)

      • …​triggered by a companion control Flux: retryWhen

      • …​using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…​)) (see also other factory methods in Retry)

  • I want to deal with backpressure "errors" (request the maximum from upstream and apply the strategy when downstream does not produce enough requests)…​

    • by throwing a special IllegalStateException: Flux#onBackpressureError

    • by dropping excess values: Flux#onBackpressureDrop

      • …​except the last one seen: Flux#onBackpressureLatest

    • by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer

      • …​and applying a strategy when the bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy

A.6. Working with Time

  • I want to associate emissions with a timing (Tuple2<Long, T>) measured…​

    • since subscription: elapsed

    • since the dawn of time (well, computer time): timestamp

  • I want my sequence to be interrupted if there is too much delay between emissions: timeout

  • I want to get ticks from a clock at regular time intervals: Flux#interval

  • I want to emit a single 0 after an initial delay: static Mono.delay

  • I want to introduce a delay:

    • between each onNext signal: Mono#delayElement, Flux#delayElements

    • before the subscription happens: delaySubscription

A.7. Splitting a Flux

  • I want to split a Flux<T> into a Flux<Flux<T>>, by a boundary criterion of:

    • size: window(int)

      • …​with overlapping or dropping windows: window(int, int)

    • time: window(Duration)

      • …​with overlapping or dropping windows: window(Duration, Duration)

    • size OR time (the window closes when the count is reached or the timeout elapses): windowTimeout(int, Duration)

    • a predicate on elements: windowUntil

      • …​emitting the element that triggered the boundary in the next window (cutBefore variant): .windowUntil(predicate, true)

      • …​keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted)

    • an arbitrary boundary driven by onNexts in a control Publisher: window(Publisher), windowWhen

  • I want to split a Flux<T> and buffer elements together…​

    • into Lists:

      • by a size boundary: buffer(int)

        • …​with overlapping or dropping buffers: buffer(int, int)

      • by a duration boundary: buffer(Duration)

        • …​with overlapping or dropping buffers: buffer(Duration, Duration)

      • by a size OR duration boundary: bufferTimeout(int, Duration)

      • by an arbitrary criteria boundary: bufferUntil(Predicate)

        • …​putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true)

        • …​buffering while a predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate)

      • driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen

    • into an arbitrary "collection" type C: use variants such as buffer(int, Supplier<C>)

  • I want to split a Flux<T> so that elements that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>) Tip: Note that this returns a Flux<GroupedFlux<K, T>>, with each inner GroupedFlux sharing the same K key, accessible through key().

A.8. Going Back to the Synchronous World

Note: all of these methods except Mono#toFuture throw an UnsupportedOperatorException when called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).

  • I have a Flux<T> and I want to:

    • block until I can get the first element: Flux#blockFirst

      • …with a timeout: Flux#blockFirst(Duration)

    • block until I can get the last element (or null if the sequence is empty): Flux#blockLast

      • …with a timeout: Flux#blockLast(Duration)

    • synchronously switch to an Iterable<T>: Flux#toIterable

    • synchronously switch to a Java 8 Stream<T>: Flux#toStream

  • I have a Mono<T> and I want:

    • to block until I can get the value: Mono#block

      • …with a timeout: Mono#block(Duration)

    • a CompletableFuture<T>: Mono#toFuture
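A quick sketch of these blocking bridges (fine in tests and demos, but keep the non-blocking-Scheduler caveat above in mind; names are ours):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class BlockingDemo {

    static Object[] run() {
        String first = Flux.just("a", "b", "c").blockFirst(); // blocks until the first element
        String last  = Flux.just("a", "b", "c").blockLast();  // blocks until completion

        List<String> all = new ArrayList<>();
        Flux.just("a", "b", "c").toIterable().forEach(all::add); // pulls values as iteration proceeds

        Integer value = Mono.just(42).block(); // blocks until the Mono's single value

        return new Object[] { first, last, all, value };
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // a c [a, b, c] 42
    }
}
```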

A.9. Multicasting a Flux to several Subscribers

  • I want to connect multiple Subscribers to a Flux:

    • and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux)

    • and trigger the source immediately (later subscribers see later data): share()

    • and permanently connect the source once enough subscribers have registered: .publish().autoConnect(n)

    • and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n)

      • …but give new subscribers a chance to come in before cancelling: .publish().refCount(n, Duration)

  • I want to cache data from a Publisher and replay it to later subscribers:

    • up to n elements: cache(int)

    • caching the latest elements seen within a Duration (Time-To-Live): cache(Duration)

      • …but retaining no more than n elements: cache(int, Duration)

    • without immediately triggering the source: Flux#replay (returns a ConnectableFlux)
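The caching behavior can be observed by counting upstream subscriptions, as in this small sketch (names are ours):

```java
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheDemo {

    static int run() {
        AtomicInteger upstreamSubscriptions = new AtomicInteger();

        Flux<Integer> cached = Flux.range(1, 3)
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .cache(); // first subscriber triggers the source; later ones replay from the cache

        List<Integer> firstPass  = cached.collectList().block();
        List<Integer> secondPass = cached.collectList().block(); // replayed, no new upstream subscription

        System.out.println(firstPass + " " + secondPass); // [1, 2, 3] [1, 2, 3]
        return upstreamSubscriptions.get(); // 1: the source was subscribed only once
    }

    public static void main(String[] args) {
        System.out.println("upstream subscriptions: " + run());
    }
}
```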

Appendix B: FAQ, Best Practices, and "How do I…​?"

This section covers common questions and a few best practices when working with Reactor.

B.1. How Do I Wrap a Synchronous, Blocking Call?

It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:

Mono blockingWrapper = Mono.fromCallable(() -> { (1)
    return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)
1 Create a new Mono with fromCallable.
2 Return the asynchronous, blocking resource.
3 Ensure each subscription happens on a dedicated single-threaded worker from Schedulers.boundedElastic().

You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit to the number of threads that can be created and to the number of blocking tasks that can be enqueued and deferred during a spike.


Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.

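That laziness can be observed directly. In the following sketch (names are ours), the callable only runs once something subscribes, here through block():

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeOnIsLazyDemo {

    static String run() {
        AtomicBoolean callableRan = new AtomicBoolean(false);

        Mono<String> wrapper = Mono.fromCallable(() -> {
                    callableRan.set(true);
                    return "data"; // stands in for the result of a blocking call
                })
                .subscribeOn(Schedulers.boundedElastic());

        boolean before = callableRan.get(); // still false: nothing has subscribed yet

        String result = wrapper.block();    // block() subscribes, so the callable now runs

        return before + ":" + callableRan.get() + ":" + result; // "false:true:data"
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```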

B.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?

Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.


Reactor operators are decorators. They return a different instance that wraps the source sequence and adds behavior. That is why the preferred way of using operators is to chain the calls.


Compare the following two examples:


Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1 The mistake is here. The result is not attached to the flux variable.
Example 21. without chaining (correct)
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

The following sample is even better (because it is simpler):

Flux
  .just("something", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));

The first version outputs the following:

Received: something
Received: chain

The two other versions output the expected values, as follows:

Received: *********
Received: *****

My Mono zipWith or zipWhen Is Never Called

Consider the following example:

myMethod.process("a") // this method returns Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //this is never called
        .subscribe();

If the source Mono is either empty or a Mono<Void> (a Mono<Void> is empty for all intents and purposes), some combinations are never called.

This is the typical case for any transformer such as the zip static method or the zipWith or zipWhen operators, which (by definition) need an element from each source to produce their output.

Using data-suppressing operators on sources of zip is thus problematic. Examples of data-suppressing operators include then(), thenEmpty(Publisher<Void>), ignoreElements() and ignoreElement(), and when(Publisher…​).

Similarly, operators that use a Function<T,?> to tune their behavior, such as flatMap, need at least one element to be emitted for the Function to have a chance to apply. Applying these on an empty (or <Void>) sequence never produces an element.

You can use .defaultIfEmpty(T) and .switchIfEmpty(Publisher<T>) to replace an empty sequence of T with a default value or a fallback Publisher<T> (respectively), which could help avoid some of these situations. Note that this does not apply to Flux<Void>/Mono<Void> sources, as you can only switch to another Publisher<Void>, which is still guaranteed to be empty. The following example uses defaultIfEmpty:

Example 22. use defaultIfEmpty before zipWhen
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
        .defaultIfEmpty("") // this converts empty sequence to just the empty String
        .zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
        .subscribe();

B.3. How to Use retryWhen to Emulate retry(3)?

The retryWhen operator can be quite complex. Hopefully the following snippet of code can help you understand how it works by attempting to emulate a simpler retry(3):


Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), (1)
          (error, index) -> { (2)
            if (index < 4) return index; (3)
            else throw Exceptions.propagate(error); (4)
          })
    );
1 Trick one: use zip and a range of "number of acceptable retries + 1".
2 The zip function lets you count the retries while keeping track of the original error.
3 To allow for three retries, indexes before 4 return a value to emit.
4 In order to terminate the sequence in error, we throw the original exception after these three retries.
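To check the equivalence, one can count subscriptions to the source; with retry(3) (or the emulation above) the source is attempted four times in total. A hedged sketch, with names of our choosing:

```java
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicInteger;

public class RetryCountDemo {

    static int run() {
        AtomicInteger attempts = new AtomicInteger();

        Flux<String> flux = Flux.<String>defer(() -> {
                    attempts.incrementAndGet(); // one increment per (re)subscription
                    return Flux.error(new IllegalArgumentException("boom"));
                })
                .retry(3); // the behavior the snippet above emulates with retryWhen

        try {
            flux.blockLast();
        } catch (IllegalArgumentException expected) {
            // the original error surfaces once the retries are exhausted
        }
        return attempts.get(); // 4 = initial attempt + 3 retries
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + run());
    }
}
```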

B.4. How can I use retryWhen for Exponential Backoff?

Exponential backoff produces retry attempts with a growing delay between each attempt, so as not to overload the source system and risk an all-out crash. The rationale is that if the source produces an error, it is already in an unstable state and is not likely to recover from it immediately. So blindly retrying immediately is likely to produce yet another error and add to the instability.

Since 3.3.4.RELEASE, Reactor comes with a builder for such a retry, usable with Flux#retryWhen: Retry.backoff.

The following example showcases a simple use of the builder, with hooks logging messages before and after the retry-attempt delays. It delays retries and increases the delay between each attempt (pseudocode: delay = attempt number * 100 milliseconds):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
		.doOnError(e -> {  (1)
			errorCount.incrementAndGet();
			System.out.println(e + " at " + LocalTime.now());
		})
		.retryWhen(Retry
				.backoff(3, Duration.ofMillis(100)).jitter(0d) (2)
				.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now())) (3)
				.onRetryExhaustedThrow((spec, rs) -> rs.failure()) (4)
		);
1 We will log the time of errors emitted by the source and count them.
2 We configure an exponential backoff retry with at most three attempts and no jitter.
3 We also log the time at which the retry happens.
4 By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause. Here we customize that to directly emit the cause as onError.

When subscribed to, this fails and terminates after printing out the following:

java.lang.IllegalStateException: boom at 18:02:29.338
retried at 18:02:29.459 (1)
java.lang.IllegalStateException: boom at 18:02:29.460
retried at 18:02:29.663 (2)
java.lang.IllegalStateException: boom at 18:02:29.663
retried at 18:02:29.964 (3)
java.lang.IllegalStateException: boom at 18:02:29.964
1 First retry, after about 100ms
2 Second retry, after about 200ms
3 Third retry, after about 300ms

B.5. How Do I Ensure Thread Affinity when I Use publishOn()?

As described in Schedulers, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.


Consider the following example:

EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
         .map(i -> transform(i))
         .publishOn(scheduler2)
         .doOnNext(i -> processNext(i))
         .subscribe();

The transform function in map() is run on a worker of scheduler1, and the processNext method in doOnNext() is run on a worker of scheduler2. Each subscription gets its own worker, so all elements pushed to the corresponding subscriber are published on the same Thread.


You can use single-threaded schedulers to ensure thread affinity for different stages in the chain or for different subscribers.


B.6. What Is a Good Pattern for Contextual Logging? (MDC)

Most logging frameworks allow contextual logging, letting users store variables that are reflected in the logging pattern, generally by way of a Map called the MDC ("Mapped Diagnostic Context"). This is one of the most recurring uses of ThreadLocal in Java and, as a consequence, this pattern assumes that the code being logged is tied in a one-to-one relationship with a Thread.


That might have been a safe assumption to make before Java 8, but with the advent of functional programming elements in the Java language things have changed a bit…​


Let’s take the example of an API that was imperative and used the template method pattern and then switches to a more functional style. With the template method pattern, inheritance was at play. Now, in its more functional approach, higher-order functions are passed to define the "steps" of the algorithm. Things are now more declarative than imperative, and that frees the library to make decisions about where each step should run. For instance, knowing which steps of the underlying algorithm can be parallelized, the library can use an ExecutorService to execute some of the steps in parallel.


One concrete example of such a functional API is the Stream API introduced in Java 8 and its parallel() flavor. Logging with an MDC in a parallel Stream is not a free lunch: one needs to ensure the MDC is captured and reapplied in each step.


The functional style enables such optimizations, because each step is thread-agnostic and referentially transparent, but it can break the MDC assumption of a single Thread. The most idiomatic way of ensuring any kind of contextual information is accessible to all stages would be to pass that context around through the composition chain. During the development of Reactor we encountered the same general class of problem, and we wanted to avoid this very hands-down and explicit approach. This is why the Context was introduced: it propagates through the execution chain as long as Flux and Mono are used as the return value, by letting stages (operators) peek at the Context of their downstream stage. So instead of using ThreadLocal, Reactor offers this map-like object that is tied to a Subscription and not a Thread.


Now that we’ve established that MDC "just working" is not the best assumption to make in a declarative API, how can we perform contextualized log statements in relation to events in a Reactive Stream (onNext, onError, and onComplete)?


This entry of the FAQ offers a possible intermediate solution when one wants to log in relation to these signals in a straightforward and explicit manner. Make sure to read the Adding a Context to a Reactive Sequence section beforehand, and especially how a write must happen towards the bottom of the operator chain for operators above it to see it.


To get contextual information from the Context to the MDC, the simplest way is to wrap logging statements in a doOnEach operator with a little bit of boilerplate code. This boilerplate depends on both the logging framework/abstraction of your choice and the information you want to put in the MDC, so it has to be in your codebase.


The following is an example of such a helper function around a single MDC variable and focused on logging onNext events, using Java 9 enhanced Optional API:


public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
	return signal -> {
		if (!signal.isOnNext()) return; (1)
		Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY"); (2)

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { (3)
				logStatement.accept(signal.get()); (4)
			}
		},
		() -> logStatement.accept(signal.get())); (5)
	};
}
1 doOnEach signals include onComplete and onError. In this example we are only interested in logging onNext events.
2 We extract one interesting value from the Reactor Context (see The Context API section).
3 In this example, we use the MDCCloseable from SLF4J 2, allowing the MDC to be cleaned up automatically after the log statement executes, thanks to the try-with-resources syntax.
4 The proper log statement is provided by the caller as a Consumer<T> (a consumer of the onNext value).
5 In case the expected key was not set in the Context, we use the alternative path, where nothing is put in the MDC.

Using this boilerplate code ensures that we are good citizens with the MDC: we set a key right before we execute a logging statement and remove it immediately after. There is no risk of polluting the MDC for subsequent logging statements.


Of course, this is a suggestion. You might be interested in extracting multiple values from the Context or in logging things in case of onError. You might want to create additional helper methods for these cases or craft a single method that makes use of additional lambdas to cover more ground.


In any case, the usage of the preceding helper method could look like the following reactive web controller:


@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId; (1)

	return restaurantService.byPrice(maxPrice)
			   .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", (2)
					r.getName(), r.getPricePerPerson())))
			   .subscriberContext(Context.of("CONTEXT_KEY", apiId)); (3)
}
1 We need to get the contextual information from the request header to put it in the Context.
2 Here we apply our helper method to the Flux, using doOnEach. Remember: operators see Context values defined below them.
3 We write the value from the header to the Context, using the chosen key CONTEXT_KEY.

In this configuration, the restaurantService can emit its data on a shared thread, yet the logs will still reference the correct X-UserId for each request.


For completeness, we can also see what an error-logging helper could look like:


public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
	return signal -> {
		if (!signal.isOnError()) return;
		Optional<String> toPutInMdc = signal.getContext().getOrEmpty("CONTEXT_KEY");

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
				errorLogStatement.accept(signal.getThrowable());
			}
		},
		() -> errorLogStatement.accept(signal.getThrowable()));
	};
}

Nothing much has changed, except for the fact that we check that the Signal is effectively an onError, and that we provide said error (a Throwable) to the log statement lambda.


Applying this helper in the controller is very similar to what we’ve done before:


@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId;

	return restaurantService.byPrice(maxPrice)
			   .doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v)))
			   .doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e))) (1)
			   .subscriberContext(Context.of("CONTEXT_KEY", apiId));
}
1 In case the restaurantService emits an error, it is logged with the MDC context here

Appendix C: Reactor-Extra

The reactor-extra artifact contains additional operators and utilities that are for users of reactor-core with advanced needs.

As this is a separate artifact, you need to explicitly add it to your build. The following example shows how to do so in Gradle:

dependencies {
     compile 'io.projectreactor:reactor-core'
     compile 'io.projectreactor.addons:reactor-extra' (1)
}
1 Add the reactor extra artifact in addition to core. See Getting Reactor for details about why you do not need to specify a version if you use the BOM, usage in Maven, and other details.

C.1. TupleUtils and Functional Interfaces

The reactor.function package contains functional interfaces that complement the Java 8 Function, Predicate, and Consumer interfaces, for three to eight values.

TupleUtils offers static methods that act as a bridge between lambdas of these functional interfaces and a similar interface on the corresponding Tuple.

This lets you easily work with independent parts of any Tuple, as the following example shows:

.map(tuple -> {
  String firstName = tuple.getT1();
  String lastName = tuple.getT2();
  String address = tuple.getT3();

  return new Customer(firstName, lastName, address);
});

You can rewrite the preceding example as follows:

.map(TupleUtils.function(Customer::new)); (1)
1 (as the Customer constructor conforms to the Function3 functional interface signature)

C.2. Math Operators With MathFlux

The reactor.math package contains a MathFlux specialized version of Flux that offers mathematical operators, including max, min, sumInt, averageDouble, and others.
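A brief sketch, assuming reactor-extra (and therefore MathFlux) is on the classpath; class and method names below are ours:

```java
import reactor.core.publisher.Flux;
import reactor.math.MathFlux;

public class MathFluxDemo {

    static String run() {
        Integer sum = MathFlux.sumInt(Flux.range(1, 4)).block();          // 1+2+3+4 = 10
        Double avg  = MathFlux.averageDouble(Flux.just(1, 2, 3)).block(); // 2.0
        Integer max = MathFlux.max(Flux.just(3, 1, 2)).block();           // 3
        return sum + " " + avg + " " + max;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each of these operators returns a Mono, since the mathematical aggregation reduces the whole sequence to a single value.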

C.3. Repeat and Retry Utilities

The reactor.retry package contains utilities to help in writing Flux#repeatWhen and Flux#retryWhen functions. The entry points are factory methods in the Repeat and Retry interfaces, respectively.

You can use both interfaces as a mutative builder, and they implement the correct Function signature to be used in their counterpart operators.

Since 3.2.0, one of the most advanced retry strategies offered by these utilities is also part of the reactor-core main artifact directly. Exponential backoff is available as the Flux#retryBackoff operator.

C.4. Schedulers

Reactor-extra comes with several specialized schedulers:

  • ForkJoinPoolScheduler (in the reactor.scheduler.forkjoin package): Uses the Java ForkJoinPool to execute tasks.

  • SwingScheduler (in the reactor.swing package): Runs tasks in the Swing UI event loop thread, the EDT.

  • SwtScheduler (in the reactor.swing package): Runs tasks in the SWT UI event loop thread.