07 July 2020

What is Reactor?

The purpose of Reactor, and reactive steams in general is to enable operations on large amounts of data to be broken down and executed on many different threads (multi-threading) in the most efficient, scalable, and fast way possible. Although parallel processing can be achieved simply using Java 8’s parallel stream, reactive streams add a plethora of additional functionality and customization such as error handling, retry, caching and replaying streams, handling backpressure, and more.

You can think of a reactive stream as having three rails, the data rail, the completion rail (whether or not the stream has completed), and the error rail. Also, each of the rails can be converted into the other: complete streams could be replaced, an operation could throw an Exception, or an Exception could be handled and replaced with more data.

In addition, Reactor adds the concept of Context, which we will explore later on.

Getting Started

If you have a Maven build, add the following to your pom file:

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.3.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.3.5.RELEASE</version>
  <scope>test</scope>
</dependency>

For Gradle builds, add the following to your Gradle build file’s dependencies:

implementation 'io.projectreactor:reactor-core:3.3.5.RELEASE'
testImplementation 'io.projectreactor:reactor-test:3.3.5.RELEASE'

Creating a Flux or Mono

You can create a Flux from fixed data (cold) or programmatically from dynamic data (hot).

The following are some different way to create a cold Flux:

Flux<String> flux1 = Flux.just("a", "b", "foobar"); //1

List<String> iterable = Arrays.asList("a", "b", "foobar");

Flux<String> flux2 = Flux.fromIterable(iterable); //2

Flux<Integer> numbers = Flux.range(1, 64); //3
  1. Create a Flux from a list of values.

  2. Create a Flux from an iterable.

  3. Create a range from 1 to 64.

You can create a simple Mono that is empty or has just one element like the following:

Mono<String> noData = Mono.empty(); //1

Mono<String> data = Mono.just("foo"); //2

Mono<String> monoError = Mono.error(new RuntimeException("error")); //3
  1. Create an empty Mono.

  2. Create a Mono with one element.

  3. Create a Mono wrapping a RuntimeException.

You can also programmatically create a (hot) Flux using one of the generate, create, or push methods.

Tuples and Zip

Tuples are strongly typed collections of two or more elements and Reactor comes with them built in. Some operations such as zipWith, return reactive streams of Tuples.

Flux has an instance method zipWith(Publisher<? extends T2> source2) which has a return type of Flux<Tuple2<T,T2>>. It waits for both Fluxes (the initial flux and source2) to emit an element and then combines the two into a Tuple. There’s also static method Flux.zip which is overloaded to take from two to eight Publishers and zip them together.

Zipping is useful when you want to perform multiple operations that return reactive results (Flux or Mono).

Mono has two main flavors of zipping (which both have a return type of Mono<Tuple2<T,T2>>):

  • zipWith(Mono<? extends T2> other) – Zips the current stream with another stream, giving the combination of each corresponding element in the form of Tuple2.

  • zipWhen(Function<T,Mono<? extends T2>> rightGenerator) – Zips the current Mono with another Mono, giving the combination of each corresponding element in the form of Tuple2, but only after the first stream’s operation has completed.

For example, given you have two methods which perform asynchronous operations Mono<Course> getCourse(Long id) and Mono<Integer> getStudentCount(Course course), imagine you want to get the student count from the course Id you could do the following:

Mono<Integer> getStudentCount(Long id) {
return getCourse(id)
.zipWhen(course -> getStudentCount(course))
.map(tuple2 -> tuple2.getT2());
}

This is a simple example but you can imagine combining two different entities, or performing logic on them before returning, or calling another method which takes two arguments, and so on.

Reactor Addons

Project Reactor provides additional functionality under the io.projectreactor.addons groupId. Reactor extra includes additional math functions, different ways to retry including Jitter and Backoff, and TupleUtils.

<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-extra</artifactId>
  <version>3.3.3.RELEASE</version>
</dependency>

For Gradle builds, add the following to your Gradle build file’s dependencies:

implementation 'io.projectreactor.addons:reactor-extra:3.3.3.RELEASE'

When your application fails at an integration point, such as when calling another RESTful service, to make your overall system reliable you might want to retry the call some number of times. However, to keep from overloading the failing service, you should employ Backoff, or increasing the time between each retry, and Jitter, randomly modifying the time so that the retries from many different instances do not happen at the same time (correlate). For example take a look at the following code:

var retry = Retry.anyOf(IOException.class) \\1
 .exponentialBackoff(Duration.ofMillis(100), \\2
    Duration.ofSeconds(60))
 .jitter(Jitter.random()) \\3
 .retryMax(5)
 .withApplicationContext(appContext) \\4
 .doOnRetry(context ->
    context.applicationContext().rollback()); return flux.retryWhen(retry); \\5
  1. We create the Retry with exception value of IOException, meaning it will retry only when that exception is thrown.

  2. We define exponential backoff with a starting value of 100 ms and maximum value of 60 seconds.

  3. We add random Jitter and set the retry max to 5, meaning it retry at most 5 times.

  4. We add the Spring ApplicationContext and use it to apply rollback after each failure.

  5. Finally we call retryWhen(retry) on a Flux instance to apply the Retry to that Flux.

For more information on retries, backoff, and jitter see this excellent article from Amazon’s builder library.