29 July 2020

In the previous post we introduced Reactor and covered why reactor is important and some of the basics. We talked about Flux, Mono, zipping, Tuples, and retry. However, one of the most important aspects of reactive streams, is the ability to handle backpressure.

Backpressure is a mechanism for explicitly handling the problem of having too many items to process in real time. Without such a mechanism, you typically see the problem of services timing out and eventually the user sees either a very slow or unresponsive system. This is compounded by the fact that users will give up quickly if a system is slow to respond.

Many other techniques for handling concurrency do not have explicit mechanisms for handling backpressure. Instead, system engineers and architects must hope to scale the system either horizontally (more) servers, or vertically (a faster CPU/more cores/more memory).

Handling Backpressure in Reactor

Reactor, like all implementations of Reactive Streams, has the ability to handle backpressure. Use one of the following methods on a Flux (or others not listed) to specify which backpressure strategy you want to use:

  • onBackpressureBuffer(): Buffers all items until they can be handled downstream.

  • onBackpressureBuffer(maxSize): Buffers items up to the given count.

  • onBackpressureBuffer(maxSize, BufferOverflowStrategy): Buffers items up to the given count and allows you to specify the BufferOverflowStrategy, such as onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST)

  • onBackpressureLatest(): Similar to keeping a buffer of only the last item added. If the downstream does not keep up with upstream, only the latest element will be given downstream.

  • onBackpressureError(): Ends the Flux with an error (calling the downstream Subscriber’s onError) with an IllegalStateException from Exceptions.failWithOverflow() if more items were produced upstream than requested downstream.

  • onBackpressureDrop(): Drops any items produced above what was requested.

  • onBackpressureDrop(Consumer): Drops any items produced above what was requested and calls the given Consumer for each dropped item.

With each of these methods, the strategy only applies when the stream produces items faster than they can be handled. If that’s not the case, for example with a cold stream, no backpressure strategy is necessary.

Also, keep in mind that Reactor is not magic and some care should be taken when considering backpressure strategies. For example, if each item in the stream is critical, do not use onBackpressureDrop(). If you use onBackpressureError(), you will cause an Exception to be thrown when there are too many items, so use this with extreme caution.

If you use any backpressure strategy, you should consider writing a test to validate that your whole system will work as expected. One way you could do this is to simulate a very slow downstream by using Thread.sleep(1000) for example.

Luckily, Project Reactor supplies us with some great testing features.

Testing

Automated testing is always a good idea, and it would be nice to have tools to directly test Reactive Streams. Luckily, Reactor comes with a few elements dedicated to testing which are gathered into their own artifact we already included: reactor-test. The two main uses of reactor-test are the following:

  • Testing a sequence follows a given scenario with StepVerifier.

  • Producing data to test the behavior of operators (including you own operators) downstream with TestPublisher.

StepVerifier

Reactor’s StepVerifier can be used to verify the behavior of a Reactor Publisher (Flux or Mono).

Here’s a simple example of a Junit test utilizing StepVerifier:

@Test
public void testStepVerifier_Mono_error() {
  Mono<String> monoError = Mono.error(new RuntimeException("error")); //1
  StepVerifier.create(monoError) //2
    .expectErrorMessage("error") //3
    .verify(); //4
}
  1. Create a Mono wrapping a RuntimeException imitating an actual error state.

  2. Create a StepVerifier wrapping that Mono.

  3. Declare that an onError event is expected and the Exception’s error message is “error”.

  4. We call verify() at the end. This will throw an AssertionError if any expectations are not met.

Next, we’ll create a Mono of just one string and verify it:

@Test public void testStepVerifier_Mono_foo() {
Mono<String> foo = Mono.just(“foo”); //1
StepVerifier.create(foo)             //2
.expectNext(“foo”)                 //3
.verifyComplete();                 //4
}
  1. Create a Mono wrapping one value, “foo”.

  2. Create a StepVerifier wrapping that Mono.

  3. Expect onNext is called with “foo”.

  4. Call verifyComplete() has the same effect as verify() but also expects onComplete was called.

Verifying Backpressure

Next we’ll test a Flux with four values and verify any additional values are dropped using backpressure handling.

@Test
public void testStepVerifier_Flux_backpressure() {
    Flux<Integer> source = Flux.<Integer>create(emitter -> { //1
        emitter.next(1);
        emitter.next(2);
        emitter.next(3);
        emitter.next(4);
        emitter.complete();
    }).onBackpressureDrop();                            //2

    StepVerifier.withVirtualTime(() -> source, 3)      //3
            .expectNext(1)
            .expectNext(2)
            .expectNext(3)
            .expectComplete()                       //4
            .verifyThenAssertThat()
            .tookLessThan(Duration.ofMillis(50));   //5
}
  1. Create a Flux of just four numbers using an emitter (this is a "hot" flux meaning it is time sensitive).

  2. Using onBackpressureDrop to drop any items that aren’t handled fast enough.

  3. Create a StepVerifier with the Flux using withVirtualTime and requesting only 3 items. Then call expectNext for each value expected.

  4. Then call expectComplete thus verifying that the backpressure logic worked!

  5. Finally, we verify that the Flux is complete and the whole process took less than a 50 milliseconds.

The main point of using withVirtualTime here is to specify that we only request three elements. This emulates the real-world experience that would happen if a down-stream (Subscriber) could not keep up with the upstream (Publisher), since the down-stream is what causes items to be requested in Reactive Streams.

Although this example only uses 1,2,3,4 you can imagine the stream (Flux) could be composed of any objects.

Virtual time (with StepVerifier) can also be useful for testing things like the Flux.interval.

TestPublisher

The TestPublisher<T> class offers the ability to provide finely tuned data for test purposes. TestPublisher<T> is a reactive-streams Publisher<T> but can be converted to either a Flux or Mono.

TextPublisher has the following methods:

  • next(T) and next(T, T…​) : Triggers 1-n onNext signals.

  • emit(T…​) : Does the same as next and terminates with an onComplete signal.

  • complete() : Terminates with an onComplete signal.

  • error(Throwable) : Terminates with an onError signal.

The following demonstrates how you might use TestPublisher<T>:

TestPublisher<Object> publisher = TestPublisher.create(); //1
Flux<Object> stringFlux = publisher.flux();               //2
List list = new ArrayList();                              //3
stringFlux.subscribe(next -> list.add(next),
                     ex -> ex.printStackTrace());         //4
publisher.emit("foo", "bar");                             //5
assertEquals(2, list.size());                             //6
assertEquals("foo", list.get(0));
assertEquals("bar", list.get(1));
  1. Create the TestPublisher instance.

  2. Convert it to a Flux.

  3. Create a new List. For test purposes we will use this list to collect values from the publisher.

  4. Subscribe to the publisher using two lambda expressions for onNext and onError. This will add each value emitted from the publisher to the list.

  5. Finally, emit the values “foo” and “bar” from the TestPublisher.

  6. Assert the list’s size is two as expected.

Note that you must subscribe to the TestPublisher (which is done by subscribing to the stringFlux in the above example) before emitting any values.

Coming Next…​

In this article I’ve shown how Reactor can be used to handle backpressure and how to test it. In my next article, we’ll look into how Reactor integrates with the whole Spring ecosystem - especially with WebFlux and WebClient.