Overview:
In this tutorial, I would like to demo Spring RSocket Load Balancing on the client side.
If you are new to RSocket, take a look at these articles on RSocket.
RSocket Load Balancing:
RSocket is a binary message-passing protocol for client-server application development that supports Reactive Streams. It establishes a persistent connection between the client and the server (like gRPC), which makes it a good choice for communication between microservices. However, the persistent connection makes load balancing a challenge: once a client is connected to a server instance, all of its requests keep going to that same instance.
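To make the challenge concrete, here is a small library-free simulation. It is only a sketch: the class and method names are hypothetical and no RSocket API is involved. It compares a balancer that picks a server once per connection with a client that picks a server for every request.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these names come from RSocket or Spring.
class StickyConnectionDemo {

    // The server is chosen once, when the connection is opened.
    // Every later request reuses that connection, so one server takes all the load.
    static Map<String, Integer> perConnection(List<String> servers, int requests) {
        Map<String, Integer> hits = newCounter(servers);
        String pinned = servers.get(0); // decided at connect time
        for (int i = 0; i < requests; i++) {
            hits.merge(pinned, 1, Integer::sum);
        }
        return hits;
    }

    // The client knows every server and chooses one for each request.
    static Map<String, Integer> perRequest(List<String> servers, int requests) {
        Map<String, Integer> hits = newCounter(servers);
        for (int i = 0; i < requests; i++) {
            hits.merge(servers.get(i % servers.size()), 1, Integer::sum);
        }
        return hits;
    }

    private static Map<String, Integer> newCounter(List<String> servers) {
        Map<String, Integer> hits = new LinkedHashMap<>();
        servers.forEach(server -> hits.put(server, 0));
        return hits;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("localhost:6565", "localhost:6566", "localhost:6567");
        System.out.println("per connection: " + perConnection(servers, 9));
        System.out.println("per request:    " + perRequest(servers, 9));
    }
}
```

With 9 requests, the per-connection variant sends all 9 to the first server, while the per-request variant sends 3 to each. Client-side load balancing aims for the second behaviour.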
Let's see how we can achieve RSocket Load Balancing when we have a pool of servers.
High Level Setup:
Let's consider a simple client-server application in which 3 instances of the server-app are running. The client wants to send multiple requests and distribute the load across all the server instances. It periodically gets the server instance details from a service registry (like Consul or Eureka) and sends the requests accordingly.
Project Setup:
Create a Spring Boot application with the RSocket and Lombok dependencies.
Server Side:
- The server-app exposes an endpoint as shown here. It is simple and nothing else is required for this demo.
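import java.time.Duration;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Controller
public class ServerController {

    // logs the input, waits 500 ms, then returns its square
    @MessageMapping("square-calculator")
    public Mono<Integer> square(Mono<Integer> input) {
        return input
                .doOnNext(i -> System.out.println("Received : " + i))
                .delayElement(Duration.ofMillis(500))
                .map(i -> i * i);
    }
}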
- Package the application with this command.
mvn clean package
- Run 3 instances of the app as shown here (run these commands from the server-app/target directory).
java -jar -Dspring.rsocket.server.port=6565 server-app-0.0.1-SNAPSHOT.jar
java -jar -Dspring.rsocket.server.port=6566 server-app-0.0.1-SNAPSHOT.jar
java -jar -Dspring.rsocket.server.port=6567 server-app-0.0.1-SNAPSHOT.jar
Client Side:
- I assume the client will be provided with the list of server instance details somehow (in real life you can get this from a service registry). For this demo I use this application.yaml.
rsocket:
  square-service:
    servers:
      - host: localhost
        port: 6565
      - host: localhost
        port: 6566
      - host: localhost
        port: 6567
- I create a simple model class to get the above details as shown here.
import lombok.Data;

// @Data already generates toString(), so a separate @ToString is not needed
@Data
public class RSocketServerInstance {
    private String host;
    private int port;
}
- Service Registry
- Here I assume you use something like consul/eureka from where you can fetch the list of dynamic IP addresses of server instances for the given service name.
- In our case, we run 3 instances of the server-app, and on every lookup the registry excludes 1 of them to simulate an unhealthy instance.
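import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Service;

@Service
@ConfigurationProperties(prefix = "rsocket.square-service")
public class DummyServiceRegistry {

    private List<RSocketServerInstance> servers;
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    public void setServers(List<RSocketServerInstance> servers) {
        this.servers = servers;
    }

    // we exclude 1 instance every time to simulate something is not available
    public List<RSocketServerInstance> getServers() {
        // read the counter once so every index is compared with the same value
        int excluded = atomicInteger.incrementAndGet() % 3;
        return IntStream.rangeClosed(0, 2)
                .filter(i -> i != excluded)
                .mapToObj(servers::get)
                .collect(Collectors.toList());
    }
}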
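To see which instance is left out on each lookup, the same filter can be run outside Spring. This is a minimal sketch that uses plain port numbers in place of RSocketServerInstance objects; the class name RegistryRotationDemo is made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for the dummy registry, using ports instead of instances.
class RegistryRotationDemo {
    private final List<Integer> ports;
    private final AtomicInteger calls = new AtomicInteger(0);

    RegistryRotationDemo(List<Integer> ports) {
        this.ports = ports;
    }

    // Every call skips the index equal to (call count % number of ports).
    List<Integer> healthyPorts() {
        int excluded = calls.incrementAndGet() % ports.size();
        return IntStream.range(0, ports.size())
                .filter(i -> i != excluded)
                .mapToObj(ports::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RegistryRotationDemo registry = new RegistryRotationDemo(List.of(6565, 6566, 6567));
        for (int call = 1; call <= 3; call++) {
            System.out.println("call " + call + " -> " + registry.healthyPorts());
        }
        // call 1 -> [6565, 6567]
        // call 2 -> [6565, 6566]
        // call 3 -> [6566, 6567]
    }
}
```

Every instance is reported as unavailable once in every 3 lookups, so the client should see the target list change on each refresh.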
- Load Balance Target
- The class above simulates a service registry.
- Once we get the addresses of the healthy instances, we convert them into a List<LoadbalanceTarget>.
- We query the service registry periodically and expose the results as a Flux<List<LoadbalanceTarget>>, a never-ending stream.
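import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class LoadBalanceTargetConfig {
    @Autowired
    private DummyServiceRegistry serviceRegistry;

    // query the registry now, then again 2 seconds after each lookup, forever
    @Bean
    public Flux<List<LoadbalanceTarget>> targets() {
        return Mono.fromSupplier(() -> serviceRegistry.getServers())
                .repeatWhen(longFlux -> longFlux.delayElements(Duration.ofSeconds(2)))
                .map(this::toLoadBalanceTarget);
    }

    private List<LoadbalanceTarget> toLoadBalanceTarget(List<RSocketServerInstance> rSocketServers) {
        return rSocketServers.stream()
                .map(server -> LoadbalanceTarget.from(
                        server.getHost() + ":" + server.getPort(), // unambiguous "host:port" key
                        TcpClientTransport.create(server.getHost(), server.getPort())))
                .collect(Collectors.toList());
    }
}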
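The polling idea behind this never-ending stream can also be sketched without Reactor. The example below is only an illustration under stated assumptions: RegistryPoller is a hypothetical class built on the JDK's ScheduledExecutorService, and it hands each snapshot to a callback instead of emitting it on a Flux.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical JDK-only poller; the tutorial itself uses Mono.repeatWhen instead.
class RegistryPoller implements AutoCloseable {

    // Daemon thread, so a forgotten poller does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "registry-poller");
                thread.setDaemon(true);
                return thread;
            });

    // Queries the registry immediately, then again after each delay,
    // and passes every snapshot to the listener until close() is called.
    void start(Supplier<List<String>> registry, Consumer<List<String>> listener, long delayMillis) {
        scheduler.scheduleWithFixedDelay(
                () -> listener.accept(registry.get()), 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (RegistryPoller poller = new RegistryPoller()) {
            poller.start(
                    () -> List.of("localhost:6565", "localhost:6566"),
                    snapshot -> System.out.println("targets: " + snapshot),
                    200);
            Thread.sleep(1000); // let a few polls happen before shutting down
        }
    }
}
```

Note that with scheduleWithFixedDelay, an exception thrown by the registry or the listener stops all further polls, so a real poller would need error handling around the lookup.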
- RSocket Configuration
- Once we have the Flux<List<LoadbalanceTarget>>, we build the RSocketRequester with that flux as shown here.
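import java.util.List;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Flux;

@Configuration
public class RSocketConfig {
    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    // the requester picks a target from the latest list in round-robin order
    @Bean
    public RSocketRequester rSocketClient(RSocketRequester.Builder builder, Flux<List<LoadbalanceTarget>> targetFlux) {
        return builder.transports(targetFlux, new RoundRobinLoadbalanceStrategy());
    }
}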
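To get a feel for what a round-robin strategy does with a target list that keeps changing, here is a simplified, library-free sketch. It is not RSocket's RoundRobinLoadbalanceStrategy: RoundRobinPicker and its methods are hypothetical names, and the real strategy works on RSocket connections rather than strings.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified picker that only illustrates the rotation idea.
class RoundRobinPicker {
    private volatile List<String> targets = List.of();
    private final AtomicInteger next = new AtomicInteger(0);

    // Called whenever the registry publishes a fresh list of healthy targets.
    void refresh(List<String> latest) {
        this.targets = List.copyOf(latest);
    }

    // Picks the next target in rotation from the most recent snapshot.
    String pick() {
        List<String> snapshot = targets;
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("no targets available");
        }
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), snapshot.size());
        return snapshot.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPicker picker = new RoundRobinPicker();
        picker.refresh(List.of("localhost:6565", "localhost:6567"));
        System.out.println(picker.pick()); // localhost:6565
        System.out.println(picker.pick()); // localhost:6567
        picker.refresh(List.of("localhost:6565", "localhost:6566"));
        System.out.println(picker.pick()); // localhost:6565
        System.out.println(picker.pick()); // localhost:6566
    }
}
```

Each pick reads the latest snapshot, so an instance that disappears from the registry stops receiving new requests from the next refresh onwards.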
- That’s it! Now the RSocketRequester is ready to be used and balance the load across the given instances.
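import java.time.Duration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class RSocketClientApplication implements CommandLineRunner {

    @Autowired
    private RSocketRequester rSocketRequester;

    public static void main(String[] args) {
        SpringApplication.run(RSocketClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // send one request every 100 ms and retry a failed request once
        Flux.range(1, 10000)
                .delayElements(Duration.ofMillis(100))
                .flatMap(i -> rSocketRequester.route("square-calculator")
                        .data(i)
                        .retrieveMono(Integer.class)
                        .retry(1))
                .doOnNext(i -> System.out.println("Response : " + i))
                .blockLast();
    }
}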
When we run the client-side application, the "Received : ..." messages show up in the consoles of all the server instances, which confirms that the requests are load balanced across them.
Summary:
We successfully demonstrated RSocket Load Balancing on the client side. Special thanks to RSocket maintainer Oleg for clarifying my issues on this topic!
Read more about RSocket.
- RSocket Security With Spring
- RSocket File Upload Example
- RSocket + WebSocket + Spring Boot = Real Time Application
The source code is available here.
Happy learning 🙂