The gRPC bidirectional streaming RPC allows a client to send a stream of messages to a server, and the server responds with a stream of messages. With bidirectional streaming, the client sends multiple messages to the server, and the server can choose to respond immediately, as soon as it receives metadata, or wait for the client to start streaming. The client and server can read and write messages in any order, as the two streams are independent.
This article will explain how to implement gRPC bidirectional streaming RPC in Java.
Let’s get started!
What is gRPC Bidirectional Streaming RPC?
In the bidirectional streaming RPC, a gRPC client sends a stream of messages, and the gRPC server responds with a stream of messages. The client can send a stream of messages and, optionally, a signal to notify the server about the end of the stream. The client sends this signal as an onCompleted() method call (more about this later). Similarly, the server can send a stream of messages and, optionally, a signal about the end of the stream.
It’s important to note that the client and server streams are completely independent. Therefore, it is completely up to the client and server to decide their communication pattern once the initial connection is established. Also, streams of messages are ordered, i.e. messages sent from the client or server are received in the same order.
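The independence and ordering of the two streams can be illustrated with a small plain-Java model. This is only a sketch and does not use the gRPC API: two queues stand in for the two directions of one call, and the class name DuplexModel, the END marker and the message texts are all made up for the illustration. The client writes every request without waiting, while the server replies at its own pace (here, to every second message) and sends a summary once the client signals the end of its stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-Java model of two independent, ordered message streams (not the gRPC API). */
class DuplexModel {
    private static final String END = "<end>"; // hypothetical end-of-stream marker

    /** Runs one simulated call and returns the responses in the order the client received them. */
    static List<String> run(int requestCount) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();  // client -> server
        BlockingQueue<String> responses = new LinkedBlockingQueue<>(); // server -> client
        Thread server = new Thread(() -> serve(requests, responses));
        server.start();

        List<String> received = new ArrayList<>();
        try {
            // Client side: write all requests without waiting for any response.
            for (int i = 1; i <= requestCount; i++) {
                requests.put("location-" + i);
            }
            requests.put(END);
            // Client side: drain the response stream; order matches the server's send order.
            for (String msg = responses.take(); !msg.equals(END); msg = responses.take()) {
                received.add(msg);
            }
            server.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return received;
    }

    /** Server side: reads requests in order, acknowledges every second one, then summarises. */
    private static void serve(BlockingQueue<String> in, BlockingQueue<String> out) {
        try {
            int seen = 0;
            for (String msg = in.take(); !msg.equals(END); msg = in.take()) {
                seen++;
                if (seen % 2 == 0) {
                    out.put("ack up to " + msg);
                }
            }
            out.put("summary: " + seen + " messages");
            out.put(END);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

In this model, the number of responses does not have to match the number of requests, which is the freedom the two independent streams give you.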
Bidirectional Streaming Use Cases
You can use bidirectional streaming for advanced use cases, such as:
- Full duplex communication between server and client.
- Bulk upload, where a client sends chunks of data, and the server responds with an acknowledgement or resumption key.
- Application-level flow control, where a client or server can ask for more data when it needs it.
Bidirectional Streaming Issues
The bidirectional streaming RPC is a powerful feature allowing full duplex communication between the client and server. However, it requires careful handling to avoid issues such as race conditions or deadlocks. Additionally, bidirectional streaming may not be necessary for simpler use cases.
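One concrete source of race conditions is concurrent writing: gRPC’s StreamObserver is documented as not thread-safe, so if several threads write to the same observer, the application has to serialise those calls itself. The following is a minimal sketch of that idea in plain Java. The Observer interface is a local stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and SynchronizedObserver is a hypothetical wrapper, not part of gRPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

/** Hypothetical wrapper: serialises all calls to a delegate that is not thread-safe. */
final class SynchronizedObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    private boolean closed; // guarded by "this"

    SynchronizedObserver(Observer<T> delegate) {
        this.delegate = delegate;
    }

    @Override public synchronized void onNext(T value) {
        if (!closed) delegate.onNext(value); // drop writes after the stream is closed
    }

    @Override public synchronized void onError(Throwable t) {
        if (!closed) { closed = true; delegate.onError(t); }
    }

    @Override public synchronized void onCompleted() {
        if (!closed) { closed = true; delegate.onCompleted(); }
    }
}

class SynchronizedObserverDemo {
    /** Writes from several threads and returns how many messages the delegate saw. */
    static int run(int threads, int perThread) {
        List<Integer> sink = new ArrayList<>(); // deliberately not thread-safe
        Observer<Integer> raw = new Observer<>() {
            @Override public void onNext(Integer v) { sink.add(v); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };
        Observer<Integer> safe = new SynchronizedObserver<>(raw);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) safe.onNext(i);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        safe.onCompleted();
        safe.onNext(-1); // ignored: the stream is already completed
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println("messages delivered: " + run(4, 1000));
    }
}
```

A lock is the simplest option; funnelling all writes through a single thread is another common choice and avoids blocking the producers.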
Bidirectional Streaming Service Definition
In gRPC, a bidirectional streaming service is defined with the stream keyword on both the request and the response:
rpc BidirectionalRPC(stream Request) returns (stream Response);
Whereas other RPCs are defined as:
rpc UnaryRPC(Request) returns (Response);
rpc ClientStreamingRPC(stream Request) returns (Response);
rpc ServerStreamingRPC(Request) returns (stream Response);
Implementing a Bidirectional Streaming Service
Let’s consider a use case of a ride-hailing app. In our example use case, when a driver starts a trip, the client application (mobile app used by the driver) sends the real-time location to the server. The server periodically responds by sending an estimated charge, distance covered and time to complete the trip. When the trip finishes, the server summarises and sends the total time and distance covered and the final charge.
For the above use case, we can define the gRPC service as:
syntax = "proto3";
package ride_sharing.trip;
option java_package = "dev.techdozo.ride_sharing.trip";
message TripDataRequest {
  RideType ride_type = 1;
  double latitude = 2;
  double longitude = 3;
}
message RideType {
  string driver_id = 1;
  string ride_id = 2;
}
message TripSummaryResponse {
  double distance = 1;
  double charge = 2;
  double time = 3;
  // Assumed field: the server code below calls setStatus(...) with
  // 1 (estimated) or 2 (final); the field number here is a guess.
  int32 status = 4;
}
service TripService {
  rpc SendTripData(stream TripDataRequest) returns (stream TripSummaryResponse);
}
Generating Client and Server Stubs
For the above protobuf service definition, we can generate client and server stubs using the protoc compiler. The protoc compiler supports code generation in many different languages.
For our example, we will use the Protobuf Gradle Plugin to generate source code in Java. The plugin assembles the Protobuf Compiler (protoc) command line and uses it to generate Java source files from the proto files. The generated Java source files should be added to the sourceSets block in build.gradle so that they are compiled and packaged along with the other Java classes.
sourceSets {
  main {
    java {
      srcDirs 'build/generated/source/proto/main/grpc'
      srcDirs 'build/generated/source/proto/main/java'
    }
  }
}
Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java.
Implementing gRPC Server
The gRPC server implements services defined in the proto files and exposes those as RPC API.
In our case, the gRPC server has one RPC method: rpc SendTripData(stream TripDataRequest) returns (stream TripSummaryResponse). Let’s see how to implement the server-side gRPC API for bidirectional streaming.
Implementing Service Definition
The bidirectional streaming server implementation is not very different from the client streaming implementation. As with client streaming, we need to override the sendTripData method in the autogenerated abstract class TripServiceGrpc.TripServiceImplBase and return a StreamObserver object as:
public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {
  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {
    return new StreamObserver<TripDataRequest>() {
      private String rideId;
      @Override
      public void onNext(TripDataRequest request) {
        // called each time the client sends a message
        //....
      }
      @Override
      public void onError(Throwable t) {
        // called when the client's stream terminates with an error
        //....
      }
      @Override
      public void onCompleted() {
        // called when the client completes its stream
        //....
      }
    };
  }
}
In the above code,
- The StreamObserver’s onNext method is called every time the client sends a message on the stream. In this call, we can store the streaming data in the database for trip summary calculation and send a response back to the client with an estimated distance and charge.
- The StreamObserver’s onError method is called when the client’s stream terminates with an error, for example when the client cancels the call.
- The StreamObserver’s onCompleted method is called when the client completes the streaming call by calling tripDataRequestStreamObserver.onCompleted(). In this method, the server sends trip summary data to the client.
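The callback sequence described above can be traced with a small simulation. This is a sketch in plain Java, not the gRPC API: TripObserver is a local stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver, and the handler, the message texts and the coordinates are hypothetical. It shows per-call state living in the returned observer, one response per incoming message, and a final response followed by onCompleted when the client finishes.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver. */
interface TripObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

class TripHandlerSketch {
    /** Hypothetical handler: counts location updates, replies to each, and summarises at the end. */
    static TripObserver<double[]> handler(TripObserver<String> responseObserver) {
        return new TripObserver<>() {
            private int points; // per-call state lives in the returned observer

            @Override public void onNext(double[] latLong) {
                points++;
                responseObserver.onNext("ESTIMATED after " + points + " point(s)");
            }

            @Override public void onError(Throwable t) {
                // The call is already over; nothing more is sent on responseObserver.
            }

            @Override public void onCompleted() {
                responseObserver.onNext("FINAL after " + points + " point(s)");
                responseObserver.onCompleted(); // close the response stream as well
            }
        };
    }

    /** Drives the handler the way a client would and returns everything it sent back. */
    static List<String> simulate(int updates) {
        List<String> sent = new ArrayList<>();
        TripObserver<String> responses = new TripObserver<>() {
            @Override public void onNext(String v) { sent.add(v); }
            @Override public void onError(Throwable t) { sent.add("error"); }
            @Override public void onCompleted() { sent.add("completed"); }
        };
        TripObserver<double[]> requests = handler(responses);
        for (int i = 0; i < updates; i++) {
            requests.onNext(new double[] {12.97 + i * 0.001, 77.59});
        }
        requests.onCompleted();
        return sent;
    }

    public static void main(String[] args) {
        simulate(2).forEach(System.out::println);
    }
}
```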
Server Implementation
// `log` is assumed to be an SLF4J-style logger (for example, via Lombok's @Slf4j)
public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {
  public static final int ESTIMATED = 1;
  public static final int FINAL = 2;
  private final TripRepository tripRepository;
  private final TripSummaryService tripSummaryService;
  public RideSharingAPI(TripRepository tripRepository, TripSummaryService tripSummaryService) {
    this.tripRepository = tripRepository;
    this.tripSummaryService = tripSummaryService;
  }
  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {
    return new StreamObserver<TripDataRequest>() {
      private String rideId;
      @Override
      public void onNext(TripDataRequest request) {
        // Save ride data
        tripRepository.saveTripData(
            new RideData(
                request.getRideType().getDriverId(),
                request.getRideType().getRideId(),
                request.getLatitude(),
                request.getLongitude()));
        // Send a response to the client after every message.
        // We may decide to batch the server responses instead.
        this.rideId = request.getRideType().getRideId();
        var tripSummary = tripSummaryService.getTripSummary(rideId);
        responseObserver.onNext(
            TripSummaryResponse.newBuilder()
                .setDistance(tripSummary.getDistance())
                .setCharge(tripSummary.getCharge())
                .setTime((int) tripSummary.getTime())
                .setStatus(ESTIMATED)
                .build());
      }
      @Override
      public void onError(Throwable t) {
        // Pass the throwable so the cause and stack trace are logged
        log.error("Error while processing request", t);
      }
      @Override
      public void onCompleted() {
        // Once the trip is completed, generate the final trip summary
        var tripSummary = tripSummaryService.getTripSummary(rideId);
        responseObserver.onNext(
            TripSummaryResponse.newBuilder()
                .setDistance(tripSummary.getDistance())
                .setCharge(tripSummary.getCharge())
                .setTime((int) tripSummary.getTime())
                .setStatus(FINAL)
                .build());
        responseObserver.onCompleted();
        log.info("Request completed");
      }
    };
  }
}
Registering Service
To expose the RPC service RideSharingAPI, we need to create a gRPC server instance and register the service by calling the addService method. The server listens on the specified port and dispatches all requests to the relevant service.
public RideSharingServer(int port) {
  this.port = port;
  var rideSharingAPI = new RideSharingAPI(...);
  this.server = ServerBuilder.forPort(port).addService(rideSharingAPI).build();
}
Implementing gRPC Client Stub
The first thing we need to do to implement a gRPC client is to generate client stubs from the proto files and then create a gRPC channel, specifying the server address and port, as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().
The channel represents a virtual connection to an endpoint to perform RPC.
We can create the client stub from the managed channel as:
var managedChannel = ManagedChannelBuilder.forAddress(host, port)
    .usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc
    .newStub(managedChannel);
Similar to client streaming, the bidirectional streaming RPC is always async. So, we need to create an async stub as TripServiceGrpc.newStub(managedChannel).
public class RideSharingClient {
  private final String host;
  private final int port;
  public RideSharingClient(String host, int port) {
    this.host = host;
    this.port = port;
  }
  public void callServer() {
    log.info("Calling Server..");
    var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
    //...
    //...
  }
}
Then, we can register a callback as:
StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
    tripServiceStub.sendTripData(new TripSummaryCallback());
TripSummaryCallback is of type StreamObserver, which has three methods: onNext, onError and onCompleted. The callback methods are called when the server sends data or terminates the stream. For example, the onNext method is called when the server calls the responseObserver.onNext() method.
TripSummaryCallback Implementation
private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {
  @Override
  public void onNext(TripSummaryResponse tripSummaryResponse) {
    DecimalFormat df = new DecimalFormat("0.00");
    log.info(
        "Trip Summary : distance {}, charge {}, time {} ",
        df.format(tripSummaryResponse.getDistance()),
        df.format(tripSummaryResponse.getCharge()),
        df.format((double) tripSummaryResponse.getTime() / 60));
  }
  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }
  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}
In the above code,
- The onNext method of TripSummaryCallback is called each time the server sends a trip summary (TripSummaryResponse) by calling responseObserver.onNext().
- The onError method is called when the server sends an error. For example, the server can return an error if it detects invalid latitude or longitude values.
- The onCompleted method is called when the server calls responseObserver.onCompleted().
Client Implementation
public void callServer() throws InterruptedException {
  log.info("Calling Server..");
  var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
  TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
  StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
      tripServiceStub.sendTripData(new TripSummaryCallback());
  // Send 100 messages with random latitude and longitude, 100 ms apart
  IntStream.range(0, 100)
      .mapToObj(
          n -> {
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // preserve the interrupt status
              throw new RuntimeException(e);
            }
            return Service.TripDataRequest.newBuilder()
                .setRideType(
                    Service.RideType.newBuilder()
                        .setDriverId("Driver_1")
                        .setRideId("Ride_1")
                        .build())
                .setLatitude(ThreadLocalRandom.current().nextDouble(-90, 90))
                .setLongitude(ThreadLocalRandom.current().nextDouble(-180, 180))
                .build();
          })
      .forEach(tripDataRequestStreamObserver::onNext);
  log.info("Calling complete..");
  tripDataRequestStreamObserver.onCompleted();
  // Crude wait so the server's responses can arrive before the program exits
  Thread.sleep(30000);
  managedChannel.shutdown();
}
private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {
  @Override
  public void onNext(TripSummaryResponse tripSummaryResponse) {
    DecimalFormat df = new DecimalFormat("0.00");
    log.info(
        "Trip Summary : distance {}, charge {}, time {} ",
        df.format(tripSummaryResponse.getDistance()),
        df.format(tripSummaryResponse.getCharge()),
        df.format((double) tripSummaryResponse.getTime() / 60));
  }
  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }
  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}
In the above code, IntStream.range(0, 100) is used to simulate 100 calls to the server with random latitude and longitude values, with a delay of 100 ms between requests. In the end, the client calls tripDataRequestStreamObserver.onCompleted() to indicate to the server that the stream has been completed. We have added Thread.sleep(30000) to give the server time to respond; otherwise, the program would exit before the server sends any response.
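A fixed sleep either waits too long or not long enough. A common alternative is to let the response callback signal when the stream has terminated, for example with a CountDownLatch. The following is a minimal sketch in plain Java under stated assumptions: CallbackObserver is a local stand-in mirroring io.grpc.stub.StreamObserver, LatchedCallback is a hypothetical replacement for TripSummaryCallback, and the server is simulated by a thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Local stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver. */
interface CallbackObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

/** Hypothetical response callback that releases a latch when the call terminates. */
final class LatchedCallback implements CallbackObserver<String> {
    private final CountDownLatch done = new CountDownLatch(1);

    @Override public void onNext(String summary) {
        System.out.println("Trip Summary : " + summary);
    }

    @Override public void onError(Throwable cause) {
        done.countDown(); // terminal event: the stream ended with an error
    }

    @Override public void onCompleted() {
        done.countDown(); // terminal event: the server closed the stream
    }

    /** Blocks until the stream terminates or the timeout elapses; true if it terminated. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class LatchDemo {
    /** Simulates a server that responds from another thread and then completes the stream. */
    static boolean run() {
        LatchedCallback callback = new LatchedCallback();
        Thread server = new Thread(() -> {
            callback.onNext("final");
            callback.onCompleted();
        });
        server.start();
        // Returns as soon as the stream terminates instead of sleeping for a fixed time.
        return callback.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + run());
    }
}
```

In the client, the same pattern would mean calling await on the callback after tripDataRequestStreamObserver.onCompleted() in place of the sleep. The timeout remains as a safety net in case the server never responds.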
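When you run the application from an IDE, the client logs a “Trip Summary” line for each response it receives and “Stream completed” when the server closes the stream.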
Code Example
The working code example for this article is available on GitHub. To run the example, clone the repository and import grpc-bidirectonal-streaming-rpc into your favourite IDE as a Gradle project.
Summary
Bidirectional streaming is a powerful feature of gRPC that lets the client and server each send a stream of messages over a single call, independently and at the same time. It provides an efficient way to implement real-time, interactive applications that require continuous communication between client and server.