Overview:
In this Project Reactor series, let's take a look at switchOnFirst and its usage. If you are new to reactive programming, read the following articles in the given order.
- Reactive Programming – A Simple Introduction
- Reactive Programming – Creating Sequences – Flux vs Mono
- Reactive Programming – Publisher Types – Cold Vs Hot
- Reactive Programming – Flux – Create Vs Generate
- Reactive Programming – Schedulers
- Reactive Programming – Reactor – Combining Multiple Sources Of Flux / Mono
- Reactive Programming – How To Consume / Clean Up Resources With Flux.using
SwitchOnFirst:
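This Flux method lets us switch the pipeline based on the first emitted signal. It accepts a function that receives 2 arguments.
- The first emitted signal (a Signal, which can be onNext, onComplete or onError)
- The source Flux, which still includes the first element
The function should return a Publisher. It can return the Flux as it is, or transform the pipeline if required.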
Let’s take this simple example.
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5, 6, 7);
integerFlux
    .defaultIfEmpty(100)
    .subscribe(System.out::println);
The above code prints 1 to 7.
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5, 6, 7);
integerFlux
    .switchOnFirst((signal, flux) -> signal.get() == 2 ? flux : Flux.empty())
    .defaultIfEmpty(100)
    .subscribe(System.out::println);
Here I add switchOnFirst, which checks whether the first emitted value is 2. If it is 2, we receive all the elements, including the first one. If not, switchOnFirst cancels the source and publishes an empty Flux downstream, so we get the default value of 100. Since the first value here is 1, the output is 100.
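One detail worth guarding against: the first signal is not always an onNext. If the source is empty, the function receives an onComplete signal and signal.get() returns null, so the comparison with 2 would throw a NullPointerException. The following is a minimal sketch of a safer variant, assuming reactor-core is on the classpath; the class name SwitchOnFirstGuard and the run helper are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchOnFirstGuard {

    // Hypothetical helper: builds a Flux from the given values, applies the
    // guarded switchOnFirst and collects whatever reaches the subscriber.
    static List<Integer> run(Integer... values) {
        return Flux.fromArray(values)
                .switchOnFirst((signal, flux) ->
                        // hasValue() is false for onComplete / onError signals,
                        // so get() is only unboxed when a value is present
                        signal.hasValue() && signal.get() == 2
                                ? flux
                                : Flux.<Integer>empty())
                .defaultIfEmpty(100)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2, 3)); // [100]
        System.out.println(run(2, 3, 4)); // [2, 3, 4]
        System.out.println(run());        // [100], the empty source is handled without an exception
    }
}
```

block() is used here only to make the result easy to print. In a real reactive pipeline we would keep subscribing as in the snippets above.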
How else can we use this?
We can completely switch the pipeline based on the first emitted value. For example, our requirement is to check the first number.
- If it is an odd number, the pipeline should receive all the items as they are.
- If it is an even number, we double each item and keep only the results that are less than or equal to 10.
To implement the above requirement, we can define 2 methods as shown here.
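// First value is odd: log each item and pass it through unchanged
private Flux<Integer> getOddFlux(Flux<Integer> flux) {
    return flux
        .map(i -> {
            System.out.println("Received In Odd : " + i);
            return i;
        });
}

// First value is even: log each item, double it and drop results above 10
private Flux<Integer> getEvenFlux(Flux<Integer> flux) {
    return flux
        .map(i -> {
            System.out.println("Received In even : " + i);
            return i * 2;
        })
        .filter(i -> i <= 10);
}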
Let's test this.
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5, 6, 7);
integerFlux
    .switchOnFirst((signal, flux) ->
        signal.get() % 2 == 0 ? getEvenFlux(flux) : getOddFlux(flux)
    )
    .subscribe(s -> System.out.println("Received Final :: " + s));
I get the below output.
Received In Odd : 1
Received Final :: 1
Received In Odd : 2
Received Final :: 2
Received In Odd : 3
Received Final :: 3
Received In Odd : 4
Received Final :: 4
Received In Odd : 5
Received Final :: 5
Received In Odd : 6
Received Final :: 6
Received In Odd : 7
Received Final :: 7
If we change the integerFlux as shown here, then the output is as shown below.
Flux<Integer> integerFlux = Flux.just(2, 2, 3, 4, 5, 6, 7);
Output:
Received In even : 2
Received Final :: 4
Received In even : 2
Received Final :: 4
Received In even : 3
Received Final :: 6
Received In even : 4
Received Final :: 8
Received In even : 5
Received Final :: 10
Received In even : 6
Received In even : 7
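In the output above, 6 and 7 reach the even handler, but their doubled values (12 and 14) are dropped by the filter, so no "Received Final" line is printed for them. To run both cases end to end, here is a minimal self-contained sketch, assuming reactor-core is on the classpath. The class name SwitchOnFirstDemo and the process helper are illustrative, and the logging is left out so that only the final values are collected.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SwitchOnFirstDemo {

    // First value is odd: items pass through unchanged
    static Flux<Integer> oddHandler(Flux<Integer> flux) {
        return flux;
    }

    // First value is even: double each item and keep results up to 10
    static Flux<Integer> evenHandler(Flux<Integer> flux) {
        return flux
                .map(i -> i * 2)
                .filter(i -> i <= 10);
    }

    // Hypothetical helper: picks a handler from the first signal and
    // collects the values that reach the end of the pipeline.
    static List<Integer> process(Integer... values) {
        return Flux.fromArray(values)
                .switchOnFirst((signal, flux) ->
                        // an empty source has no value, so it falls through to the odd handler
                        signal.hasValue() && signal.get() % 2 == 0
                                ? evenHandler(flux)
                                : oddHandler(flux))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(process(1, 2, 3, 4, 5, 6, 7)); // [1, 2, 3, 4, 5, 6, 7]
        System.out.println(process(2, 2, 3, 4, 5, 6, 7)); // [4, 4, 6, 8, 10]
    }
}
```

Note that the handler is chosen once, from the first signal only. The later odd values in the second run still go through the even handler.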