Overview:
In this reactive programming series, Lets see how we could combine multiple sources of reactive streams. If you have not read the previous articles in these series, please take a look at them first in the below order.
- Reactive Programming – A Simple Introduction
- Reactive Programming – Creating Sequences – Flux vs Mono
- Reactive Programming – Publisher Types – Cold Vs Hot
- Reactive Programming – Flux – Create Vs Generate
- Reactive Programming – Schedulers
You might wonder why one would need to combine multiple publishers! You would have seen draining pipes likes this which collects the rainwater into a tank in an efficient way!
Similarly when you have multiple sources of data streams, you might want to collect them and make them pass through a single downstream pipeline for efficient processing of data.
Creating Hot Publishers:
Lets first create a couple of hot publishers to understand the behavior. We will be emitting data through these two sources periodically and study how the combining methods behaves.
DirectProcessor<String> source1 = DirectProcessor.create();
DirectProcessor<String> source2 = DirectProcessor.create();
We will create runnables to start emitting data at random interval.
- source1 emits from 0 to 4
- source2 emits from 0 to 9
//emitting data through source 1
Runnable r1 = () -> {
for (int i = 0; i < 5; i++) {
int idle = ThreadLocalRandom.current().nextInt(100, 1000);
sleep(Duration.ofMillis(idle));
source1.onNext("source1 - " + i);
}
source1.onComplete();
};
//emitting data through source 2
Runnable r2 = () -> {
for (int i = 0; i < 10; i++) {
int idle = ThreadLocalRandom.current().nextInt(100, 1000);
sleep(Duration.ofMillis(idle));
source2.onNext("source2 - " + i);
}
source2.onComplete();
};
//lets run via in separate threads
new Thread(r1).start();
new Thread(r2).start();
Here sleep is a helper method I have created to block the thread for the given duration in milliseconds.
source1
.subscribe(System.out::println);
source2
.subscribe(System.out::println);
If we run the above code, we can see that these 2 pipelines print the data on the console as and when elements are emitted.
Now lets discuss the various methods we have, to combine these sources.
concat:
The concat method works as shown here. It connects to the data sources sequentially in the given order and collects all the data emitted from the sources. That is, it first collects all the data from source1, then connects to source2 and collects the data and so on.
// concat 2 sources and direct the emitted data through single pipeline
Flux.concat(source1, source2)
.subscribe(System.out::println);
Output:
source1 - 0
source1 - 1
source1 - 2
source1 - 3
source1 - 4
source2 - 2
source2 - 3
source2 - 4
source2 - 5
source2 - 6
If you see, first we see only the source1 data. Once it completes, then we see source2 data. By the time source1 completes, source2 had already emitted some data which we lost (It is a hot publisher). So we see only the latest from source2 – starting from element 2. We do not see element 0 and 1 from source2 as concat method was busy collecting elements from source1 as it has not completed then.
concatWith:
The above concat method is a convenient static method from Flux. Below method is the instance method of source1 which concatenates with other sources. The behavior would be exactly same.
source1.concatWith(source2)
.subscribe(System.out::println);
concatDelayError:
When we combine multiple sources like this, one of the sources might throw some error at run time. We could either stop processing immediately or delay the error until we process elements from all the sources. This concatDelayError helps us to suppress the runtime error for a while.
I just modify the runnable1 to throw error after emitting element 4.
Runnable r1 = () -> {
for (int i = 0; i < 5; i++) {
int idle = ThreadLocalRandom.current().nextInt(100, 1000);
sleep(Duration.ofMillis(idle));
source1.onNext("source1 - " + i);
}
source1.onError(new RuntimeException("source1 error"));
};
Without concatDelayError, the output would be as shown here. As the source1 throws the error, our pipeline will simply stop.
Output:
source1 - 0
source1 - 1
source1 - 2
source1 - 3
source1 - 4
Exception in thread "Thread-0" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: source1 error
Caused by: java.lang.RuntimeException: source1 error
Lets use concatDelayError and observe the behavior.
Flux.concatDelayError(source1, source2)
.subscribe(System.out::println);
Output:
The error is emitted once we processed all the elements.
source1 - 0
source1 - 1
source1 - 2
source1 - 3
source1 - 4
source2 - 4
source2 - 5
source2 - 6
source2 - 7
source2 - 8
source2 - 9
Exception in thread "Thread-1" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: source1 error
Caused by: java.lang.RuntimeException: source1 error
at com.pearson.qglobal.util.FluxSequenceTest.lambda$main$1(FluxSequenceTest.java:30)
at java.base/java.lang.Thread.run(Thread.java:835)
combineLatest:
This combineLatest accepts multiple sources and a combining function to combine the data from all the sources.
Lets assume source1 emits 1,2,3,4… every 1 second and source2 emits a,b,c…etc every 3 seconds. This combineLatest method combines the latest data like this 1a, 2a,3a, 4b… etc
When source1 emits 2, source2 latest emitted data would be a as source2 emits data only every 3 seconds. So whenever a source emits data, we will combine with other sources latest emitted data.
Flux.combineLatest(source1, source2, (a, b) -> a + " :: " + b)
.subscribe(System.out::println);
Output:
source1 - 0 :: source2 - 0
source1 - 0 :: source2 - 1
source1 - 0 :: source2 - 2
source1 - 0 :: source2 - 3
source1 - 1 :: source2 - 3
source1 - 1 :: source2 - 4
source1 - 2 :: source2 - 4
source1 - 3 :: source2 - 4
source1 - 3 :: source2 - 5
source1 - 4 :: source2 - 5
merge:
This is almost same as above concat. The major difference between these 2 is – merge connects to all the data sources eagerly. So as and when data is emitted from the any of the sources, it would be passed to the downstream pipeline.
Flux.merge(source1, source2)
.subscribe(System.out::println);
Output:
The output indicates that we receive elements from the sources as and when they emit the data. concat method would start connecting to source2 only after source1 is completed. But merge connects to all the sources immediately.
source1 - 0
source2 - 0
source2 - 1
source1 - 1
source2 - 2
source1 - 2
source2 - 3
source1 - 3
source2 - 4
source1 - 4
mergeWith:
Similar to concatWith, there is a mergeWith instance method. This behavior would be same as above merge method.
source1.mergeWith(source2)
.subscribe(System.out::println);
mergeDelayError:
Again similar to concatDelayError, there is also a mergeDelayError to suppress any error for a while. Here the first parameter is ‘prefetch’. Prefetch is simply an integer to request initial amount of data from the subscriber to a publisher.
Flux.mergeDelayError(15, source1, source2)
.subscribe(System.out::println);
zip:
The zip method collects the data from sources and places them inside an object (Tuple – something like a box) and passes to the downstream. We can get the specific object we are interested in from the Tuple.
Zip will work as long as both sources emit data. Any of the sources completes/throws error, it will stop.
Flux.zip(source1, source2)
.subscribe(System.out::println);
Output:
Here the output stops after source1 emits 4 even though source2 keeps emitting data after that.
[source1 - 0,source2 - 0]
[source1 - 1,source2 - 1]
[source1 - 2,source2 - 2]
[source1 - 3,source2 - 3]
[source1 - 4,source2 - 4]
Zip passes the tuple object downstream – we can get the specific object we are interested in as shown below.
Flux.zip(source1, source2)
.subscribe(tuple -> {
System.out.println("First : " + tuple.getT1());
System.out.println("Second : " + tuple.getT2());
});
Output:
First : source1 - 0
Second : source2 - 0
First : source1 - 1
Second : source2 - 1
First : source1 - 2
Second : source2 - 2
zipWith:
By this time, I am sure you know what it is for!
source1.zipWith(source2)
.subscribe(System.out::println);
Mono:
In our examples above we had seen only Flux sources. Same techniques can be applied with Mono as well.
Mono<String> monoSource1 = Mono.just("a");
Mono<String> monoSource2 = Mono.just("1");
monoSource1.concatWith(monoSource2)
.subscribe(System.out::println);
Output:
a
1
Summary:
Reactor is one of the coolest libraries in Java which makes the non-blocking reactive programming is very easy. Here we demonstrated various methods to combine multiple sources of publishers.
Happy coding 🙂