Overview:
In this short tutorial, I would like to demo Reactor Parallel Flux to process all the items in a reactive pipeline in parallel using Schedulers/Thread pools.
If you are new to this, take a look at the entire set of articles here in the given order.
Reactor Sequential Flux:
Reactive programming is a style of programming in which we have a channel which observes data streams and reacts to the events. By default all the operations in the channel are getting executed for the data being received sequentially in a non-blocking way as shown below.
Flux.range(1, 10)
.map(i -> i + 1)
.map(i -> i * 2)
.map(i -> i + 1)
.subscribe(System.out::println);
That is, the above 3 map operations are applied for the number 1 to 10 sequentially. Sometimes, these operations could be time consuming and on top of that if process all the elements one by one, it might take more time to process. To make use of all the CPU cores available, we might want to process them in parallel for the incoming data. (To be clear – I mean the processing data in parallel. Not the operations themselves).
Reactor Parallel Flux:
Reactor provides a method, parallel(), to process the items in parallel. In this example, instead of processing all the 10 items sequentially, I would like to divide the work between 2 CPU cores. So I create 2 parallel rails by invoking the parallel method.
The parallel method returns ParallelFlux which needs a Scheduler to run. So, If we use parallel, then chain that with runOn method to pass the schedulers. For more info on Schedulers, check this. Do note that to really parallelize, you have to use runOn method along with parallel!
Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
.map(i -> i + 1)
.map(i -> i * 2)
.map(i -> i + 1)
.subscribe(System.out::println);
Note: When we subscribe to the data, we might not be receiving them in emission order as we are processing in parallel.
Groups:
Reactor Parallel Flux can also group the elements based on the thread processing the element!
Lets consider this model.
@Data
@AllArgsConstructor
@ToString
public class ItemGroup {
private int threadIndex;
private Flux<Integer> items;
}
We can collect the elements based on thread index as shown here.
Flux.range(1, 30)
.parallel(5)
.runOn(Schedulers.parallel())
.groups()
.map(gf -> new ItemGroup(gf.key(), gf))
.subscribe(System.out::println);
Output:
ItemGroup(threadIndex=0, items=ParallelInnerGroup)
ItemGroup(threadIndex=1, items=ParallelInnerGroup)
ItemGroup(threadIndex=2, items=ParallelInnerGroup)
ItemGroup(threadIndex=3, items=ParallelInnerGroup)
ItemGroup(threadIndex=4, items=ParallelInnerGroup)
Here the items are individual Flux which can be subscribed to consume the elements.
Flux.range(1, 30)
.parallel(5)
.runOn(Schedulers.parallel())
.groups()
.map(gf -> new ItemGroup(gf.key(), gf))
.flatMap(ig -> ig.getItems().collectList())
.subscribe(System.out::println);
Output:
[5, 10, 15, 20, 25, 30]
[3, 8, 13, 18, 23, 28]
[4, 9, 14, 19, 24, 29]
[1, 6, 11, 16, 21, 26]
[2, 7, 12, 17, 22, 27]
Reactor Parallel Flux to Sequential Flux:
Once we have created parallel rails to process the elements and if we need to merge them all together as a single Flux, we can use sequential as shown below.
Flux.range(1, 10)
.parallel(5)
.runOn(Schedulers.parallel())
.map(i -> i * 2)
.sequential()
.publishOn(Schedulers.single())
.map(i -> i + 2)
.subscribe(System.out::println);
Summary:
We were able to successfully demonstrate Reactor Parallel Flux behavior.
Read more about Reactive Programming.
- Java Reactive Programming – How To Consume / Clean Up Resources With Flux.using
- Reactor limitRate Example
Happy coding 🙂