Overview:
In this tutorial, I would like to show you how to integrate RSocket with Spring Boot. If you are new to RSocket, take a look at this article to learn more.
RSocket Interaction Models:
RSocket is a message-passing protocol for multiplexed, duplex communication that runs over TCP, WebSockets and Aeron (UDP). It supports the following interaction models: request-response, fire-and-forget, request-stream and channel (bi-directional stream).
Sample Application:
Let's consider a simple movie theater application to demo RSocket and its interaction models.
- A ticket purchase request would be sent. The application would respond with ISSUED status.
- request-response
- A ticket cancel request would be sent. The application would process the refund asynchronously.
- fire-and-forget
- A play movie request would be sent with a valid ticket. The application would respond with the list of scenes to be played.
- request-stream
We will have a separate controller class for TV.
- A play movie request would be sent with the list of scenes to be played. The application will respond with corresponding scenes.
- channel / bi-directional stream
RSocket With Spring Boot – Project Setup:
Let's create a Spring Boot project. The code below relies on the RSocket starter (spring-boot-starter-rsocket), Lombok and, for the tests, reactor-test.
Models/DTOs:
We will use the following models.
- TicketStatus
public enum TicketStatus {
    TICKET_PENDING,
    TICKET_ISSUED,
    TICKET_CANCELLED;
}
- TicketRequest
@Data
@NoArgsConstructor
public class TicketRequest {

    private UUID requestId;
    private TicketStatus status = TicketStatus.TICKET_PENDING;

    public TicketRequest(UUID requestId) {
        this.requestId = requestId;
    }
}
- MovieScene
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MovieScene {

    private int sceneId;
    private String sceneDescription;
}
Movie Service:
The service class below holds the list of scenes to be played for a movie.
@Service
public class MovieService {

    private final List<MovieScene> scenes = List.of(
            new MovieScene(1, "Scene 1"),
            new MovieScene(2, "Scene 2"),
            new MovieScene(3, "Scene 3"),
            new MovieScene(4, "Scene 4"),
            new MovieScene(5, "Scene 5")
    );

    public List<MovieScene> getScenes() {
        return this.scenes;
    }

    // index is 0-based
    public MovieScene getScene(int index) {
        return this.scenes.get(index);
    }
}
RSocket With Spring Boot – Server Side:
RSocket is a message-passing protocol. Spring Boot does the heavy lifting: we only annotate each handler method with @MessageMapping and a handler name, as shown here.
- fire-and-forget
- Here we assume that the cancel request is sent to the “ticket.cancel” handler. The name is up to us; alphanumeric characters and dots are supported.
- Note that we do not return anything.
@MessageMapping("ticket.cancel")
public void cancelTicket(Mono<TicketRequest> request) {
    // cancel and refund asynchronously
    request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_CANCELLED))
            .doOnNext(t -> System.out.println("cancelTicket :: " + t.getRequestId() + " : " + t.getStatus()))
            .subscribe();
}
- request-response
- We change the incoming ticket status from pending to ISSUED and respond.
@MessageMapping("ticket.purchase")
public Mono<TicketRequest> purchaseTicket(Mono<TicketRequest> request) {
    return request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_ISSUED))
            .doOnNext(t -> System.out.println("purchaseTicket :: " + t.getRequestId() + " : " + t.getStatus()));
}
- request-stream
- We first check if the request has a valid ticket issued
- Then we stream the movie scenes to be played
@MessageMapping("movie.stream")
public Flux<MovieScene> playMovie(Mono<TicketRequest> request) {
    return request
            // only a ticket with ISSUED status may watch; otherwise the stream completes empty
            .filter(t -> TicketStatus.TICKET_ISSUED.equals(t.getStatus()))
            .flatMapIterable(t -> this.movieService.getScenes())
            .delayElements(Duration.ofSeconds(1));
}
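The ticket check in this handler can be looked at in isolation with a plain-Java sketch that leaves Reactor out. `SceneGate` and `scenesFor` are hypothetical names used only for illustration, and scenes are reduced to strings. The point is that a ticket which is not ISSUED yields an empty result rather than an error.

```java
import java.util.List;

// Plain-Java illustration of the ticket check; not part of the tutorial's code base.
class SceneGate {

    enum TicketStatus { TICKET_PENDING, TICKET_ISSUED, TICKET_CANCELLED }

    private static final List<String> SCENES =
            List.of("Scene 1", "Scene 2", "Scene 3", "Scene 4", "Scene 5");

    // Returns every scene for an issued ticket and nothing for any other status (including null).
    static List<String> scenesFor(TicketStatus status) {
        return status == TicketStatus.TICKET_ISSUED ? SCENES : List.of();
    }

    public static void main(String[] args) {
        System.out.println(scenesFor(TicketStatus.TICKET_ISSUED));
        System.out.println(scenesFor(TicketStatus.TICKET_PENDING));
    }
}
```

On the client side an empty result shows up as a stream that completes without emitting any scene.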
- bi-directional stream
- Here we assume the user is watching on a TV and picks the scenes to be played in any order.
- We stream the corresponding scene back to the user.
@MessageMapping("tv.movie")
public Flux<MovieScene> playMovie(Flux<Integer> sceneIndex) {
    return sceneIndex
            .map(index -> index - 1) // the list uses a 0-based index
            .map(this.movieService::getScene)
            .delayElements(Duration.ofSeconds(1));
}
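The handler converts the scene number sent by the user (1-based) into a list index (0-based). A number outside 1 to 5 would make the list lookup throw. The plain-Java sketch below shows one possible way to guard that conversion; `SceneLookup` and `findScene` are hypothetical names, and returning an empty Optional for an unknown scene is my assumption, not something the application above does.

```java
import java.util.List;
import java.util.Optional;

// Plain-Java illustration of the 1-based to 0-based conversion with a bounds check.
class SceneLookup {

    private static final List<String> SCENES =
            List.of("Scene 1", "Scene 2", "Scene 3", "Scene 4", "Scene 5");

    // sceneNumber is what the user sends (1..5); the list itself is indexed from 0.
    static Optional<String> findScene(int sceneNumber) {
        int index = sceneNumber - 1;
        if (index < 0 || index >= SCENES.size()) {
            return Optional.empty(); // unknown scene: nothing to play
        }
        return Optional.of(SCENES.get(index));
    }

    public static void main(String[] args) {
        System.out.println(findScene(1));
        System.out.println(findScene(6));
    }
}
```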
- Spring Boot automatically starts the RSocket server on the port given by this application property.
spring.rsocket.server.port=6565
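Conceptually, the handler names used in this section act as keys in a lookup table: the name travels with the request and selects the method to run. The plain-Java sketch below is only a mental model of that dispatch, not how Spring implements it; `RouteTable` and `dispatch` are hypothetical names, and payloads are reduced to strings.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Mental model of name-based dispatch; Spring's real handler registry is more elaborate.
class RouteTable {

    // Handler name -> handler. The two names are the ones used in this tutorial.
    private static final Map<String, UnaryOperator<String>> HANDLERS = Map.of(
            "ticket.purchase", payload -> payload + " -> TICKET_ISSUED",
            "ticket.cancel", payload -> payload + " -> TICKET_CANCELLED");

    // Looks up the handler registered under the given name and applies it to the payload.
    static String dispatch(String route, String payload) {
        UnaryOperator<String> handler = HANDLERS.get(route);
        if (handler == null) {
            throw new IllegalArgumentException("No handler for route: " + route);
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("ticket.purchase", "request-1"));
    }
}
```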
RSocket With Spring Boot – Client Side:
- RSocket Configuration
@Configuration
public class RSocketConfig {

    // use CBOR for encoding and decoding payloads
    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    // connect(...) is deprecated as of Spring Framework 5.3 in favor of builder.tcp(host, port)
    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder) {
        return builder
                // retry the connection at most 2 times, 2 seconds apart
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connect(TcpClientTransport.create(6565));
    }
}
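The reconnect setting `Retry.fixedDelay(2, Duration.ofSeconds(2))` asks for at most two retries, two seconds apart. The plain-Java sketch below mimics that policy in a blocking style so the behaviour is easy to follow. `FixedDelayRetry` is a hypothetical helper written for this illustration, not a Reactor or Spring API, and Reactor's own implementation is non-blocking.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Blocking illustration of a fixed-delay retry policy.
class FixedDelayRetry {

    // Runs the action once, then retries up to maxRetries more times, waiting `delay` before each retry.
    static <T> T call(Callable<T> action, int maxRetries, Duration delay) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                Thread.sleep(delay.toMillis());
            }
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if retries remain
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) throws Exception {
        System.out.println(call(() -> "connected", 2, Duration.ofSeconds(2)));
    }
}
```

With `maxRetries` set to 2, the action runs at most three times in total: the first attempt plus two retries.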
- The RSocketRequester bean is the one the client autowires and uses.
- RSocketRequester has a route method, which sends the request to the specific handler we saw above.
- Cancel ticket test
- We send a TicketRequest to the “ticket.cancel” handler.
@Test
public void ticketCancel() {
    Mono<Void> mono = this.rSocketRequester
            .map(r -> r.route("ticket.cancel").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(RSocketRequester.RetrieveSpec::send);
    StepVerifier.create(mono)
            .verifyComplete();
}
- ticket purchase request test
@Autowired
private Mono<RSocketRequester> rSocketRequester;

@Test
public void ticketPurchase() {
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class))
            .doOnNext(r -> System.out.println(r.getRequestId() + ":" + r.getStatus()));
    StepVerifier.create(ticketRequestMono)
            .expectNextMatches(t -> t.getStatus().equals(TicketStatus.TICKET_ISSUED))
            .verifyComplete();
}
- play movie – request-stream – test
@Test
public void playMovie() {
    // purchase a ticket first
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class));
    // then use the issued ticket to request the movie stream
    Flux<MovieScene> sceneFlux = this.rSocketRequester
            .zipWith(ticketRequestMono)
            .map(tuple -> tuple.getT1().route("movie.stream").data(tuple.getT2()))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("Playing : " + m.getSceneDescription()));
    // assert all the movie scenes
    StepVerifier.create(sceneFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}
- tv movie play – bi-directional stream
@Test
public void tvPlayMovie() {
    // scene numbers chosen by the user, in the order they should be played
    Flux<Integer> movieSceneFlux = Flux.just(1, 2, 2, 1, 2, 3, 3, 4, 5);
    Flux<MovieScene> tvFlux = this.rSocketRequester
            .map(r -> r.route("tv.movie").data(movieSceneFlux))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("TV : " + m.getSceneDescription()));
    StepVerifier.create(tvFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}
Summary:
We were able to successfully integrate RSocket with Spring Boot. Passing messages between client and server is very easy with Spring Boot: serialization and deserialization happen automatically, and Spring works out which interaction model to use from the handler method's parameter and return types.
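To make the last point concrete, here is a plain-Java sketch of how the number of inputs and outputs of a handler maps to interaction models, as I understand the rules described in the Spring Framework reference. `InteractionResolver`, `Model` and `Cardinality` are hypothetical names for this illustration and are not Spring types. A single input means a plain object or a Mono parameter, many inputs means a Flux parameter; no output means void or Mono<Void>.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustration only: maps handler input/output cardinality to RSocket interaction models.
class InteractionResolver {

    enum Model { FIRE_AND_FORGET, REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL }

    enum Cardinality { NONE, ONE, MANY }

    // manyInputs: true for a Flux parameter, false for a single value or a Mono.
    static Set<Model> resolve(boolean manyInputs, Cardinality output) {
        if (manyInputs) {
            return EnumSet.of(Model.REQUEST_CHANNEL); // a stream of inputs is always a channel
        }
        switch (output) {
            case NONE:
                // a handler that returns nothing can serve both kinds of request
                return EnumSet.of(Model.FIRE_AND_FORGET, Model.REQUEST_RESPONSE);
            case ONE:
                return EnumSet.of(Model.REQUEST_RESPONSE);
            default:
                return EnumSet.of(Model.REQUEST_STREAM);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, Cardinality.MANY)); // the movie.stream handler
        System.out.println(resolve(true, Cardinality.MANY));  // the tv.movie handler
    }
}
```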
Learn more about RSocket:
The source code is available here.
Happy learning 🙂
Great article.
Just a question: the IP and port in the config are fixed at startup. How would you handle, for example, 3 replica pods? We can get the IPs of the pods, but if a pod restarts, its IP changes.
In real life we would not depend on the IP. You can use the service name, or some form of client-side load balancing / service discovery such as Consul.
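As a small illustration of addressing the server by name instead of by IP, the plain-Java sketch below builds an address that is not resolved up front, so the lookup is left to whoever opens the connection. `ServiceAddress` is a hypothetical helper and "movie-service" is a made-up service name; only the port 6565 comes from this tutorial. As far as I know, TcpClientTransport also has a create overload that accepts a host name and a port.

```java
import java.net.InetSocketAddress;

// Illustration only: refer to the server by a stable name rather than a pod IP.
class ServiceAddress {

    // "movie-service" is a hypothetical DNS / service name; 6565 is the port used in this tutorial.
    static InetSocketAddress movieService() {
        // createUnresolved performs no DNS lookup here; resolution is deferred to connection time
        return InetSocketAddress.createUnresolved("movie-service", 6565);
    }

    public static void main(String[] args) {
        InetSocketAddress address = movieService();
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```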