gRPC is one of the most popular RPC frameworks for communication between microservices. It supports both unary and streaming RPC. Unlike unary RPC, in server streaming RPC a client sends a single request and, in return, the server sends a stream of messages.
In this article, we will see how to implement server streaming RPC and how to handle errors in a streaming response.
gRPC RPC Types
gRPC supports four types of RPC:
- Unary RPC: the client sends a single request and receives a single response.
- Server streaming RPC: the client sends a single request; in return, the server sends a stream of messages.
- Client streaming RPC: the client sends a stream of messages, and the server responds with a single message.
- Bidirectional streaming RPC: both the client and the server send a stream of messages.
Additionally, a gRPC RPC can be synchronous or asynchronous.
- Synchronous: a client call waits for the server to respond.
- Asynchronous: the client makes non-blocking calls to the server, and the server returns the response asynchronously.
What is gRPC Server Streaming RPC?
In server streaming RPC, a gRPC client sends a single message and, in reply, the gRPC server sends a stream of messages. The stream is followed by a signal that notifies the client that the stream has ended. The server sends this signal by calling the onCompleted() method (more about this later).
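The order of events described above can be sketched in plain Java. This is a minimal model, not gRPC code: the Observer interface is a hypothetical stand-in that only mirrors the three callbacks of gRPC's StreamObserver, and RecordingObserver and listProduct are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the streaming callback contract; no gRPC dependency. */
class StreamContractSketch {

  /** Hypothetical stand-in that mirrors the three StreamObserver callbacks. */
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable cause);

    void onCompleted();
  }

  /** Records every callback so that the order of events can be inspected. */
  static final class RecordingObserver implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onNext(String value) {
      events.add("next:" + value);
    }

    @Override
    public void onError(Throwable cause) {
      events.add("error:" + cause.getMessage());
    }

    @Override
    public void onCompleted() {
      events.add("completed");
    }
  }

  /** One request (a list of ids) yields one onNext per id, then a single onCompleted. */
  static void listProduct(List<String> productIds, Observer<String> observer) {
    for (String id : productIds) {
      observer.onNext(id);
    }
    observer.onCompleted();
  }

  public static void main(String[] args) {
    RecordingObserver observer = new RecordingObserver();
    listProduct(List.of("apple-123", "apple-124"), observer);
    System.out.println(observer.events); // [next:apple-123, next:apple-124, completed]
  }
}
```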
Server Streaming Service Definition
Let’s consider a use case where a client calls the server with a list of ids, and the server returns a stream of messages instead of returning all responses in one go. For such use cases, you can define a service with the stream keyword in the return type, as in returns (stream <message>). One example of such a service is shown below.
As far as the service definition is concerned, the only difference between unary and server streaming is the use of the stream keyword; the rest remains the same.
syntax = "proto3";
package dev.techdozo.product.api;
service ProductService {
  // A server side streaming RPC
  // streams product information for the given product ids
  rpc ListProduct(ListProductRequest) returns (stream GetProductResponse);
}
message GetProductResponse {
  Product product = 1;
}
message ListProductRequest {
  repeated string productId = 1;
}
message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}
Protobuf Code Generation
You can use the protoc compiler to generate the stubs for the client and server.
The protobuf Gradle plugin compiles protobuf (.proto) files and generates client and server code in Java.
Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java. The generated Java source files should be added to the sourceSet so that they can be compiled along with the Java sources.
sourceSets {
  main {
    java {
      srcDirs 'build/generated/source/proto/main/grpc'
      srcDirs 'build/generated/source/proto/main/java'
    }
  }
}
Code Example
The working code example for this article is available on GitHub. To run the example, clone the repository and import grpc-server-streaming-rpc into your favourite IDE as a Gradle project.
To build the project and generate client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in the IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.
Implementing Server Code
The gRPC server provides the implementation of the service methods defined in the proto files and exposes them as RPC methods. Broadly, implementing server streaming code includes three steps.
- Generate server stub: the first step is to generate server stubs. For this, run the command .\gradlew clean build. The Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/.
- Implement service class: the next step is to implement the RPC service by extending the autogenerated class ProductServiceGrpc.ProductServiceImplBase and overriding the listProduct method (see the service implementation code below). You can delegate the call to business logic from the service class. For example, the service method listProduct can call a repository to fetch products from a database.
- Implement server code: the last step is to implement the server code and register the service. This can be done as ServerBuilder.forPort(port).addService(new ProductService()).build().
Service Implementation
public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {
  private final ProductRepository productRepository;
  public ProductService() {
    this.productRepository = new ProductRepository();
  }
  @Override
  public void listProduct(
      Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
    log.info("Fetching products..");
    var productIds = request.getProductIdList();
    for (var productId : productIds) {
      // Fetch Product information from repository
      Optional<Product> optionalProduct = productRepository.get(productId);
      // If found send stream response else send error code
      if (optionalProduct.isPresent()) {
        var product = optionalProduct.get();
        // If found build response
        var productResponse =
            Service.Product.newBuilder()
                .setName(product.getName())
                .setDescription(product.getDescription())
                .setPrice(product.getPrice())
                .build();
        var getProductResponse =
            GetProductResponse.newBuilder().setProduct(productResponse).build();
        responseObserver.onNext(getProductResponse);
        log.info("Called onNext for id {}", productId);
      } else {
        // Error code if product detail is not found
        responseObserver.onError(new StatusException(Status.NOT_FOUND));
      }
    }
    // Indicates that the stream is done.
    responseObserver.onCompleted();
    log.info("Finished calling Product API service..");
  }
}
In the above code, the call to responseObserver.onCompleted() indicates that the stream response has completed.
Server code
public class GrpcServer {
  private final int port;
  private final Server server;
  public GrpcServer(int port) {
    this.port = port;
    var productService = new ProductService();
    this.server = ServerBuilder.forPort(port).addService(productService).build();
  }
  public void start() throws IOException {
    log.info("Starting Server..");
    server.start();
    log.info("Server Started on port {} ", port);
    // Stop the server cleanly when the JVM shuts down
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    this.stop();
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                }));
  }
  private void stop() throws InterruptedException {
    log.info("Stopping Server..");
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }
  private void blockUntilShutDown() throws InterruptedException {
    if (this.server != null) {
      server.awaitTermination();
    }
  }
  public static void main(String[] args) throws IOException, InterruptedException {
    var productServer = new GrpcServer(3000);
    productServer.start();
    productServer.blockUntilShutDown();
  }
}
Running gRPC Server
You can start the gRPC server from your IDE by running the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.
Implementing gRPC Client Channel
A gRPC channel represents a virtual connection to an endpoint to perform RPC.
You can create a gRPC channel by specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
Creation of a channel is expensive, so make sure to create a channel once and reuse it.
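One way to follow the "create once and reuse" advice is a lazily initialized holder. The sketch below is plain Java under stated assumptions: FakeChannel is a hypothetical stand-in for ManagedChannel (it only counts how often it is built), and Lazy is an illustrative helper, not part of gRPC.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Sketch of "create once, reuse everywhere" for an expensive resource. */
class SharedChannelSketch {

  /** Hypothetical stand-in for a ManagedChannel; counts how often it is built. */
  static final class FakeChannel {
    static final AtomicInteger CREATED = new AtomicInteger();

    FakeChannel() {
      CREATED.incrementAndGet();
    }
  }

  /** Thread-safe lazy holder: the factory runs on the first get() only. */
  static final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T value;

    Lazy(Supplier<T> factory) {
      this.factory = factory;
    }

    @Override
    public synchronized T get() {
      if (value == null) {
        value = factory.get();
      }
      return value;
    }
  }

  public static void main(String[] args) {
    Lazy<FakeChannel> channel = new Lazy<>(FakeChannel::new);
    FakeChannel first = channel.get();
    FakeChannel second = channel.get();
    System.out.println(first == second); // true: the same instance is reused
    System.out.println(FakeChannel.CREATED.get()); // 1: the factory ran once
  }
}
```

In real client code, the factory passed to the holder would be the ManagedChannelBuilder call shown above, and every stub would be created from that one shared channel.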
gRPC supports two types of client stubs:
- blocking/synchronous stub: the RPC call waits for the server to respond.
- non-blocking/asynchronous stub: the client makes non-blocking calls to the server, and the response is returned asynchronously.
Implementing Blocking Client Stub
To create a synchronous/blocking client stub, use the newBlockingStub static method of the auto-generated ProductServiceGrpc class.
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest =
    ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();
// For a server streaming RPC, the blocking stub returns an iterator over the responses
var productResponse = productServiceBlockingStub.listProduct(productRequest);
try {
  while (productResponse.hasNext()) {
    log.info("Received Product from server, info {}", productResponse.next());
  }
} catch (StatusRuntimeException statusRuntimeException) {
  log.error("Error, message {}", statusRuntimeException.getMessage());
}
To run the blocking client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcBlockingClient from your IDE. Make sure that the gRPC server is running at the same time. Once run, the client prints logs like this:
[INFO ] 2021-12-19 15:11:19.618 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
As you can infer from the logs, both the request and the response are handled in the same thread, [main]. In other words, the client blocks until the server returns the response.
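To see why the calling thread blocks, it can help to model the response stream as an iterator backed by a queue. The following is a simplified plain-Java sketch, not gRPC's actual implementation: BlockingStream, push, and complete are hypothetical names, and a second thread plays the role of the server.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified model of a blocking response stream; not the real gRPC class. */
class BlockingStreamSketch {

  /** Sentinel that marks the end of the stream. */
  private static final Object END = new Object();

  static final class BlockingStream<T> implements Iterator<T> {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object buffered; // next element (or END), null if not fetched yet

    void push(T value) {
      queue.add(value);
    }

    void complete() {
      queue.add(END);
    }

    @Override
    public boolean hasNext() {
      if (buffered == null) {
        try {
          buffered = queue.take(); // the calling thread waits here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for the stream", e);
        }
      }
      return buffered != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      T value = (T) buffered;
      buffered = null;
      return value;
    }
  }

  /** Starts a fake server thread and drains the stream on the calling thread. */
  static List<String> consume() {
    BlockingStream<String> stream = new BlockingStream<>();
    Thread fakeServer =
        new Thread(
            () -> {
              for (String id : List.of("apple-123", "apple-124", "apple-125")) {
                stream.push(id);
              }
              stream.complete();
            },
            "fake-server");
    fakeServer.start();
    List<String> received = new ArrayList<>();
    while (stream.hasNext()) {
      received.add(stream.next());
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(consume());
  }
}
```

The loop in consume() has the same shape as the client loop above: each hasNext() call parks the caller until the next message or the end-of-stream marker arrives.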
Implementing Asynchronous Client Stub
For most use cases, a blocking RPC suffices. However, a blocking call keeps the calling thread waiting until the server responds, so that thread cannot do any other work in the meantime. You can use an asynchronous client stub to overcome this problem by registering a callback. This callback is called on a different thread once the server sends a response.
To implement an asynchronous client stub, use the newStub static method of ProductServiceGrpc.
var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);
and register a callback as:
var productRequest =
    Service.ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();
productServiceAsyncStub.listProduct(productRequest, new ProductCallback());
where callback is defined as:
class ProductCallback implements StreamObserver<GetProductResponse> {
  // Called once for every message in the stream
  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }
  // Called if the stream terminates with an error
  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }
  // Called when the server has finished sending messages
  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}
To run the asynchronous client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcAsynClient from your IDE. Once run, the client prints logs like this:
[INFO ] 2021-12-19 15:16:11.798 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-12-19 15:16:14.046 [grpc-default-executor-1] UnaryGrpcAsynClient - Received product, product {
name: "Apple iPhone 12 Pro Max (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Red"
price: 1752.59
}
Did you notice that the callback happens in a different thread grpc-default-executor-1 than the main thread?
For the callback, gRPC by default uses a cached thread pool that creates new threads as needed but reuses previously constructed threads when they are available. If you want, you can provide your own thread pool as:
var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
    ManagedChannelBuilder.forAddress(host, port)
        .executor(executorService)
        .usePlaintext()
        .build();
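The effect of supplying your own pool can be illustrated without gRPC. In this minimal sketch the pool name callback-pool and the class CallbackThreadSketch are made up for illustration; the callback simply reports the name of the thread it runs on, which is the same thing the log lines above show for grpc-default-executor-1.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Shows that work handed to an executor runs on the pool's thread, not the caller's. */
class CallbackThreadSketch {

  /** Runs a callback on a custom pool and returns the name of the thread that ran it. */
  static String callbackThreadName() {
    // Every thread in this pool gets the (hypothetical) name "callback-pool"
    ExecutorService executor =
        Executors.newFixedThreadPool(2, runnable -> new Thread(runnable, "callback-pool"));
    try {
      CompletableFuture<String> threadName = new CompletableFuture<>();
      executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
      return threadName.join(); // wait for the callback to report its thread
    } finally {
      executor.shutdown(); // the pool's owner is responsible for shutting it down
    }
  }

  public static void main(String[] args) {
    System.out.println("caller:   " + Thread.currentThread().getName());
    System.out.println("callback: " + callbackThreadName());
  }
}
```

The same ownership rule applies to a pool passed to the channel builder: the channel does not shut the executor down for you, so shut it down yourself when the client stops.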
Handling Errors in Stream Response
If you recall, we sent the error response Status.NOT_FOUND when the product was not found for a given id. Sending an error response terminates the stream. This can be a problem if you want to read the complete stream response even in case of an error and handle the error separately.
public void listProduct(
    Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
  var productIds = request.getProductIdList();
  for (var productId : productIds) {
    Optional<Product> optionalProduct = productRepository.get(productId);
    if (optionalProduct.isPresent()) {
      var product = optionalProduct.get();
      // Prepare response
      ....
      responseObserver.onNext(getProductResponse);
    } else {
      // Error code if product detail is not found
      responseObserver.onError(new StatusException(Status.NOT_FOUND));
    }
  }
  // Indicates that stream is done.
  responseObserver.onCompleted();
}
Consider a scenario where you call the server with the product ids product-1, product-2, and product-3, where product-2 is NOT_FOUND on the server and the server responds with responseObserver.onError(new StatusException(Status.NOT_FOUND)). In this case, the server throws an error for product-3, “Stream was terminated by error, no further calls are allowed”, and on the client side you do not get the response for product-3 even if you are interested in it.
SEVERE: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@42f8c429
java.lang.IllegalStateException: Stream was terminated by error, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:511)
One important point to note here is that calling onError on the server terminates the stream, and it has no impact on the gRPC connection. If you want, you can reuse the same ManagedChannel, which represents a virtual connection to the server, and call the server again.
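The terminal nature of onError can be modelled in a few lines of plain Java. In this sketch, StrictObserver is a hypothetical observer that rejects any call made after the stream has ended (a simplification of the behaviour described above, with its own error message), and the repository is just a Map. If terminating the stream on the first missing product is acceptable, the loop should return right after onError so that nothing else is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a stream that cannot be used after a terminal event. */
class TerminatedStreamSketch {

  /** Hypothetical observer that rejects every call made after the stream has ended. */
  static final class StrictObserver {
    final List<String> events = new ArrayList<>();
    private boolean terminated;

    void onNext(String value) {
      checkOpen();
      events.add("next:" + value);
    }

    void onError(String status) {
      checkOpen();
      terminated = true;
      events.add("error:" + status);
    }

    void onCompleted() {
      checkOpen();
      terminated = true;
      events.add("completed");
    }

    private void checkOpen() {
      if (terminated) {
        throw new IllegalStateException("Stream is already terminated");
      }
    }
  }

  /** Streams products for the given ids and stops at the first id that is missing. */
  static void listProduct(
      List<String> productIds, Map<String, String> repository, StrictObserver observer) {
    for (String id : productIds) {
      String product = repository.get(id);
      if (product == null) {
        observer.onError("NOT_FOUND");
        return; // the stream is over: no further onNext or onCompleted
      }
      observer.onNext(product);
    }
    observer.onCompleted();
  }

  public static void main(String[] args) {
    StrictObserver observer = new StrictObserver();
    Map<String, String> repository = Map.of("product-1", "Phone", "product-3", "Laptop");
    listProduct(List.of("product-1", "product-2", "product-3"), repository, observer);
    System.out.println(observer.events); // [next:Phone, error:NOT_FOUND]
  }
}
```

Returning early avoids the IllegalStateException on the server, but the client still never receives product-3.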
How to Handle Errors in gRPC Stream?
As you know by now, sending an error response terminates the stream. So if the client needs the rest of the stream, you can’t call the responseObserver.onError method on the gRPC server for business errors such as NOT_FOUND. To handle such errors, you need to send the error response as part of the stream message itself. This is where the protobuf oneof feature comes in handy.
Using the protobuf oneof, you can specify that the server sends either an error response or a valid response. The error field below uses google.rpc.Status, which is defined in google/rpc/status.proto, so the proto file needs to import it:
message GetProductResponse {
  oneof product_response {
    Product product = 1;
    google.rpc.Status error = 2;
  }
}
On the gRPC server, you need to make changes to send either a valid response or an error response as:
for (var productId : productIds) {
  // Fetch Product information from repository
  Optional<Product> optionalProduct = productRepository.get(productId);
  // If found send stream response else send error code
  Service.GetProductResponse getProductResponse;
  if (optionalProduct.isPresent()) {
    var product = optionalProduct.get();
    // If found build response
    var productResponse =
        Service.Product.newBuilder()
            .setName(product.getName())
            .setDescription(product.getDescription())
            .setPrice(product.getPrice())
            .build();
    getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();
  } else {
    // Not found: send the error as a regular stream message instead of calling onError
    com.google.rpc.Status status =
        com.google.rpc.Status.newBuilder()
            .setCode(Code.NOT_FOUND.getNumber())
            .setMessage("Product id not found")
            .build();
    getProductResponse = GetProductResponse.newBuilder().setError(status).build();
  }
  responseObserver.onNext(getProductResponse);
}
// Indicates that stream is done.
responseObserver.onCompleted();
On the gRPC client, you can take appropriate action as:
var productResponse = productServiceBlockingStub.listProduct(productRequest);
while (productResponse.hasNext()) {
  var getProductResponse = productResponse.next();
  // The generated case enum tells which field of the oneof is set
  var productResponseCase = getProductResponse.getProductResponseCase();
  if (productResponseCase == PRODUCT) {
    log.info("Received Product from server, info {}", getProductResponse);
  } else if (productResponseCase == ERROR) {
    // Some business logic
    log.info("Received Error {}", getProductResponse.getError());
  }
}
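The client-side handling can be tried out without generated code. The sketch below is a plain-Java model under stated assumptions: Response and ResponseCase are hypothetical stand-ins for the generated GetProductResponse and its case enum, and drain is an illustrative helper that reads the whole stream and keeps products and errors apart.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of reading a stream whose messages carry a product or an error. */
class OneofResponseSketch {

  /** Hypothetical stand-in for the generated oneof case enum. */
  enum ResponseCase {
    PRODUCT,
    ERROR
  }

  /** Hypothetical stand-in for GetProductResponse: exactly one field is set. */
  static final class Response {
    final ResponseCase responseCase;
    final String product; // set only when responseCase is PRODUCT
    final String error; // set only when responseCase is ERROR

    private Response(ResponseCase responseCase, String product, String error) {
      this.responseCase = responseCase;
      this.product = product;
      this.error = error;
    }

    static Response ofProduct(String product) {
      return new Response(ResponseCase.PRODUCT, product, null);
    }

    static Response ofError(String error) {
      return new Response(ResponseCase.ERROR, null, error);
    }
  }

  /** Result of draining a stream: successful products and business errors, kept apart. */
  static final class Summary {
    final List<String> products = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
  }

  /** Reads the stream to the end; an error message does not stop the iteration. */
  static Summary drain(Iterator<Response> stream) {
    Summary summary = new Summary();
    while (stream.hasNext()) {
      Response response = stream.next();
      switch (response.responseCase) {
        case PRODUCT:
          summary.products.add(response.product);
          break;
        case ERROR:
          summary.errors.add(response.error);
          break;
      }
    }
    return summary;
  }

  public static void main(String[] args) {
    List<Response> stream =
        List.of(
            Response.ofProduct("product-1"),
            Response.ofError("NOT_FOUND"),
            Response.ofProduct("product-3"));
    Summary summary = drain(stream.iterator());
    System.out.println(summary.products); // [product-1, product-3]
    System.out.println(summary.errors); // [NOT_FOUND]
  }
}
```

Unlike the onError approach, the message for product-3 still arrives after the error for product-2. The real generated case enum also has a value for "no field set", which client code may want to handle as well.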
Summary
gRPC is a popular remote procedure call (RPC) framework. It supports both unary RPC and streaming RPC. In server streaming RPC, a client sends a single request and receives a stream of messages as the response.
A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.
Handling business errors by calling responseObserver.onError() doesn’t work when the client needs the rest of the stream, because the call terminates the stream. Instead, we can use the protobuf oneof to send either a valid response or an error response back to the client.