Site icon Vinsguru

RSocket With Java – Getting Started

Overview:

In this tutorial, I would like to introduce you to RSocket with Java, its interaction models and how it will be helpful for Microservices communication or client – server application development.

RSocket:

RSocket is a binary & message passing protocol over a single connection between client and server. It supports TCP, WebSockets and Aeron (UDP).

It supports following interaction models.

Interaction Model Behavior / Usage
fire-and-forget No response required
request-and-response Unary / Traditional request and response model
one-to-one
request-response-stream Send one message and gets multiple streaming response back
one-to-many
channel bi-directional stream. Continuous message exchange
many-to-many

Need For RSocket:

Microservices are the popular way to design distributed systems. A big monolith application is broken down into multiple independent Microservices. These Microservices have few advantages compared to traditional monolith applications – like easy to deploy, scale, reusable etc. Microservices do not come alone. There will be always more than 1 service. These Microservices communicate with each other mostly with REST using HTTP/1.1 protocol by exchanging JSON.

If an user tries to send a request to order-service to order a product, order-service might internally send multiple requests to other services to fulfill the request. One request from user might trigger multiple internal requests among microservices in an application design.

REST is simple and very easy to use. REST is great for browser. Easy to test our APIs. Developers love this. However most of our current implementation is with HTTP/1.1 which has following issues.

REST is good between browser and back-end. But we need something better than REST for inter microservices communication to avoid above mentioned issues.

RSocket vs gRPC:

We had already discussed gRPC which is another replacement for REST based communication in this site here. So, you might want to compare RSocket vs gRPC.

RSocket With Java:

Include the below maven dependencies to get started with rsocket.

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>1.0.1</version>
</dependency>

In this article, we would directly create server, clients without any framework integration. In the following articles, We would integrate with Spring Boot to simplify rsocket development.

RSocket – Interaction Models:

We would be taking a look at following method implementations of RSocket interface.

private static class SimpleRSocket implements RSocket {
        @Override
        public Mono<Void> fireAndForget(Payload payload) {
        }

        @Override
        public Mono<Payload> requestResponse(Payload payload) {
        }

        @Override
        public Flux<Payload> requestStream(Payload payload) {
        }
}

Do note that RSocket is a binary protocol. The request payload and response would be in the ByteBuffer format. To keep things simple, we would be using String for this article. We can use complex data types when we integrate with Spring Boot.

@Override
public Mono<Void> fireAndForget(Payload payload) {
    // just print the received string
    var str = payload.getDataUtf8();
    System.out.println("Received :: " + str);
    return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
    // just convert to upper case
    var str = payload.getDataUtf8();
    return Mono.just(DefaultPayload.create(str.toUpperCase()));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
    // convert the given str to char array and return
    var str = payload.getDataUtf8();
    return Flux.fromStream(str.chars().mapToObj(i -> (char) i))
            .map(Object::toString)
            .map(DefaultPayload::create);
}

Socket Acceptor Implementation:

Create a SocketAcceptor implementation as shown here with our RSocket implementation.

public class SimpleRSocketAcceptor implements SocketAcceptor {

    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        return Mono.just(new SimpleRSocket());
    }

}

RSocket With Java – Server Setup:

Our RSocket server is very simple. It listens on port 6565. We add the socket acceptor instance, so that our server will know how to process the requests.

public class Server {

    private static Disposable disposable;

    public static void start() {
        RSocketServer rSocketServer = RSocketServer.create();
        rSocketServer.acceptor(new SimpleRSocketAcceptor());
        rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
        disposable = rSocketServer.bind(TcpServerTransport.create(6565))
                     .subscribe();
    }

    public static void stop(){
        disposable.dispose();
    }

}

RSocket With Java – Client Setup:

Our server is ready! Lets create JUnit test class to test our RSocket method implementations.

private static RSocket rSocket;

@BeforeClass
public static void setUpClient(){
    Server.start();
    rSocket = RSocketConnector.connectWith(TcpClientTransport.create(6565))
                    .block();
}
private Flux<Payload> getRequestPayload(){
   return Flux.just("hi", "hello", "how", "are", "you")
            .delayElements(Duration.ofSeconds(1))
            .map(DefaultPayload::create);
}
@Test
public void fireAndForget(){
    this.getRequestPayload()
            .flatMap(payload -> rSocket.fireAndForget(payload))
            .blockLast(Duration.ofMinutes(1));
}
// output

Received :: hi
Received :: hello
Received :: how
Received :: are
Received :: you
@Test
public void requestAndResponse(){
    this.getRequestPayload()
            .flatMap(payload -> rSocket.requestResponse(payload))
            .doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
            .blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: HI
Response from server :: HELLO
Response from server :: HOW
Response from server :: ARE
Response from server :: YOU
@Test
public void requestAndResponseStream(){
    this.getRequestPayload()
            .flatMap(payload -> rSocket.requestStream(payload))
            .doOnNext(response -> System.out.println("Response from server :: " + response.getDataUtf8()))
            .blockLast(Duration.ofMinutes(1));
}
// output

Response from server :: h
Response from server :: i
Response from server :: h
Response from server :: e
Response from server :: l
Response from server :: l
Response from server :: o
Response from server :: h
Response from server :: o
Response from server :: w
Response from server :: a
Response from server :: r
Response from server :: e
Response from server :: y
Response from server :: o
Response from server :: u

Summary:

We were able to successfully demonstrate RSocket with Java. In the following articles, Lets see Spring boot integration, performance comparison between RSocket vs gRPC etc.

Learn more about RSocket.

The complete source code is available here.

Happy learning 🙂

Share This:

Exit mobile version