
RSocket With Spring Boot

Overview:

In this tutorial, I would like to show you how to integrate RSocket with Spring Boot. If you are new to RSocket, take a look at this article to learn more.

RSocket Interaction Models:

RSocket is a message-passing protocol for multiplexed, duplex communication. It can run over TCP, WebSockets and Aeron (UDP), and it supports the following interaction models:

Interaction Model     Behavior / Usage
fire-and-forget       No response required
request-response      Unary / traditional request and response model (one-to-one)
request-stream        Send one message and get multiple streaming responses back (one-to-many)
channel               Bi-directional stream with continuous message exchange (many-to-many)
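
One way to read the table is as a mapping from "how many messages go in" and "how many come back" to an interaction model. The plain-Java sketch below is only an illustration of that reading. The names InteractionModels, Cardinality, Model and resolve are made up for this example and are not part of RSocket or Spring, and treating every streaming request as a channel is a simplifying assumption.

```java
final class InteractionModels {

    /** How many messages travel in one direction of an interaction. */
    enum Cardinality { NONE, ONE, MANY }

    /** The four interaction models from the table, as request/response cardinalities. */
    enum Model {
        FIRE_AND_FORGET(Cardinality.ONE, Cardinality.NONE),
        REQUEST_RESPONSE(Cardinality.ONE, Cardinality.ONE),
        REQUEST_STREAM(Cardinality.ONE, Cardinality.MANY),
        CHANNEL(Cardinality.MANY, Cardinality.MANY);

        final Cardinality request;
        final Cardinality response;

        Model(Cardinality request, Cardinality response) {
            this.request = request;
            this.response = response;
        }
    }

    /** Picks a model for a pair of cardinalities (assumption: any streaming request is a channel). */
    static Model resolve(Cardinality request, Cardinality response) {
        if (request == Cardinality.NONE) {
            throw new IllegalArgumentException("an interaction needs at least one request message");
        }
        if (request == Cardinality.MANY) {
            return Model.CHANNEL;
        }
        switch (response) {
            case NONE: return Model.FIRE_AND_FORGET;
            case ONE:  return Model.REQUEST_RESPONSE;
            default:   return Model.REQUEST_STREAM;
        }
    }
}
```

For example, a single request that expects a stream back resolves to REQUEST_STREAM, while a stream of requests resolves to CHANNEL regardless of the response side.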

Sample Application:

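Let's consider a simple movie theater application to demo RSocket and its interaction models. A customer can cancel a ticket (ticket.cancel, fire-and-forget), purchase a ticket (ticket.purchase, request-response) and watch the movie with an issued ticket (movie.stream, request-stream). A TV can keep asking for scenes over a channel (tv.movie).

We will have a separate controller class for TV.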

RSocket With Spring Boot – Project Setup:

Let's create a Spring Boot project with these dependencies: the RSocket starter (spring-boot-starter-rsocket) and Lombok, plus reactor-test for the tests.

Models/DTOs:

We will have the following models.

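// TicketStatus.java
public enum TicketStatus {

    TICKET_PENDING,
    TICKET_ISSUED,
    TICKET_CANCELLED;

}

// TicketRequest.java
import java.util.UUID;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class TicketRequest {

    private UUID requestId;
    // every new request starts as pending
    private TicketStatus status = TicketStatus.TICKET_PENDING;

    public TicketRequest(UUID requestId) {
        this.requestId = requestId;
    }
}

// MovieScene.java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MovieScene {

    private int sceneId;
    private String sceneDescription;

}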

Movie Service:

The service class below holds the list of scenes to be played for a movie.

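import java.util.List;
import org.springframework.stereotype.Service;

@Service
public class MovieService {

    // immutable list of the scenes, in playing order
    private final List<MovieScene> scenes = List.of(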
            new MovieScene(1, "Scene 1"),
            new MovieScene(2, "Scene 2"),
            new MovieScene(3, "Scene 3"),
            new MovieScene(4, "Scene 4"),
            new MovieScene(5, "Scene 5")
    );

    public List<MovieScene> getScenes(){
        return this.scenes;
    }

    public MovieScene getScene(int index){
        return this.scenes.get(index);
    }

}

RSocket With Spring Boot – Server Side:

RSocket is a message-passing protocol, so the server side is a set of message handlers. Spring Boot does the heavy lifting: we only annotate each handler method with @MessageMapping and the route name it should handle, as shown here.

// fire-and-forget: the handler returns nothing, so no response is sent
@MessageMapping("ticket.cancel")
public void cancelTicket(Mono<TicketRequest> request){
    // cancel and refund asynchronously
    request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_CANCELLED))
            .doOnNext(t -> System.out.println("cancelTicket :: " + t.getRequestId() + " : " + t.getStatus()))
            .subscribe();
}

// request-response: one request in, one response out
@MessageMapping("ticket.purchase")
public Mono<TicketRequest> purchaseTicket(Mono<TicketRequest> request){
    return request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_ISSUED))
            .doOnNext(t -> System.out.println("purchaseTicket :: " + t.getRequestId() + " : " + t.getStatus()));
}

// request-stream: one request in, a stream of scenes out
@MessageMapping("movie.stream")
public Flux<MovieScene> playMovie(Mono<TicketRequest> request){
    return request
            // only an issued ticket gets the scenes; anything else yields an empty stream
            .filter(t -> TicketStatus.TICKET_ISSUED.equals(t.getStatus()))
            .flatMapIterable(t -> this.movieService.getScenes())
            .delayElements(Duration.ofSeconds(1));
}

// channel: a stream of scene numbers in, a stream of scenes out (this one lives in the separate TV controller)
@MessageMapping("tv.movie")
public Flux<MovieScene> playMovie(Flux<Integer> sceneIndex){
    return sceneIndex
            .map(index -> index - 1) // list is 0 based index
            .map(this.movieService::getScene)
            .delayElements(Duration.ofSeconds(1));
}

The server port is configured in application.properties:

spring.rsocket.server.port=6565
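
The movie.stream handler above only emits scenes when the incoming ticket has the status TICKET_ISSUED. Any other status produces an empty stream. Stripped of the reactive types, that rule might look like the sketch below. MovieStreamGate and scenesFor are hypothetical names for this illustration, and the scene list is reduced to the descriptions used by MovieService.

```java
import java.util.List;

final class MovieStreamGate {

    enum TicketStatus { TICKET_PENDING, TICKET_ISSUED, TICKET_CANCELLED }

    // Stand-in for MovieService.getScenes(), reduced to the scene descriptions.
    static final List<String> SCENES =
            List.of("Scene 1", "Scene 2", "Scene 3", "Scene 4", "Scene 5");

    // Returns every scene for an issued ticket and nothing for any other status (including null).
    static List<String> scenesFor(TicketStatus status) {
        return status == TicketStatus.TICKET_ISSUED ? SCENES : List.of();
    }
}
```

This is why a client has to call ticket.purchase first and pass the returned ticket to movie.stream: a freshly created TicketRequest is still TICKET_PENDING.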

RSocket With Spring Boot – Client Side:

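On the client side, we create an RSocketRequester that connects to the server on port 6565 and exchanges data as CBOR.

import java.time.Duration;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Configuration
public class RSocketConfig {

    // payloads are encoded and decoded as CBOR via Jackson
    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
        return builder
                // reconnect with up to 2 retries, 2 seconds apart
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connect(TcpClientTransport.create(6565));
    }

}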
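
The requester above is configured with Retry.fixedDelay(2, Duration.ofSeconds(2)), which as far as I understand means up to 2 retries with the same 2-second pause before each one. The plain-Java sketch below shows that idea in a blocking form. It is not how Reactor implements it, and FixedDelayRetry, call and pause are hypothetical names used only for this illustration.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class FixedDelayRetry {

    // Runs the action once, then retries up to maxRetries more times,
    // waiting the same fixed delay before every retry.
    static <T> T call(Supplier<T> action, int maxRetries, Duration delay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                pause(delay);
            }
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again if attempts remain
            }
        }
        throw lastFailure; // every attempt failed
    }

    private static void pause(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }
}
```

With maxRetries = 2 the action runs at most three times in total: the first attempt plus two retries.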

The tests below exercise each interaction model through the injected requester.

@Autowired
private Mono<RSocketRequester> rSocketRequester;

@Test
public void ticketCancel(){
    // fire-and-forget: send() completes without emitting a response
    Mono<Void> mono = this.rSocketRequester
            .map(r -> r.route("ticket.cancel").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(RSocketRequester.RetrieveSpec::send);

    StepVerifier.create(mono)
            .verifyComplete();
}

@Test
public void ticketPurchase(){
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class))
            .doOnNext(r -> System.out.println(r.getRequestId() + ":" + r.getStatus()));
    StepVerifier.create(ticketRequestMono)
            .expectNextMatches(t -> t.getStatus().equals(TicketStatus.TICKET_ISSUED))
            .verifyComplete();
}

@Test
public void playMovie(){
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class));

    Flux<MovieScene> sceneFlux = this.rSocketRequester
            .zipWith(ticketRequestMono)
            .map(tuple -> tuple.getT1().route("movie.stream").data(tuple.getT2()))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("Playing : " + m.getSceneDescription()));

    // assert all the movie scenes
    StepVerifier.create(sceneFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}

@Test
public void tvPlayMovie(){
    Flux<Integer> movieSceneFlux = Flux.just(1, 2, 2, 1, 2, 3, 3, 4, 5);
    Flux<MovieScene> tvFlux = this.rSocketRequester
            .map(r -> r.route("tv.movie").data(movieSceneFlux))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("TV : " + m.getSceneDescription()));
    StepVerifier.create(tvFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}
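
The expected order in the tvPlayMovie test follows directly from the tv.movie handler: each incoming scene number is 1-based, so the handler subtracts 1 before looking it up in the list. Here is a minimal plain-Java sketch of that mapping. SceneLookup, sceneFor and play are hypothetical names for this example, and unlike the handler in this article, which calls List.get directly and would fail on an invalid number, this sketch skips numbers that are out of range.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class SceneLookup {

    // Stand-in for the MovieService scene list, reduced to the descriptions.
    static final List<String> SCENES =
            List.of("Scene 1", "Scene 2", "Scene 3", "Scene 4", "Scene 5");

    // Converts a 1-based scene number to the 0-based list index; empty when out of range.
    static Optional<String> sceneFor(int sceneNumber) {
        int index = sceneNumber - 1;
        if (index < 0 || index >= SCENES.size()) {
            return Optional.empty();
        }
        return Optional.of(SCENES.get(index));
    }

    // Maps the requested scene numbers in order, dropping the invalid ones.
    static List<String> play(List<Integer> sceneNumbers) {
        return sceneNumbers.stream()
                .map(SceneLookup::sceneFor)
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }
}
```

Calling play with the same numbers the test sends (1, 2, 2, 1, 2, 3, 3, 4, 5) yields the scenes in exactly the order the StepVerifier expects.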

Summary:

We were able to successfully integrate RSocket with Spring Boot. Passing messages between client and server takes very little code: serialization and deserialization happen automatically, and Spring picks the interaction model from each handler method's parameter and return types.

Learn more about RSocket:

RSocket File Upload Example

 

The source code is available here.

Happy learning 🙂

 
