reactor hot publisher

Reactor Repeat vs Retry

Overview:

In the reactive programming series with project reactor, I would like to show you the difference between Reactor Repeat vs Reactor Retry.

If you are new to reactive programming or project reactor, take a look at this entire series to get a good idea on that.

Need For Reactor Repeat & Retry:

As we know already, reactive programming is a style of programming which observes on the data streams, reacting to the changes and propagating them! The data stream will be closed when there is no more data for source to emit or when there is an unhandled exception!

Sometimes, we might want not to stop the process and we might have to keep that going forever!

Reactor Method Usage
repeat re-start the reactive pipeline / re-subscribes to the source when the source sends the complete signal.
retry re-start the reactive pipeline / re-subscribes to the source when there is an error signal.

Reactor Retry:

As we just saw above, Reactor retry resubscribes to the source if we get any error signal. For example, assume that one microservice tries to send a request to another microservice and expect a response. Due to some network issue, we get the request timed out and it throws an exception. To make the system more resilient, reactor can absorb this error and retry the requests.

To play with this retry feature, Let’s take a very simple example. Here we have our source which emits number from 1 to 10. Unfortunately our pipeline can not process numbers beyond 5. It abruptly stops when we get > 5.

Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 5)
            throw new RuntimeException("Can not process > 5");
        return i;
    })
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

Output:

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6
Error :: java.lang.RuntimeException: Can not process > 5
  • retry(N) – to retry N times whenever there is an error
Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 5)
            throw new RuntimeException("Can not process > 5");
        return i;
    })
    .retry(1)
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

When we add the retry(1) and get the error signal, we resubscribe to the source. Our source tries to emit 1 to 10 once again for every subscription.  After 1 retry attempt, then the error is thrown as usual.

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6  // will retry now
Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6
Error :: java.lang.RuntimeException: Can not process > 5
  • retry() – to retry indefinitely
  • retryWhen(….) – we can use retryWhen for more advanced options.
    • To retry 2 times, each attempt with delay of 3 seconds.
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(3)))
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
  • To retry indefinitely with retryWhen
.retryWhen(Retry.indefinitely())
  • maxInARow(N) – will be useful we want to try until continuously we get N errors.
    • We modify the code not to process any number which is > 0
Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 0)
            throw new RuntimeException("Can not process > 0");
        return i;
    })
    .retryWhen(Retry.maxInARow(5))
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

When the flux emits 1, we get the error and we retry. The flux emits 1 again because of the new subscription. This will throw error once again. It will repeat!  We allow up to 5 attempts and then we stop.

Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Error :: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 5/5 in a row (5 total)
  • companion source – Sometimes we need to retry only when things are required. In that case, we can use companion source function.
// some source
DirectProcessor<Object> processor = DirectProcessor.create();

// flux pipeline
Flux.range(1, 10)
        .doOnNext(i -> System.out.println("Emitted :: " + i))
        .map(i -> {
            if(i > 0)
                throw new RuntimeException("Can not process > 5");
            return i;
        })
        .retryWhen(Retry.from(s -> processor)) // companion source
        .subscribe(i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err));

// somewhere else
FluxSink<Object> sink = processor.sink();
for (int i = 0; i < 3; i++) {
    sleep(Duration.ofSeconds(3));
    System.out.println("Sink emission :: " + i);
    sink.next(i);  // every emission here will make the flux above emit the element
}

Output:

Emitted :: 1
Sink emission :: 0
Emitted :: 1
Sink emission :: 1
Emitted :: 1
Sink emission :: 2
Emitted :: 1

Reactor Repeat:

Repeat is kind of retry – but it gets triggered when the source sends the complete signal.

Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .subscribe(i -> System.out.println("Received :: " + i));

This code produces below output and pipeline is closed as there is no more data to process.

Output:

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
  • repeat(N) – this will resubscribe to the source N times whenever we get the complete signal.
Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeat(1)
    .subscribe(i -> System.out.println("Received :: " + i));
  • repeat(BooleanSupplier) – We can repeat only when required by passing a boolean supplier.
Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeat(() -> true)
    .subscribe(i -> System.out.println("Received :: " + i));
  • repeat with companion source
    • This flux first emits 3 elements. After 4 seconds of sleep, as the companion source emitted an element, our flux will emit 3 elements once again.
    • This behavior will stop when the companion source stops emitting the elements.
// some source
DirectProcessor<Object> processor = DirectProcessor.create();

// flux with companion repeat
Flux.range(1, 3)
        .doOnNext(i -> System.out.println("Emitted :: " + i))
        .repeatWhen(i -> processor)
        .subscribe(i -> System.out.println("Received :: " + i));

// random sleep
sleep(Duration.ofSeconds(4));

FluxSink<Object> sink = processor.sink();
sink.next(1); // this will make the above flux repeat

The companion source does not have to be an external source. We can also decide whether to continue or not by checking the last emitted item.

Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeatWhen(flux -> flux.handle((lastEmitted, sink) -> {
        if(lastEmitted > 3)
            sink.complete();
        else
            sink.next(1);
    }))
    .subscribe(i -> System.out.println("Received :: " + i));

Summary:

We were able to successfully demonstrate the difference between Reactor repeat vs Reactor retry and when to use what.

Learn more about Reactor / Java reactive programming.

Happy learning 🙂

Share This:

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.