While REST is the de facto standard for public API, gRPC is increasingly becoming one of the most popular choices for inter-process microservices communication. gRPC is a strongly typed, polyglot, efficient, and fast RPC framework that supports unary (request and reply) and streaming (both client and server-side) communication patterns.
The gRPC client streaming RPC allows a client to send a stream of messages to a server. This is different from the unary RPC, where the client sends a single request, and the server sends back a single response. With client streaming, the client sends multiple messages to the server, and the server can process them as they arrive before sending a single response back to the client. This is useful for scenarios where the client has a lot of data to send to the server and wants to stream it in chunks rather than sending it all at once.
In this article, we will understand how to implement gRPC client streaming RPC in Java.
Let’s get started!
Want to know more about the gRPC? Check other posts:
What is gRPC Client Streaming RPC?
In the client streaming RPC, a gRPC client sends a stream of messages, and the gRPC server responds with a single message. The streams of messages can be followed by a signal to notify the server about the end of the stream. The client sends this signal as an onComplete()
method call (more about this later).
Client Streaming Use Cases
The gRPC client streaming can be used to address all streaming scenarios, such as:
- Real-time location data sharing.
- Multiplayer games.
- When a client needs to send lots of data to the server.
- Real-time driver location in ride-hailing apps.
The gRPC client streaming is used by Lyft to share real-time driver locations with the user.
Client Streaming Service Definition
Let’s consider a use case of a ride-hailing app. In this example, as soon as the driver starts the trip, the client application (mobile app used by the driver) sends the real-time location to the server. When the trip finishes, the server summarises and sends the total distance covered and the charge.
For the above use case, we can define the gRPC service with stream keyword in request as:
syntax = "proto3";
package ride_sharing.trip;
option java_package = "dev.techdozo.ride_sharing.trip";
message TripDataRequest {
RideType ride_type = 1;
double latitude = 2;
double longitude = 3;
}
message RideType {
string driver_id = 1;
string ride_id = 2;
}
message TripSummaryResponse {
double distance = 1;
double charge = 2;
}
service TripService {
rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse);
}
Code Generation
For the above protobuf service definition, we can generate client and server stubs using the potoc compiler. The protoc
compiler supports code generation in many different languages.
For our example, we will use Protobuf Gradle Plugin to generate source code in Java. The protocol buffer plugin assembles the Protobuf Compiler (protoc
) command line and uses it to generate Java source files from the proto files. The generated java source files should be added to the sourceSet
, in the build.gradle
, so they can be compiled along with Java classes.
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
Running the command gradlew build
generates source code in the directory build/generated/source/proto/main/grpc
and build/generated/source/proto/main/java
.
gRPC Server
The gRPC server implements services defined in the proto files and expose those as RPC API.
In our case, the gRPC server has one RPC method rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse)
. Let’s, see how to implement server-side service gRPC API for client streaming.
Implementing Service Definition
Once the server stub is generated, we need to override sendTripData
method in the autogenerated abstract
class TripServiceGrpc.TripServiceImplBase
and return StreamObserver
object as:
public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {
@Override
public StreamObserver<TripDataRequest> sendTripData(
StreamObserver<TripSummaryResponse> responseObserver) {
return new StreamObserver<TripDataRequest>() {
private String rideId;
@Override
public void onNext(TripDataRequest request) {
// called when a client sends a message
//....
}
@Override
public void onError(Throwable t) {
// called when a client sends an error
//....
}
@Override
public void onCompleted() {
// called when client completes
//....
}
};
}
}
In the above code,
- The
StreamObserver
’sonNext
method is called every time client makes a streaming request. Here, we can store the streaming data in the database for trip summary calculation. - The
StreamObserver
’sonError
method is when a client sends an error. - The
StreamObserver
’sonCompleted
method is when a client completes the streaming call by callingtripDataRequestStreamObserver.onCompleted()
. In this method, the server sends trip summary data to the client.
Server Implementation
public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {
private final TripRepository tripRepository;
private final TripSummaryService tripSummaryService;
@Override
public StreamObserver<TripDataRequest> sendTripData(
StreamObserver<TripSummaryResponse> responseObserver) {
return new StreamObserver<TripDataRequest>() {
private String rideId;
@Override
public void onNext(TripDataRequest request) {
// Save Ride data
tripRepository.saveTripData(
new RideData(
request.getRideType().getDriverId(),
request.getRideType().getRideId(),
request.getLatitude(),
request.getLongitude()));
log.info("Driver Id {} - object {}", request.getRideType().getDriverId(), this);
this.rideId = request.getRideType().getRideId();
}
@Override
public void onError(Throwable t) {
log.error("Error while processing request ");
}
@Override
public void onCompleted() {
// Once the trip is completed then, generate trip summary
var tripSummary = tripSummaryService.getTripSummary(rideId);
responseObserver.onNext(
TripSummaryResponse.newBuilder()
.setDistance(tripSummary.getDistance())
.setCharge(tripSummary.getCharge())
.build());
log.info("Request completed");
}
};
}
}
Registering Service
To expose the RPC service RideSharingAPI
, we need to create a gRPC server instance and register the service by calling the addService
method. The server listens to the specified port and dispatches all requests to the relevant service.
public RideSharingServer(int port) {
this.port = port;
var rideSharingAPI = new RideSharingAPI(...);
this.server = ServerBuilder.forPort(port).addService(rideSharingAPI).build();
}
Implementing gRPC Client Stub
The first thing we need to do to implement a gRPC client is to generate client stubs using the proto files and then create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
.
The channel represents a virtual connection to an endpoint to perform RPC.
You can create the client stub using the newly created channel:
var managedChannel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc
.newStub(managedChannel);
There are two types of client stubs:
- Blocking: The
BlockingStub
, which waits until it receives a server response. - Non-Blocking: The
NonBlockingStub
doesn’t wait for a server response but instead registers an observer to receive the response.
As the client streaming API is always async, we need to create an async stub as TripServiceGrpc.newStub(managedChannel)
.
public class RideSharingClient {
private final String host;
private final int port;
public RideSharingClient(String host, int port) {
this.host = host;
this.port = port;
}
public void callServer() {
log.info("Calling Server..");
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
//...
//...
}
}
Similar to the unary asynchronous client stub, we can register a callback as:
StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
tripServiceStub.sendTripData(new TripSummaryCallback());
TripSummaryCallback
is of type StreamObserver
, which has three methods – onNext
, onError
and onCompleted
. The callback methods are called when the server needs to send some data. For example, the onNext
method is called when the server calls the onCompleted
method.
TripSummaryCallback
Implementation
private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {
@Override
public void onNext(TripSummaryResponse tripSummaryResponse) {
log.info(
"Trip Summary : distance {}, charge {} ",
tripSummaryResponse.getDistance(),
tripSummaryResponse.getCharge());
}
@Override
public void onError(Throwable cause) {
log.error("Error occurred, cause {}", cause.getMessage());
}
@Override
public void onCompleted() {
log.info("Stream completed");
}
}
In the above code,
- The
onNext
method of thisTripSummaryCallback
is called once the server sendsTripSummary
via the call-back methodonCompleted
. - The
onError
method is called when the server sends an error. For example, the server can throw an error if it detects the wrong latitude or longitude values. - In the case of client streaming, the
onCompleted
method is never called. As the same API is used for bidirectional and client streaming, we can leaveonCompleted
unimplemented.
Client Implementation
public void callServer() {
log.info("Calling Server..");
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
tripServiceStub.sendTripData(new TripSummaryCallback());
IntStream.range(0, 1000)
.mapToObj(
n ->
Service.TripDataRequest.newBuilder()
.setRideType(
Service.RideType.newBuilder()
.setDriverId("Driver_1")
.setRideId("Ride_" + n)
.build())
.setLatitude(ThreadLocalRandom.current().nextDouble(-90, 90))
.setLongitude(ThreadLocalRandom.current().nextDouble(-180, 180))
.build())
.forEach(tripDataRequestStreamObserver::onNext);
log.info("Calling complete..");
tripDataRequestStreamObserver.onCompleted();
Thread.sleep(30000);
}
- We have used
IntStream.range(0, 1000)...forEach(tripDataRequestStreamObserver::onNext)
to demonstrate the client streaming scenario. In real life, this call will be made by a mobile client by reading GPS data. - Once the trip completes, the client indicates to the server by calling
tripDataRequestStreamObserver.onCompleted()
. Then the server computes the trip summary and calls the callback methodonCompleted
, and the client receives the trip summary inonNext
method ofTripSummaryCallback
. Thread.sleep(30000)
so the program doesn’t exit before the client receives the callback.
Code Example
The working code example of this article is listed on GitHub . To run the example, clone the repository and import grpc-client-streaming as a project in your favourite IDE as a Gradle
project.
Summary
gRPC has built-in streaming support. The gRPC client streaming can be used for many use cases, such as real-time location data sharing, multiplayer games, when a client needs to send lots of data to the server, etc.