Overview
In this tutorial, I would like to demonstrate the Scatter Gather Pattern, one of the Enterprise Integration Patterns for distributed system architecture, using the NATS messaging server.
Please check my previous article here to learn more about NATS.
Scatter Gather Pattern
Let’s consider an application in which we need to do a set of tasks to complete the business workflow. If these tasks do not depend on each other, then it does not make sense to do them sequentially. We can do these tasks in parallel.
The Scatter Gather Pattern helps us distribute these tasks for parallel processing of tasks/messages/events and finally aggregate the results into a single response, as shown above.
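To make the idea concrete before bringing in NATS, here is a minimal in-process sketch using only the JDK. The class name, method name and sample tasks are hypothetical and are not part of this tutorial's project. It starts every independent task in parallel (scatter) and then combines the results into one list (gather).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch: names are illustrative, not part of the tutorial's project.
class ScatterGatherSketch {

    static <T> List<T> scatterGather(List<Supplier<T>> tasks) {
        // Scatter: submit every independent task so they all run in parallel.
        List<CompletableFuture<T>> futures = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task))
                .collect(Collectors.toList());
        // Gather: wait for each task and combine the results, in task order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Supplier<String>> tasks = List.of(
                () -> "inventory checked",
                () -> "payment authorized",
                () -> "shipping quoted");
        System.out.println(scatterGather(tasks));
    }
}
```

This version waits for every task. The flight application in this tutorial differs in two ways: the tasks run in other processes, and the gather step stops after a timeout instead of waiting for everyone.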
Sample Application
Let’s consider a flight booking application in which a user searches for flight deals. The application sends the search to all the airlines, collects their fares and then responds to the user.
As our application depends on 3rd party APIs and we need to provide the best experience to our users, we will publish the user request to all airlines, collect the results from whichever airlines respond within a specific timeout period, and show the top 5 deals to our users.
The main application does not even know how many airlines are listening to the requests. Even if some of the airline services are not up and running, it is not going to affect our flight-app.
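The "respond within a timeout, then show the best deals" rule can be sketched with plain JDK classes. This is only an illustration under assumptions: `TopDealsSketch`, `Deal`, `delayed` and `topDeals` are hypothetical names, and each airline is simulated by a `Supplier` instead of a remote service. An airline that does not answer in time simply contributes nothing to the result.

```java
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch: airlines are simulated in-process, no messaging involved.
class TopDealsSketch {

    // Illustrative stand-in for one fare offered by an airline.
    static final class Deal {
        final String airline;
        final int price;
        Deal(String airline, int price) { this.airline = airline; this.price = price; }
        @Override public String toString() { return airline + ":" + price; }
    }

    // Simulates an airline that answers after the given delay.
    static Supplier<List<Deal>> delayed(long millis, List<Deal> deals) {
        return () -> {
            try {
                Thread.sleep(millis);
                return deals;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return List.<Deal>of();
            }
        };
    }

    static List<Deal> topDeals(List<Supplier<List<Deal>>> airlines, Duration timeout, int limit) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Scatter: ask every airline; a late answer is replaced by an empty list.
            List<CompletableFuture<List<Deal>>> futures = airlines.stream()
                    .map(airline -> CompletableFuture.supplyAsync(airline, pool)
                            .completeOnTimeout(List.<Deal>of(), timeout.toMillis(), TimeUnit.MILLISECONDS))
                    .collect(Collectors.toList());
            // Gather: merge all answers, cheapest first, and keep only the best ones.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .sorted(Comparator.comparingInt((Deal d) -> d.price))
                    .limit(limit)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdownNow(); // interrupt airlines that are still "thinking"
        }
    }

    public static void main(String[] args) {
        List<Supplier<List<Deal>>> airlines = List.of(
                delayed(0, List.of(new Deal("DELTA", 72), new Deal("DELTA", 408))),
                delayed(5000, List.of(new Deal("TOO_SLOW", 10))),
                delayed(0, List.of(new Deal("FRONTIER", 109))));
        System.out.println(topDeals(airlines, Duration.ofMillis(500), 5));
    }
}
```

Note that this sketch must know the list of airlines up front. With a message broker in the middle, the requester does not need that list, which is what the rest of this tutorial builds.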
NATS Server
Please ensure that the NATS server is up and running before starting any of the apps. We can easily spin up NATS using Docker.
docker run -p 4222:4222 nats:alpine
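If an app later fails with "Unable to connect to NATS servers", it can help to first confirm that something is listening on port 4222. The following is a small, hypothetical helper (not part of the project) that uses only a plain TCP socket. It checks reachability of the port, not that the listener really is NATS.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
class NatsPortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unknown
        }
    }

    public static void main(String[] args) {
        // 4222 is the port published by the docker command above.
        boolean up = isReachable("localhost", 4222, 1000);
        System.out.println(up ? "Port 4222 is reachable" : "Nothing is listening on port 4222");
    }
}
```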
Project Setup
Create a Spring Boot application with the below dependencies.
It will be a multi-module Maven project as shown here.
Our project depends on the super-fast NATS messaging server, so add this dependency as well.
<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.6.8</version>
</dependency>
Common DTO
- Flight Search Request
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSearchRequest {
    private String from;
    private String to;
}
- Flight Schedule
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSchedule {
    private String date;
    private int price;
    private String airline;
}
- Flight Search Response
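import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSearchResponse {
    private FlightSearchRequest searchRequest;
    private List<FlightSchedule> schedules;
}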
Airline – Service
This service class represents an individual airline. It receives the request and provides the schedules along with their prices.
- This is a separate app. We would be running multiple instances of this app.
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;

import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// ObjectUtil is the project's own serialization helper; it is part of the linked source code.
public class AirlineService {

    private static final String AIRLINE = Objects.toString(System.getenv("AIRLINE_NAME"), "UNKNOWN");
    private static final String NATS_SERVER = Objects.toString(System.getenv("NATS_SERVER"), "nats://localhost:4222");

    public static void main(String[] args) throws IOException, InterruptedException {
        final Connection nats = Nats.connect(NATS_SERVER);
        final Dispatcher dispatcher = nats.createDispatcher(msg -> {});
        // every airline instance listens on the same subject
        dispatcher.subscribe("flight.search", (msg) -> {
            ObjectUtil.toObject(msg.getData(), FlightSearchRequest.class)
                    .ifPresent(searchRequest -> {
                        List<FlightSchedule> flightSchedules = getFlightSchedules(searchRequest);
                        // reply to the inbox subject chosen by the requester
                        nats.publish(msg.getReplyTo(), ObjectUtil.toBytes(flightSchedules));
                    });
        });
    }

    private static List<FlightSchedule> getFlightSchedules(FlightSearchRequest searchRequest){
        // input parameter is not used; we return 1 to 3 random schedules
        int randomNoResponse = ThreadLocalRandom.current().nextInt(0, 3);
        return IntStream.rangeClosed(0, randomNoResponse)
                .mapToObj(i -> getRandomSchedule())
                .collect(Collectors.toList());
    }

    private static FlightSchedule getRandomSchedule(){
        int randomDate = ThreadLocalRandom.current().nextInt(0, 30);
        int randomPrice = ThreadLocalRandom.current().nextInt(50, 500);
        var date = LocalDate.now().plusDays(randomDate);
        return FlightSchedule.of(date.toString(), randomPrice, AIRLINE);
    }
}
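The airline never learns who asked; it just publishes its answer to whatever subject arrives in `msg.getReplyTo()`. To show that mechanism in isolation, here is a tiny in-memory stand-in for a broker. It is NOT the NATS client API: `ReplyToSketch`, its methods and the subject `inbox.1` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// In-memory stand-in for a message broker; this is NOT the NATS client API.
class ReplyToSketch {

    // subject -> handlers; each handler receives (payload, replyTo)
    private final Map<String, List<BiConsumer<String, String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String subject, BiConsumer<String, String> handler) {
        subscribers.computeIfAbsent(subject, s -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Delivers the payload to every subscriber of the subject; no subscribers means no delivery.
    void publish(String subject, String replyTo, String payload) {
        subscribers.getOrDefault(subject, List.of()).forEach(h -> h.accept(payload, replyTo));
    }

    public static void main(String[] args) {
        ReplyToSketch bus = new ReplyToSketch();
        // Two airline instances listen on the same subject.
        for (String airline : List.of("DELTA", "FRONTIER")) {
            bus.subscribe("flight.search", (request, replyTo) ->
                    bus.publish(replyTo, null, airline + " fares for " + request));
        }
        // The requester listens on a private inbox and names it as the reply subject.
        List<String> replies = new ArrayList<>();
        bus.subscribe("inbox.1", (reply, ignored) -> replies.add(reply));
        bus.publish("flight.search", "inbox.1", "Houston->LasVegas");
        System.out.println(replies);
    }
}
```

The requester publishes once and receives one reply per listening airline, without knowing how many airlines exist. Starting or stopping an airline instance only changes how many replies land in the inbox.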
Scatter Gather Pattern
Now let's work on the customer-facing flight-search app.
- NATS bean
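@Bean
public Connection nats(@Value("${nats.server}") String natsServer) throws IOException, InterruptedException {
    // Nats.connect declares both checked exceptions, so the bean method must declare them too
    return Nats.connect(natsServer);
}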
- Controller
@RestController
@RequestMapping("flight")
public class FlightSearchController {

    // the service defined below; it exposes the broadcast method
    @Autowired
    private ScatterGatherService service;

    @GetMapping("/{from}/{to}")
    public Mono<FlightSearchResponse> search(@PathVariable String from, @PathVariable String to){
        return this.service.broadcast(FlightSearchRequest.of(from, to));
    }
}
- ScatterGatherService
- This class is responsible for broadcasting the request and receiving the responses
@Service
public class ScatterGatherService {

    @Autowired
    private Connection nats;

    public Mono<FlightSearchResponse> broadcast(FlightSearchRequest flightSearchRequest){
        // create a private inbox subject and subscribe to it before publishing
        String inbox = nats.createInbox();
        Subscription subscription = nats.subscribe(inbox);
        return Flux.generate((SynchronousSink<FlightSchedule[]> fluxSink) -> receiveSchedules(fluxSink, subscription))
                    .flatMap(Flux::fromArray)
                    // gather: emit a batch once 5 schedules have arrived or 1 second has passed
                    .bufferTimeout(5, Duration.ofSeconds(1))
                    .map(list -> {
                        list.sort(Comparator.comparing(FlightSchedule::getPrice));
                        return list;
                    })
                    .map(list -> FlightSearchResponse.of(flightSearchRequest, list))
                    // only the first batch is used for the response
                    .next()
                    // scatter: publish the request with the inbox as the reply subject
                    .doFirst(() -> nats.publish("flight.search", inbox, ObjectUtil.toBytes(flightSearchRequest)))
                    .doOnNext(i -> subscription.unsubscribe());
    }

    private void receiveSchedules(SynchronousSink<FlightSchedule[]> synchronousSink, Subscription subscription){
        try{
            // blocks for up to 1 second; nextMessage returns null if no reply arrived in time
            Message message = subscription.nextMessage(Duration.ofSeconds(1));
            if (message == null) {
                // no more replies: complete instead of failing with a NullPointerException
                synchronousSink.complete();
                return;
            }
            ObjectUtil.toObject(message.getData(), FlightSchedule[].class).ifPresent(synchronousSink::next);
        }catch (Exception e){
            synchronousSink.error(e);
        }
    }
}
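The gather step above relies on `bufferTimeout(5, Duration.ofSeconds(1))`: a batch is released when 5 schedules have arrived or when the time window ends, whichever comes first. The same rule can be sketched without Reactor or NATS, using a `BlockingQueue` as a stand-in for the inbox. `GatherWindowSketch` and `gather` are hypothetical names, and the sketch only approximates the operator (for simplicity the window here starts when `gather` is called).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "collect up to N items or until the window closes".
class GatherWindowSketch {

    static <T> List<T> gather(BlockingQueue<T> inbox, int maxItems, Duration window) {
        List<T> collected = new ArrayList<>();
        long deadline = System.nanoTime() + window.toNanos();
        try {
            while (collected.size() < maxItems) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // window closed
                }
                T item = inbox.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing else arrived before the window closed
                }
                collected.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    public static void main(String[] args) {
        // Three prices are already waiting in the inbox; fewer than the maximum of 5.
        BlockingQueue<Integer> inbox = new LinkedBlockingQueue<>(List.of(229, 72, 408));
        List<Integer> prices = gather(inbox, 5, Duration.ofMillis(200));
        prices.sort(Integer::compareTo);
        System.out.println(prices);
    }
}
```

One consequence of this rule is worth keeping in mind: the batch holds the first schedules to arrive, and sorting happens afterwards. If more than 5 schedules arrive within the window, the response contains the 5 earliest ones sorted by price, which are not necessarily the 5 cheapest overall.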
Scatter Gather Pattern – Demo
- I send a request
http://localhost:8080/flight/Houston/LasVegas
- I receive a response as shown here
{
   "searchRequest":{
      "from":"Houston",
      "to":"LasVegas"
   },
   "schedules":[
      {
         "date":"2021-01-02",
         "price":72,
         "airline":"DELTA"
      },
      {
         "date":"2020-12-28",
         "price":87,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":109,
         "airline":"FRONTIER"
      },
      {
         "date":"2021-01-08",
         "price":229,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":408,
         "airline":"DELTA"
      }
   ]
}
Summary
We were able to successfully demonstrate the use of the Scatter Gather Pattern in our microservices architecture to process tasks in parallel and finally aggregate the results.
Read more about Microservice Design Patterns.
- Microservice Pattern – Competing Consumers Pattern Implementation With Kubernetes
- CQRS Pattern – Microservice Design Patterns
- Materialized View PostgreSQL – Microservice Design Patterns
The source code is available here.
Happy learning 🙂
Hi Vinoth,
It seems the instructions are not clear on how to start the NATS server. When I run the airlines app, I get the error below:
Exception in thread "main" java.io.IOException: Unable to connect to NATS servers: nats://localhost:4222.
at io.nats.client.impl.NatsConnection.connect(NatsConnection.java:239)
at io.nats.client.impl.NatsImpl.createConnection(NatsImpl.java:29)
at io.nats.client.Nats.createConnection(Nats.java:249)
at io.nats.client.Nats.connect(Nats.java:131)
at com.example.service.AirlineService.main(AirlineService.java:26)
I am really sorry!! Thanks again for reporting such issues. I have updated the instructions.