Overview:
In this tutorial, I would like to show you how to apply the reactive streams & develop Reactive gRPC. If you are new to both gRPC and reactor, then you might have to check these articles first!
Prerequisite:
Reactive gRPC:
In the above Unary/Streaming API implementation for gRPC, we had seen that gRPC java library by default creates bindings with StreamObserver. It works just fine. Interestingly there is a reactive grpc implementation which provides different sets of bindings using project reactor to apply reactive streams on gRPC.
Lets see how we to implement that.
Service Definition:
To keep things simple, I am going to consider a simple Calculator Service with below service definition containing unary and streaming APIs.
- findSquare: an unary call in which the server returns the square of the given number.
- findFactors: a server streaming API in which we find the factors which can divide the given number with 0 remainder. (We do not consider 1 here)
- findSum: a client streaming API in which we find the sum of all the numbers client sent to the server.
syntax = "proto3";
package calculator;
option java_package = "com.vinsguru.calculator";
option java_multiple_files = true;
message Input {
int32 number = 1;
}
message Output {
int64 result = 1;
}
service CalculatorService {
// unary
rpc findSquare(Input) returns (Output);
// server stream
rpc findFactors(Input) returns (stream Output);
// client stream
rpc findSum(stream Input) returns (Output);
}
- Along with gRPC specific dependencies, I have to include this below plugin.
<protocPlugins>
<protocPlugin>
<id>reactor-grpc</id>
<groupId>com.salesforce.servicelibs</groupId>
<artifactId>reactor-grpc</artifactId>
<version>1.0.1</version>
<mainClass>com.salesforce.reactorgrpc.ReactorGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
When we issue the below maven command, maven automatically creates the client and server side code using protoc tool.
mvn clean compile
ReactorCalculatorServiceGrpc class in the below picture is auto-generated abstract class which needs to be implemented by the server for the above service definition. If you notice the method signature, it uses Flux / Mono Instead of StreamObserver.
Reactive gRPC – Server Side:
Service Implementation: Let’s extend the abstract ReactorCalculatorServiceGrpc.CalculatorServiceImplBase to add our implementation for all the APIs.
public class CalculatorService extends ReactorCalculatorServiceGrpc.CalculatorServiceImplBase {
@Override
public Mono<Output> findSquare(Mono<Input> request) {
return request.map(Input::getNumber)
.map(i -> Output.newBuilder().setResult(i * i).build());
}
@Override
public Flux<Output> findFactors(Mono<Input> request) {
return request.map(Input::getNumber)
.filter(i -> i > 0)
.flatMapMany(input -> Flux.range(2, input / 2)
.filter(f -> input % f == 0))
.map(o -> Output.newBuilder().setResult(o).build());
}
@Override
public Mono<Output> findSum(Flux<Input> request) {
return request
.map(Input::getNumber)
.reduce(Integer::sum)
.map(i -> Output.newBuilder().setResult(i).build());
}
}
Once the service implementation is done, Let’s add it to the server to serve the client calls. We are listening on port 6565. Start this server by invoking the main method.
public class CalculatorServer {
public static void main(String[] args) throws IOException, InterruptedException {
// build gRPC server
Server server = ServerBuilder.forPort(6565)
.addService(new CalculatorService())
.build();
// start
server.start();
// shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("gRPC server is shutting down!");
server.shutdown();
}));
server.awaitTermination();
}
}
Now our server is ready, up and running!
gRPC Course:
I learnt gRPC + Protobuf in a hard way. But you can learn them quickly on Udemy. Yes, I have created a separate step by step course on Protobuf + gRPC along with Spring Boot integration for the next generation Microservice development. Click here for the special link.
Reactive gRPC – Client Side:
To access the server methods, we need stubs. As usual the Protobuf will automatically create the stubs for us. The client has to do the following to make a request and receive the response.
- Creating channel: The client has to create a channel/connection with the back-end server first.
- Stub: The client will use a reactive stub to make a request by passing the required parameters.
To demo this, I am going to create a simple JUnit test class to act like a gRPC client. Do note that client can be anything. It could even be another microservice.
public class ReactiveGrpcTest {
private ManagedChannel channel;
private ReactorCalculatorServiceGrpc.ReactorCalculatorServiceStub serviceStub;
@Before
public void setup(){
this.channel = ManagedChannelBuilder
.forAddress("localhost", 6565)
.usePlaintext()
.build();
this.serviceStub = ReactorCalculatorServiceGrpc.newReactorStub(channel);
}
@Test
public void findSquareTest(){
Input input = Input.newBuilder()
.setNumber(5)
.build();
this.serviceStub.findSquare(input)
.map(Output::getResult)
.as(StepVerifier::create)
.expectNext(25L)
.verifyComplete();
}
@Test
public void findFactorsTest(){
Input input = Input.newBuilder()
.setNumber(20)
.build();
this.serviceStub.findFactors(input)
.map(Output::getResult)
.as(StepVerifier::create)
.expectNext(2L, 4L, 5L, 10L)
.verifyComplete();
}
@Test
public void sumAllTest(){
Flux<Input> inputFlux = Flux.range(1, 10)
.map(i -> Input.newBuilder().setNumber(i).build());
this.serviceStub.findSum(inputFlux)
.map(Output::getResult)
.as(StepVerifier::create)
.expectNext(55L)
.verifyComplete();
}
}
When we run this test, we get the results as expected.
Summary:
We were able to successfully implement Reactive gRPC by using project reactor .
The complete source code is available here.
Happy learning 🙂
Test
Your test passed!