Overview:
In this tutorial, I would like to show the difference between hot and cold publishers in Reactor.
If you are new to Java Reactive programming, you can take a look at these articles to give you an idea.
Reactor Hot Publisher vs Cold Publisher:
We have 2 different types of implementations for the Publisher interface.
- Flux
- Mono
Both can emit elements asynchronously. While a Flux can emit 0 to N elements, a Mono can emit 0 or 1 element.
Based on their emission behavior, we can categorize publishers into 2 types.
- Hot
- Cold
Reactor Cold Publisher:
Imagine Netflix or YouTube. Netflix does not stream a movie on its own when nobody is watching. If you log in to Netflix and want to watch your favorite movie, you can start watching it anytime. You and I are 2 different Netflix subscribers. We might watch the same movie from different locations, but these are 2 separate streams. I could have been watching the movie for an hour when you have only just started it. Netflix is like a cold publisher here.
Cold publishers by default do not produce any value unless at least 1 observer subscribes to them. They create a new data producer for each new subscription.
Let's take a look at this code. We have a simple method which returns an integer.
private int getDataToBePublished() {
    System.out.println("getDataToBePublished was called");
    return 1;
}
Let's assume that we emit the element using a Mono as shown here.
Mono.fromSupplier(() -> getDataToBePublished());
Now what does the above code do? It does not print anything, because nobody is observing it. We need at least 1 observer. The code below has 1 observer, so it produces the output shown after it.
Mono.fromSupplier(() -> getDataToBePublished())
    .subscribe(i -> System.out.println("Observer-1 :: " + i));
//Output
getDataToBePublished was called
Observer-1 :: 1
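To see why nothing is printed until somebody subscribes, the laziness can be modelled without Reactor. The sketch below is a plain-JDK illustration only, not how Reactor is implemented; `ColdSource`, its `subscribe` method and the `calls` counter are hypothetical names made up for this example. The source only stores the supplier and runs it once per observer.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of a cold source (this is not Reactor code).
public class ColdSourceDemo {

    static final class ColdSource<T> {
        private final Supplier<T> supplier;

        ColdSource(Supplier<T> supplier) {
            this.supplier = supplier; // only stored, nothing is invoked yet
        }

        // Every subscription runs the supplier again, just for that observer.
        void subscribe(Consumer<T> observer) {
            observer.accept(supplier.get());
        }
    }

    // Counts how often the data method was actually called.
    static int calls = 0;

    static int getDataToBePublished() {
        calls++;
        return 1;
    }

    public static void main(String[] args) {
        ColdSource<Integer> source = new ColdSource<>(ColdSourceDemo::getDataToBePublished);
        System.out.println("calls before subscribing: " + calls);
        source.subscribe(i -> System.out.println("Observer-1 :: " + i));
        source.subscribe(i -> System.out.println("Observer-2 :: " + i));
        System.out.println("calls after two subscriptions: " + calls);
    }
}
```

In this model the counter stays at 0 until the first `subscribe` call and grows by one for every observer, which is the same pattern the Mono example shows.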
Let's consider this method, which is going to stream a movie for us.
private Stream<String> getMovie() {
    System.out.println("Got the movie streaming request");
    return Stream.of(
        "scene 1",
        "scene 2",
        "scene 3",
        "scene 4",
        "scene 5"
    );
}
Our NetFlux app implementation is as shown here.
//our NetFlux streamer
//each scene will play for 2 seconds
Flux<String> netFlux = Flux.fromStream(() -> getMovie())
    .delayElements(Duration.ofSeconds(2));
// you start watching the movie
netFlux.subscribe(scene -> System.out.println("You are watching " + scene));
// I join after some time
Thread.sleep(5000);
netFlux.subscribe(scene -> System.out.println("Vinsguru is watching " + scene));
Output:
Got the movie streaming request
You are watching scene 1
You are watching scene 2
Got the movie streaming request
You are watching scene 3
Vinsguru is watching scene 1
You are watching scene 4
Vinsguru is watching scene 2
You are watching scene 5
Vinsguru is watching scene 3
Vinsguru is watching scene 4
Vinsguru is watching scene 5
From the output, we can see that each new subscription triggered its own getMovie request. That is why 'Got the movie streaming request' appears twice in the output. You started your movie request first. Even before I started watching the movie, you had already finished 2 scenes. You and I watch the same movie in parallel, but we are at different scenes. I did not lose any scene/element just because I joined late. Our NetFlux server accepted my request and started streaming the scenes/elements afresh just for me.
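The interleaving in that output follows from simple arithmetic: each viewer gets a private run that starts when they subscribe, and scene k arrives k x 2 seconds later. The sketch below works this out with the plain JDK, without Reactor and without real waiting. It is an idealised timeline under the assumption of exact timings, and the `Event` class and `coldSubscription` helper are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Idealised timeline of two cold subscriptions (plain JDK, no Reactor, no real waiting).
public class ColdTimelineDemo {

    // Hypothetical helper: one printed line and the second at which it would appear.
    static final class Event {
        final int second;
        final String line;

        Event(int second, String line) {
            this.second = second;
            this.line = line;
        }
    }

    // A cold subscription starting at joinSecond triggers its own request
    // and receives every scene, one per delaySeconds.
    static List<Event> coldSubscription(String viewer, int joinSecond, int scenes, int delaySeconds) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(joinSecond, "Got the movie streaming request"));
        for (int scene = 1; scene <= scenes; scene++) {
            events.add(new Event(joinSecond + scene * delaySeconds, viewer + " watching scene " + scene));
        }
        return events;
    }

    static List<String> timeline() {
        List<Event> all = new ArrayList<>();
        all.addAll(coldSubscription("You are", 0, 5, 2));     // you subscribe immediately
        all.addAll(coldSubscription("Vinsguru is", 5, 5, 2)); // I subscribe after 5 seconds
        all.sort(Comparator.comparingInt((Event e) -> e.second));
        List<String> lines = new ArrayList<>();
        for (Event e : all) {
            lines.add("t=" + e.second + "s " + e.line);
        }
        return lines;
    }

    public static void main(String[] args) {
        timeline().forEach(System.out::println);
    }
}
```

Sorting the two private timelines by time gives the same order of lines as the output above, with the second request appearing between your scene 2 and scene 3.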
Reactor Hot Publisher:
Imagine a movie theater or a radio station. It does not matter whether people are really listening to the radio; the station keeps broadcasting songs/news. Listeners can tune in anytime they want, but all listeners get the same content at any given moment: whatever is being broadcast at that moment. Listeners who join late lose what was broadcast before they joined. Hot publishers are like this radio station.
Hot publishers do not create a new data producer for each new subscription (as cold publishers do). Instead, there is only one data producer, and all the observers listen to the data produced by that single producer. So all the observers get the same data.
Let's take a look at a couple of methods which turn a cold source into a hot one.
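The single-producer idea can be sketched with the plain JDK. This is only a conceptual model, not Reactor code; `RadioStation`, `subscribe` and `broadcast` are hypothetical names made up for this example. There is one producer, it broadcasts whether or not anybody listens, and each listener only receives what is broadcast after joining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of a hot source (this is not Reactor code).
public class HotSourceDemo {

    static final class RadioStation {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        // The single producer: it broadcasts whether or not anybody is listening.
        void broadcast(String item) {
            for (Consumer<String> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    // Returns what the early listener and the late listener received, in that order.
    static List<List<String>> run() {
        RadioStation station = new RadioStation();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        station.broadcast("song 1");   // nobody is listening, so this song is lost
        station.subscribe(early::add);
        station.broadcast("song 2");
        station.subscribe(late::add);  // joins late and misses song 2
        station.broadcast("song 3");   // both listeners get the same item

        return List.of(early, late);
    }

    public static void main(String[] args) {
        List<List<String>> result = run();
        System.out.println("early listener: " + result.get(0));
        System.out.println("late listener:  " + result.get(1));
    }
}
```

Unlike the cold model, subscribing here never starts a new producer; it only adds one more listener to the existing broadcast.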
Share:
- Case 1:
Check the code below. Here we just added the 'share' method to our Flux to turn the NetFlux server into a movie theater. Share turns the cold source into a hot one by multicasting the emitted data to multiple subscribers.
//our movie theatre
//each scene will play for 2 seconds
Flux<String> movieTheatre = Flux.fromStream(() -> getMovie())
    .delayElements(Duration.ofSeconds(2)).share();
// you start watching the movie
movieTheatre.subscribe(scene -> System.out.println("You are watching " + scene));
// I join after some time
Thread.sleep(5000);
movieTheatre.subscribe(scene -> System.out.println("Vinsguru is watching " + scene));
Output:
Got the movie streaming request
You are watching scene 1
You are watching scene 2
You are watching scene 3
Vinsguru is watching scene 3
You are watching scene 4
Vinsguru is watching scene 4
You are watching scene 5
Vinsguru is watching scene 5
As the output shows, I lost the first 2 scenes of the movie because I joined late. However, I am able to watch the latest scene playing in the theater along with the others.
- Case 2:
Consider this example, which also uses the share method. Here, by the time the second subscriber joins, the source has already emitted all its data and completed (5 scenes at 2 seconds each take about 10 seconds, and the second subscriber joins after 12). So the second subscription repeats the emission process. Think of it as the same movie theater running a second show once the first show is over.
//our movie theatre
//each scene will play for 2 seconds
Flux<String> movieTheatre = Flux.fromStream(() -> getMovie())
    .delayElements(Duration.ofSeconds(2)).share();
// you start watching the movie
movieTheatre.subscribe(scene -> System.out.println("You are watching " + scene));
//I join after the source is completed
Thread.sleep(12000);
movieTheatre.subscribe(scene -> System.out.println("Vinsguru is watching " + scene));
Output:
Got the movie streaming request
You are watching scene 1
You are watching scene 2
You are watching scene 3
You are watching scene 4
You are watching scene 5
Got the movie streaming request
Vinsguru is watching scene 1
Vinsguru is watching scene 2
Vinsguru is watching scene 3
Vinsguru is watching scene 4
Vinsguru is watching scene 5
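Both share cases can be reproduced in a small plain-JDK model. This is a simplified sketch of the behaviour described above, not Reactor's implementation: `SharedSource`, `subscribe` and `tick` are hypothetical names, time is advanced by hand through `tick()`, and details of the real operator such as cancellation are left out. The first subscriber starts a show, everyone present gets the same scene on each tick, and once the show completes the next subscriber starts a new one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of the share behaviour (this is not Reactor code).
public class ShareModelDemo {

    static final class SharedSource<T> {
        private final Supplier<Iterator<T>> coldSource;
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private Iterator<T> current; // null while no show is running

        SharedSource(Supplier<Iterator<T>> coldSource) {
            this.coldSource = coldSource;
        }

        void subscribe(Consumer<T> subscriber) {
            if (current == null) {
                current = coldSource.get(); // the first subscriber starts a new show
            }
            subscribers.add(subscriber);
        }

        // Emits the next item to everyone who is subscribed right now.
        void tick() {
            if (current == null) {
                return;
            }
            if (current.hasNext()) {
                T item = current.next();
                for (Consumer<T> subscriber : subscribers) {
                    subscriber.accept(item);
                }
            }
            if (!current.hasNext()) { // the show is over: forget it and its audience
                current = null;
                subscribers.clear();
            }
        }
    }

    static int requests = 0;

    static Iterator<String> getMovie() {
        requests++;
        return List.of("scene 1", "scene 2", "scene 3").iterator();
    }

    // Case 1 joins after 2 scenes, Case 2 joins after the show has completed.
    static String run(int ticksBeforeSecondViewer) {
        requests = 0;
        SharedSource<String> theatre = new SharedSource<>(ShareModelDemo::getMovie);
        List<String> you = new ArrayList<>();
        List<String> me = new ArrayList<>();
        theatre.subscribe(you::add);
        for (int i = 0; i < ticksBeforeSecondViewer; i++) {
            theatre.tick();
        }
        theatre.subscribe(me::add);
        for (int i = 0; i < 3; i++) {
            theatre.tick();
        }
        return "requests=" + requests + " you=" + you + " me=" + me;
    }

    public static void main(String[] args) {
        System.out.println("Case 1: " + run(2));
        System.out.println("Case 2: " + run(3));
    }
}
```

In the model, joining during the show costs the late viewer the first two scenes but causes no second request, while joining after the show has ended triggers a second request and a full replay for the late viewer only.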
Cache:
- Case 1:
In the previous example, if we do not want to repeat the request, we can use the cache method. Cache stores the emitted history and replays it to later subscribers. In the example below, notice from the output that we are NOT making the movie streaming request a second time. However, the second subscriber is still able to watch all the scenes from the beginning, because all the items have been cached for future subscribers.
//our movie theatre
//each scene will play for 2 seconds
Flux<String> movieTheatre = Flux.fromStream(() -> getMovie())
    .delayElements(Duration.ofSeconds(2)).cache();
// you start watching the movie
movieTheatre.subscribe(scene -> System.out.println("You are watching " + scene));
//I join after the source is completed
Thread.sleep(12000);
movieTheatre.subscribe(scene -> System.out.println("Vinsguru is watching " + scene));
Output:
Got the movie streaming request
You are watching scene 1
You are watching scene 2
You are watching scene 3
You are watching scene 4
You are watching scene 5
Vinsguru is watching scene 1
Vinsguru is watching scene 2
Vinsguru is watching scene 3
Vinsguru is watching scene 4
Vinsguru is watching scene 5
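The replay part of this behaviour can be modelled with the plain JDK. The sketch below is a conceptual illustration only, not Reactor code; `CachedSource` and its `subscribe` method are hypothetical names, and the model is synchronous, so it shows only the "request once, replay to late subscribers" idea and not the timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of caching a cold source (this is not Reactor code).
public class CacheModelDemo {

    static final class CachedSource<T> {
        private final Supplier<List<T>> coldSource;
        private List<T> history; // null until the first subscription

        CachedSource(Supplier<List<T>> coldSource) {
            this.coldSource = coldSource;
        }

        void subscribe(Consumer<T> subscriber) {
            if (history == null) {
                // The cold source is requested only once, by the first subscriber.
                history = new ArrayList<>(coldSource.get());
            }
            // Every subscriber, early or late, gets the stored history.
            history.forEach(subscriber);
        }
    }

    static int requests = 0;

    static List<String> getMovie() {
        requests++;
        return List.of("scene 1", "scene 2", "scene 3", "scene 4", "scene 5");
    }

    static String run() {
        requests = 0;
        CachedSource<String> theatre = new CachedSource<>(CacheModelDemo::getMovie);
        List<String> you = new ArrayList<>();
        List<String> me = new ArrayList<>();
        theatre.subscribe(you::add);
        theatre.subscribe(me::add); // joins after the first run has finished
        return "requests=" + requests + " you=" + you.size() + " scenes, me=" + me.size() + " scenes";
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The request counter stays at 1 even though both viewers receive all five scenes, which mirrors the single 'Got the movie streaming request' line in the output above.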
- Case 2:
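If you do not want late subscribers to receive the cached items, replace cache() with cache(0) in the previous example. The movie streaming request is still made only once, but a subscriber who joins after the source has completed does not get any scenes.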
Output:
Got the movie streaming request
You are watching scene 1
You are watching scene 2
You are watching scene 3
You are watching scene 4
You are watching scene 5
Summary:
Hopefully this article gave you an idea of the difference between hot and cold publishers in Project Reactor, and of how to convert a cold stream into a hot one, with some samples. Each publisher type has its own advantages, and the examples above show when each one applies.
Read more about Project Reactor / Java Reactive Programming.
Happy learning 🙂
Reference: Reactor 3 Reference guide
Great explanation! Keep up your good work!!!
Hi Vinod,
Thanks for the article. With the code below I observe cold behaviour even though the flux is converted to a hot publisher. Could you please explain?
final Flux<String> flux = Flux.fromIterable(words).share();
flux.subscribe((String x) -> System.out.println("First " + x));
Thread.sleep(5000);
flux.subscribe((String x) -> System.out.println("Second " + x));
Thread.sleep(5000);
The Flux/Mono API has many methods for all the possible use cases. share basically multicasts the data to multiple subscribers at the same time. So in a way it converts the cold source to hot.
In your example, the source is already complete. When a second subscriber subscribes, it repeats & starts the emission once again.
So it behaves like cold in your case.
If you do not like this behavior, replace share() with cache(0) in your example. I think this is what you are looking for.
I will also include this in the article.
Makes sense. Thank you for the explanation.
This is the best tutorial I found until now about Reactor. Thanks a lot vinsguru. Keep up the good work.