Overview:
In this article of our Java Reactive Programming with Project Reactor series, let's discuss Reactor Schedulers and the difference between publishOn and subscribeOn, with some code samples.
This is the 5th article in the Reactive Programming series. Take a look at these articles if you have not read them before:
- Reactive Programming – A Simple Introduction
- Reactive Programming – Creating Sequences – Flux vs Mono
- Reactive Programming – Publisher Types – Cold Vs Hot
- Reactive Programming – Flux – Create Vs Generate
Reactor Schedulers:
Reactive Streams provide a standard for asynchronous stream processing. We achieve asynchronous/non-blocking behavior by scheduling tasks on worker threads. Creating and managing threads ourselves is not an easy task, so Project Reactor provides convenient factory methods on the Schedulers class that hand us ready-made worker threads: immediate(), single(), newSingle(), elastic(), boundedElastic() and parallel(). Each of them is covered in the sections below.
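To get a feel for these factory methods, here is a minimal sketch that runs one trivial task on a few of the schedulers and reports which thread executed it. The class name SchedulerFactories and the helper threadNameOf are made up for this illustration, and it assumes reactor-core is on the classpath.

```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SchedulerFactories {

    // Hypothetical helper: runs one trivial task on the given scheduler
    // and returns the name of the thread that executed it.
    static String threadNameOf(Scheduler scheduler) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.schedule(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            // Wait for the task, but never hang forever.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("immediate      -> " + threadNameOf(Schedulers.immediate()));
        System.out.println("single         -> " + threadNameOf(Schedulers.single()));
        System.out.println("boundedElastic -> " + threadNameOf(Schedulers.boundedElastic()));
        System.out.println("parallel       -> " + threadNameOf(Schedulers.parallel()));
    }
}
```

With immediate() the task runs on the calling thread itself, while the other schedulers hand it to one of their own worker threads.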
PublishOn vs SubscribeOn:
publishOn and subscribeOn are convenient methods in Project Reactor that accept any of the above Schedulers to change the task execution context for the operations in a reactive pipeline. While subscribeOn forces the source emission to use a specific Scheduler, publishOn changes the Scheduler for all the downstream operations in the pipeline, as the examples below show.
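As a quick preview, the sketch below puts both operators in one pipeline and records which pool handled the source emission and which handled the downstream step. The class name PublishOnVsSubscribeOn, the helper pool and the log labels are assumptions made for this illustration only.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class PublishOnVsSubscribeOn {

    // Strips the numeric suffix of a thread name, e.g. "parallel-3" -> "parallel".
    static String pool(Thread t) {
        return t.getName().replaceAll("-\\d+$", "");
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.range(0, 2)
            // Runs where the source emits: decided by subscribeOn.
            .doOnNext(i -> log.add("source:" + pool(Thread.currentThread())))
            .subscribeOn(Schedulers.single())
            // Everything after this point runs on the parallel pool.
            .publishOn(Schedulers.parallel())
            .doOnNext(i -> log.add("downstream:" + pool(Thread.currentThread())))
            .blockLast(); // block only to keep the demo simple
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The source entries should report the single scheduler and the downstream entries the parallel one, even though subscribeOn appears in the middle of the chain.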
A Simple Publisher:
Flux<Integer> flux = Flux.range(0, 2)
    .map(i -> {
        System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    });
Here the map function simply returns the received value. We just wanted to do some logging to see the thread name which is doing the map operation. In the below runnable implementation, we simply subscribe to the above flux! Do note that in the reactive programming world, nothing happens until you subscribe!
// create a runnable with flux subscription
Runnable r = () -> flux.subscribe(s -> {
    System.out.println("Received " + s + " via " + Thread.currentThread().getName());
});
Create 2 threads named t1 and t2 to execute the above runnable.
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
// let's start the threads (this is when we actually subscribe to the flux)
System.out.println("Program thread :: " + Thread.currentThread().getName());
t1.start();
t2.start();
Output:
Program thread :: main
Mapping for 0 is done by thread t1
Mapping for 0 is done by thread t2
Received 0 via t1
Received 0 via t2
Mapping for 1 is done by thread t1
Mapping for 1 is done by thread t2
Received 1 via t1
Received 1 via t2
If you look at the above output carefully, our Publisher publishes the values separately for each subscription (it is a cold publisher). The map and consumption operations are executed on the respective threads where the subscription happens. This is the default behavior.
However, Reactor provides an easy way to switch the task execution context in the reactive chain using the below methods.
- publishOn
- subscribeOn
PublishOn:
publishOn accepts a Scheduler which changes the task execution context for the downstream operations (for all of them, or until another publishOn switches the context again further down the chain). Let's take a look at the below examples.
- Schedulers.immediate():
To keep the execution in the current thread.
Flux<Integer> flux = Flux.range(0, 2)
    .publishOn(Schedulers.immediate())
    .map(i -> {
        System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    });
The output will still be the same! Schedulers.immediate() keeps the execution on the current thread. Note that the current thread here is not main; it is the thread which called the subscribe method (t1 or t2).
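A small sketch can confirm this. It subscribes from a thread with a name we choose and records which thread ran the map step. The class and method names are hypothetical, picked only for this example.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

class ImmediateSchedulerDemo {

    // Subscribes from a thread with the given name and returns the name
    // of the thread that executed the map step.
    static String mappingThreadWhenSubscribedFrom(String threadName) {
        AtomicReference<String> seen = new AtomicReference<>();
        Flux<Integer> flux = Flux.range(0, 2)
            .publishOn(Schedulers.immediate())
            .map(i -> {
                seen.set(Thread.currentThread().getName());
                return i;
            });

        Thread t = new Thread(() -> flux.subscribe(), threadName);
        t.start();
        try {
            // With immediate(), the whole pipeline runs inside subscribe(),
            // so it has finished by the time the thread ends.
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(mappingThreadWhenSubscribedFrom("t1"));
    }
}
```

The returned name should be the subscribing thread's name rather than main or a Reactor worker thread.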
- Schedulers.single():
A single reusable thread. When we use this scheduler, the operations of the reactive chain are executed on this one thread for all callers.
Flux<Integer> flux = Flux.range(0, 2)
    .publishOn(Schedulers.single())
    .map(i -> {
        System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    });
Output:
Program thread :: main
Mapping for 0 is done by thread single-1
Received 0 via single-1
Mapping for 1 is done by thread single-1
Received 1 via single-1
Mapping for 0 is done by thread single-1
Received 0 via single-1
Mapping for 1 is done by thread single-1
Received 1 via single-1
Check the output. Both subscriptions done by t1 and t2 are executed via a single thread.
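The same observation can be checked in code instead of by reading the console. This sketch subscribes from two threads and collects the names of the threads that handled the items after publishOn. SharedSingleScheduler and threadsUsed are illustrative names, not part of Reactor.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SharedSingleScheduler {

    // Subscribes from two different threads and returns the set of thread
    // names that processed the items downstream of publishOn.
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(2);

        Flux<Integer> flux = Flux.range(0, 2)
            .publishOn(Schedulers.single())
            .doOnNext(i -> names.add(Thread.currentThread().getName()));

        // Count down on completion or error so the caller can wait for both.
        Runnable r = () -> flux.subscribe(i -> { }, e -> done.countDown(), done::countDown);
        new Thread(r, "t1").start();
        new Thread(r, "t2").start();

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed());
    }
}
```

Although there are two subscribers, the set should contain just one thread name, because Schedulers.single() is backed by one shared thread.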
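- Schedulers.newSingle():
Same as above, but every call to newSingle() creates a new, dedicated single-threaded Scheduler. In this example the call happens only once, when the flux is assembled, so both subscriptions share the same vinsguru thread.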
Flux<Integer> flux = Flux.range(0, 2)
    .publishOn(Schedulers.newSingle("vinsguru"))
    .map(i -> { /* same logging as in the previous examples */ return i; });
Output:
Program thread :: main
Mapping for 0 is done by thread vinsguru-1
Received 0 via vinsguru-1
Mapping for 1 is done by thread vinsguru-1
Received 1 via vinsguru-1
Mapping for 0 is done by thread vinsguru-1
Received 0 via vinsguru-1
Mapping for 1 is done by thread vinsguru-1
Received 1 via vinsguru-1
- Schedulers.elastic():
This is a thread pool with an unbounded number of threads. It is deprecated in favor of boundedElastic(), so DO NOT USE this option.
- Schedulers.boundedElastic():
This is the preferred replacement for the above elastic. By default, this thread pool is capped at 10 * the number of CPU cores you have, and its threads are created on demand and reused. It is a good choice for IO operations or any blocking call.
Flux<Integer> flux = Flux.range(0, 2)
    .publishOn(Schedulers.boundedElastic())
    .map(i -> { /* same logging as in the previous examples */ return i; });
Output:
Program thread :: main
Mapping for 0 is done by thread boundedElastic-1
Mapping for 0 is done by thread boundedElastic-2
Received 0 via boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Received 1 via boundedElastic-1
Received 0 via boundedElastic-2
Mapping for 1 is done by thread boundedElastic-2
Received 1 via boundedElastic-2
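Since boundedElastic is meant for blocking work, a common way to use it is to wrap the blocking call in Mono.fromCallable and move the subscription onto this scheduler. The sketch below assumes a made-up blocking method, blockingLookup, which simply sleeps and returns the name of the thread it ran on.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingCallOnBoundedElastic {

    // Hypothetical blocking operation (stands in for JDBC, file IO, etc.).
    static String blockingLookup() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    static String run() {
        return Mono.fromCallable(BlockingCallOnBoundedElastic::blockingLookup)
            // The callable is invoked at subscription time, so subscribeOn
            // decides which thread pays for the blocking call.
            .subscribeOn(Schedulers.boundedElastic())
            .block(); // block only to keep the demo simple
    }

    public static void main(String[] args) {
        System.out.println("Blocking call ran on " + run());
    }
}
```

This keeps the blocking call off the parallel pool and off the thread that subscribes.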
- Schedulers.parallel():
A fixed pool of workers that is tuned for parallel work. It creates as many workers as you have CPU cores. It should be used for CPU-bound work, not for IO or blocking calls.
Flux<Integer> flux = Flux.range(0, 2)
    .publishOn(Schedulers.parallel())
    .map(i -> { /* same logging as in the previous examples */ return i; });
Output:
Program thread :: main
Mapping for 0 is done by thread parallel-1
Mapping for 0 is done by thread parallel-2
Received 0 via parallel-1
Mapping for 1 is done by thread parallel-1
Received 0 via parallel-2
Received 1 via parallel-1
Mapping for 1 is done by thread parallel-2
Received 1 via parallel-2
If you do not want to use the shared parallel / boundedElastic pools, Schedulers also has convenient methods to create new, dedicated parallel and boundedElastic thread pools:
- newParallel()
- newBoundedElastic()
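Here is a rough sketch of how these two factory methods might be used. The pool names cpu-work and io-work and the sizes (2 threads, a cap of 4 threads with 100 queued tasks) are arbitrary values chosen for this example. Schedulers we create ourselves should be disposed once they are no longer needed.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Arrays;
import java.util.List;

class CustomSchedulers {

    static List<String> run() {
        // Dedicated pools with our own names and sizes (arbitrary demo values).
        Scheduler cpu = Schedulers.newParallel("cpu-work", 2);
        Scheduler io = Schedulers.newBoundedElastic(4, 100, "io-work");
        try {
            String cpuThread = Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(cpu)
                .block();
            String ioThread = Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(io)
                .block();
            return Arrays.asList(cpuThread, ioThread);
        } finally {
            // Release the threads owned by our custom schedulers.
            cpu.dispose();
            io.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The thread names carry the prefix we passed in, which makes these pools easy to spot in logs and thread dumps.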
Multiple PublishOn Methods:
Flux<Integer> flux = Flux.range(0, 2)
    .map(i -> {
        System.out.println("Mapping one for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    })
    .publishOn(Schedulers.boundedElastic())
    .map(i -> {
        System.out.println("Mapping two for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    })
    .publishOn(Schedulers.parallel())
    .map(i -> {
        System.out.println("Mapping three for " + i + " is done by thread " + Thread.currentThread().getName());
        return i;
    });
Output:
Program thread :: main
Mapping one for 0 is done by thread t2
Mapping one for 0 is done by thread t1
Mapping one for 1 is done by thread t2
Mapping one for 1 is done by thread t1
Mapping two for 0 is done by thread boundedElastic-2
Mapping two for 0 is done by thread boundedElastic-1
Mapping two for 1 is done by thread boundedElastic-2
Mapping two for 1 is done by thread boundedElastic-1
Mapping three for 0 is done by thread parallel-1
Received 0 via parallel-1
Mapping three for 1 is done by thread parallel-1
Received 1 via parallel-1
Mapping three for 0 is done by thread parallel-2
Received 0 via parallel-2
Mapping three for 1 is done by thread parallel-2
Received 1 via parallel-2
Check the above output.
- The very first map is executed on the subscribing thread (t1 / t2)
- The second map is executed on the boundedElastic thread pool
- The third map is executed on the parallel thread pool
- As there are no more publishOn calls, the final consumption is also executed on the parallel thread pool
SubscribeOn:
The subscribeOn method affects the context of the source emission. As we said earlier, nothing happens in the reactive chain until we subscribe! Once subscribed, the pipeline is executed by default on the thread which subscribed. When a publishOn method is encountered, it switches the context for the downstream operations, but the source (the Flux / Mono / any other publisher) still emits on the thread which subscribed. The subscribeOn method changes this behavior. The flux used here is the one from the Multiple PublishOn section above, which is why the output shows all three map steps.
Runnable r = () -> flux
    .subscribeOn(Schedulers.single())
    .subscribe(s -> {
        System.out.println("Received " + s + " via " + Thread.currentThread().getName());
    });
Output:
Program thread :: main
Mapping one for 0 is done by thread single-1
Mapping one for 1 is done by thread single-1
Mapping two for 0 is done by thread boundedElastic-1
Mapping two for 1 is done by thread boundedElastic-1
Mapping one for 0 is done by thread single-1
Mapping three for 0 is done by thread parallel-1
Received 0 via parallel-1
Mapping three for 1 is done by thread parallel-1
Received 1 via parallel-1
Mapping one for 1 is done by thread single-1
Mapping two for 0 is done by thread boundedElastic-2
Mapping two for 1 is done by thread boundedElastic-2
Mapping three for 0 is done by thread parallel-2
Received 0 via parallel-2
Mapping three for 1 is done by thread parallel-2
Received 1 via parallel-2
From the output, we can clearly see that we can control which thread pool is used for the source emission. Do note that we can have multiple publishOn methods, each of which keeps switching the context for the operations after it. The subscribeOn method does not work like that: when a chain contains several subscribeOn calls, only the one closest to the source decides where the source emits.
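To illustrate the last point, this sketch chains two subscribeOn calls and records the threads on which the source emitted. FirstSubscribeOnWins and emissionThreads are illustrative names chosen for this example.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class FirstSubscribeOnWins {

    // Returns the names of the threads on which the source emitted its items.
    static Set<String> emissionThreads() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Flux.range(0, 2)
            .doOnNext(i -> names.add(Thread.currentThread().getName()))
            .subscribeOn(Schedulers.single())   // closest to the source: this one wins
            .subscribeOn(Schedulers.parallel()) // only hands the subscription over to single
            .blockLast(); // block only to keep the demo simple
        return names;
    }

    public static void main(String[] args) {
        System.out.println(emissionThreads());
    }
}
```

The later subscribeOn(Schedulers.parallel()) only decides which thread passes the subscription further up the chain, so the items themselves should be emitted on the single thread.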
Summary:
We were able to successfully demonstrate the difference between PublishOn vs SubscribeOn by using various Reactor Schedulers.
Learn more about Java Reactive Programming.
- Java Reactive Programming – Combining Multiple Sources Of Flux / Mono
- Java Reactive Programming – How To Consume / Clean Up Resources With Flux.using
Happy learning 🙂