Overview:
In this tutorial, I will show you how to implement a gRPC bidirectional streaming API in Java.
I assume that you have a basic understanding of what gRPC is. If not, read the articles below first.
- Protocol Buffers – A Simple Introduction
- gRPC – An Introduction Guide
- gRPC Unary API In Java – Easy Steps
- gRPC Server Streaming API In Java
- gRPC Client Streaming API In Java
gRPC Bidirectional Streaming:
In the gRPC bidirectional streaming API, the client and server can exchange multiple messages over a single TCP connection. The two streams are independent of each other: either side can send messages without waiting for the other. The client and server each close their side of the call when they are done exchanging messages.
For example, consider these application behaviors. They are good examples of client-server bidirectional streaming.
- Google search screen: As soon as you enter some keywords, they are sent to the server and the server immediately responds with possible search keywords.
- Netflix/YouTube: Based on the videos you search/watch, you get more suggestions related to that.
Sample Application:
For this tutorial, we are going to create a simple GPS for your car! You would like to travel from point A to point B, which are 100 units apart. Once you start driving, the client syncs with the server every 3 seconds to report your progress (in this case, the distance you traveled in those 3 seconds). The server responds with the remaining distance and the approximate time needed to reach the destination, until the trip is completed.
Protobuf – Service Definition:
Let’s create a service definition for this. navigate is the method to be implemented on the server side. The service definition shows what type of input is sent and what type of output to expect. We use the stream keyword for both the input and the output to indicate that it is a bidirectional streaming request/response.
syntax = "proto3";

package gps;

option java_package = "com.vinsguru.gps";
option java_multiple_files = true;

message TripRequest {
  int32 distanceTravelled = 1;
}

message TripResponse {
  int32 remainingDistance = 1;
  int32 timeToDestination = 2;
}

service NavigationService {
  // grpc bidirectional stream
  rpc navigate(stream TripRequest) returns (stream TripResponse);
}
When we run the Maven command below, Maven generates the client-side and server-side code using the protoc tool.
mvn clean compile
NavigationServiceImplBase is the auto-generated abstract class that the server must extend to implement the above service definition. Similarly, NavigationServiceStub is the generated class that the client uses to make a request.
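The generated base class has a signature that can look inverted at first: a bidirectional method receives the observer for responses and returns the observer for requests. The sketch below mimics that shape in plain Java so it runs without gRPC on the classpath. Observer and BidiShapeSketch are hypothetical names used only for illustration; the real interface is io.grpc.stub.StreamObserver, which declares the same three methods.

```java
import java.util.ArrayList;
import java.util.List;

public class BidiShapeSketch {

    // Hypothetical stand-in mirroring the three methods of io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Same shape as a bidirectional handler: responses go out through the
    // argument, requests come in through the returned observer.
    static Observer<Integer> navigate(Observer<String> responses) {
        return new Observer<Integer>() {
            private int total;

            @Override
            public void onNext(Integer distance) {
                total += distance;
                responses.onNext("travelled " + total);
            }

            @Override
            public void onError(Throwable t) {
                // nothing to clean up in this sketch
            }

            @Override
            public void onCompleted() {
                responses.onCompleted();
            }
        };
    }

    // Feeds the handler the given distances and records everything it emits.
    public static List<String> run(int... distances) {
        List<String> received = new ArrayList<>();
        Observer<Integer> requests = navigate(new Observer<String>() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable t) { received.add("error"); }
            @Override public void onCompleted() { received.add("completed"); }
        });
        for (int d : distances) {
            requests.onNext(d);
        }
        requests.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 6)); // prints [travelled 4, travelled 10, completed]
    }
}
```

In this sketch everything happens on one thread; with a real gRPC stub the callbacks arrive asynchronously.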
gRPC Bidirectional Streaming – Server Side:
Service Implementation: Let’s extend the abstract NavigationServiceImplBase to add our implementation that responds to the navigate call. The server can expect multiple input objects from the client because it is a streaming request. When the client calls onCompleted, it notifies the server that it has reached the destination. At that point the server can also close the call.
- When we first start, we initialize the distance as 100 units, set the start time, and so on.
- Every 3 seconds, the client sends the units it traveled in that 3-second window as part of the onNext call.
- When we receive the onNext call, we check the total units traveled by the client and calculate the remaining distance.
- We also know the speed of the client (distance traveled / time taken). Using this speed, we calculate the approximate time needed to reach the destination.
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;

public class TripRequestObserver implements StreamObserver<TripRequest> {

    private final int totalDistance = 100;
    // Instant rather than LocalTime, so a trip that crosses midnight is still measured correctly
    private final Instant startTime = Instant.now();
    private int distanceTraveled;
    private final StreamObserver<TripResponse> tripResponseStreamObserver;

    public TripRequestObserver(StreamObserver<TripResponse> tripResponseStreamObserver) {
        this.tripResponseStreamObserver = tripResponseStreamObserver;
    }

    @Override
    public void onNext(TripRequest tripRequest) {
        this.distanceTraveled = Math.min(totalDistance, (this.distanceTraveled + tripRequest.getDistanceTravelled()));
        int remainingDistance = Math.max(0, (totalDistance - distanceTraveled));

        // the client has reached destination
        if (remainingDistance == 0) {
            this.tripResponseStreamObserver.onNext(TripResponse.getDefaultInstance());
            return;
        }

        // client has not yet reached destination
        long elapsedDuration = Duration.between(this.startTime, Instant.now()).getSeconds();
        elapsedDuration = elapsedDuration < 1 ? 1 : elapsedDuration; // avoid dividing by zero
        double currentSpeed = (distanceTraveled * 1.0d) / elapsedDuration;
        int timeToReach = (int) (remainingDistance / currentSpeed);
        TripResponse tripResponse = TripResponse.newBuilder()
                .setRemainingDistance(remainingDistance)
                .setTimeToDestination(timeToReach)
                .build();
        this.tripResponseStreamObserver.onNext(tripResponse);
    }

    @Override
    public void onError(Throwable throwable) {
        // the call has already failed or been cancelled; just record why
        System.err.println("Trip aborted: " + throwable.getMessage());
    }

    @Override
    public void onCompleted() {
        this.tripResponseStreamObserver.onCompleted();
        System.out.println("Client reached safely");
    }
}
The above implementation is the actual business logic for our use case which needs to be executed whenever the client initiates a new navigate request.
import io.grpc.stub.StreamObserver;

public class NavigationService extends NavigationServiceGrpc.NavigationServiceImplBase {

    @Override
    public StreamObserver<TripRequest> navigate(StreamObserver<TripResponse> responseObserver) {
        return new TripRequestObserver(responseObserver);
    }
}
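To see the estimate in isolation, the arithmetic from the observer's onNext can be pulled into a pure function. This is only a sketch: EtaSketch and secondsToDestination are hypothetical names, and returning -1 when no distance has been covered yet is a choice made here, not something the service above does.

```java
public class EtaSketch {

    // Estimates seconds to the destination from the average speed so far.
    // Returns 0 on arrival and -1 when no distance has been covered yet
    // (the -1 sentinel is an assumption of this sketch).
    public static int secondsToDestination(int totalDistance, int distanceTraveled, long elapsedSeconds) {
        int remaining = Math.max(0, totalDistance - distanceTraveled);
        if (remaining == 0) {
            return 0;
        }
        if (distanceTraveled <= 0) {
            return -1; // speed is unknown, so there is no estimate
        }
        long elapsed = Math.max(1, elapsedSeconds); // avoid dividing by zero
        double speed = (double) distanceTraveled / elapsed;
        return (int) (remaining / speed);
    }

    public static void main(String[] args) {
        // 40 units in 8 seconds is 5 units/second; 60 units remain
        System.out.println(secondsToDestination(100, 40, 8)); // prints 12
    }
}
```

Because the speed is an average over the whole trip so far, the estimate smooths out a single slow or fast 3-second window.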
Once the service implementation is done, let’s add it to the server to serve client calls. We are listening on port 6565. Start the server by invoking the main method.
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;

public class GPSServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new NavigationService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("GPS is shutting down!");
            server.shutdown();
        }));

        // block the main thread until the server terminates
        server.awaitTermination();
    }
}
Now our server is ready, up and running!
gRPC Bidirectional Streaming – Client Side:
The protoc plugin has already generated the client library. As a first step toward making this request, we need an implementation of StreamObserver to receive the responses.
- As part of the drive method, we travel for 3 seconds and let the server know the units traveled in those 3 seconds. We keep doing this until we reach the destination.
- When we reach the destination, we let the server know that the call can be closed.
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.stub.StreamObserver;
import java.time.LocalTime;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class TripResponseStreamObserver implements StreamObserver<TripResponse> {

    private StreamObserver<TripRequest> requestStreamObserver;

    @Override
    public void onNext(TripResponse tripResponse) {
        if (tripResponse.getRemainingDistance() > 0) {
            print(tripResponse);
            this.drive();
        } else {
            // remaining distance of 0 means we have arrived, so close our side of the call
            this.requestStreamObserver.onCompleted();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Trip failed: " + throwable.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Trip Completed");
    }

    public void startTrip(StreamObserver<TripRequest> requestStreamObserver) {
        this.requestStreamObserver = requestStreamObserver;
        this.drive();
    }

    private void drive() {
        // drive for 3 seconds, then report a random distance between 1 and 9 units
        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
        TripRequest tripRequest = TripRequest.newBuilder()
                .setDistanceTravelled(ThreadLocalRandom.current().nextInt(1, 10))
                .build();
        requestStreamObserver.onNext(tripRequest);
    }

    private void print(TripResponse tripResponse) {
        System.out.println(LocalTime.now() + ": Remaining Distance : " + tripResponse.getRemainingDistance());
        System.out.println(LocalTime.now() + ": Time To Reach (sec): " + tripResponse.getTimeToDestination());
        System.out.println("------------------------------");
    }
}
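The turn-taking between the two observers can be traced without a network. The following is a minimal sketch, assuming fixed step sizes instead of random ones and no 3-second sleeps; TripFlowSketch and simulate are hypothetical names, and the loop only imitates what the client and server observers do in response to each other.

```java
import java.util.ArrayList;
import java.util.List;

public class TripFlowSketch {

    // Returns the remaining distances the server would report, in order,
    // ending with 0 once the destination is reached.
    public static List<Integer> simulate(int totalDistance, int[] steps) {
        List<Integer> reported = new ArrayList<>();
        int traveled = 0;
        for (int step : steps) {
            // client: drive() sends the distance covered in this window
            traveled = Math.min(totalDistance, traveled + step);
            // server: onNext replies with the remaining distance
            int remaining = totalDistance - traveled;
            reported.add(remaining);
            // client: a reply of 0 means "arrived", so it completes the call
            if (remaining == 0) {
                break;
            }
        }
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(simulate(20, new int[] {8, 7, 9, 5})); // prints [12, 5, 0]
    }
}
```

The last step (5) is never sent: once the server reports 0, the client stops driving, which is the same decision the onNext method above makes.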
The client has to do the following to make a request and receive the response.
- Creating channel: The client has to create a channel/connection with the back-end server first.
- Stub: The client will use a non-blocking stub to make a request by passing the required parameters.
To demo this, I am going to create a simple JUnit test class to act as a gRPC client. Note that the client can be anything; it could even be another microservice.
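import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class BiDirectionalStreamingTest {

    private ManagedChannel channel;
    private NavigationServiceGrpc.NavigationServiceStub clientStub;

    @Before
    public void setup() {
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = NavigationServiceGrpc.newStub(channel);
    }

    @Test
    public void tripTest() throws InterruptedException {
        // the stub is asynchronous, so block the test until the call terminates
        CountDownLatch latch = new CountDownLatch(1);
        TripResponseStreamObserver tripResponseStreamObserver = new TripResponseStreamObserver() {
            @Override
            public void onCompleted() { super.onCompleted(); latch.countDown(); }
            @Override
            public void onError(Throwable t) { super.onError(t); latch.countDown(); }
        };
        StreamObserver<TripRequest> requestStreamObserver = this.clientStub.navigate(tripResponseStreamObserver);
        tripResponseStreamObserver.startTrip(requestStreamObserver);
        latch.await();
    }

    @After
    public void teardown() {
        this.channel.shutdown();
    }
}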
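Because the non-blocking stub returns immediately, the caller generally has to wait for the completion callback before shutting the channel down. One common way to do that is a CountDownLatch. The sketch below shows the pattern with plain Java threads standing in for the streaming call; AwaitCompletionSketch and runAndAwait are hypothetical names, and the sleep is only a placeholder for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitCompletionSketch {

    // Starts asynchronous work that signals a latch when it finishes, then
    // blocks until the signal or the timeout. Returns true if the work
    // completed in time.
    public static boolean runAndAwait(long workMillis, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    Thread.sleep(workMillis); // stands in for the streaming call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // cancelled before finishing, so no signal
                }
                done.countDown(); // what onCompleted() or onError() would do
            });
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(50, 2000)); // prints true
        System.out.println(runAndAwait(2000, 50)); // prints false
    }
}
```

Waiting with a timeout, as here, keeps a test from hanging forever if the server never completes the call.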
Now we are ready to drive!
Demo:
When I run the test, I am able to see output as shown here.
...
14:25:03.214192: Remaining Distance : 23
14:25:03.214291: Time To Reach (sec): 12
------------------------------
14:25:06.217532: Remaining Distance : 15
14:25:06.217628: Time To Reach (sec): 7
------------------------------
14:25:09.220906: Remaining Distance : 7
14:25:09.221105: Time To Reach (sec): 3
------------------------------
14:25:12.224692: Remaining Distance : 5
14:25:12.224855: Time To Reach (sec): 2
------------------------------
Trip Completed
Summary:
We were able to successfully demonstrate the gRPC Bidirectional Streaming API.
Learn more about gRPC.
- gRPC File Upload With Client Streaming
- gRPC Spring Boot Integration
- gRPC vs REST Performance Comparison
The source code is available here.
Happy learning 🙂