gRPC: synchronous and asynchronous Server streaming RPC

gRPC, one of the most popular RPC frameworks for inter-process microservices communication, supports both unary and streaming RPC. Contrary to unary RPC, in gRPC streaming RPC, a client sends a single request, and in return, the server sends a stream of messages.

In this article, we will see how to implement server streaming RPC and how to handle errors in streaming response.

Want to know more about gRPC? Check the below posts. 

gRPC RPC type

The gRPC supports four types of RPC:

  1. Unary RPC: the client sends a single request and receives a single response.
  2. Server streaming RPC: the client sends a single request and in return, the server sends a stream of messages.
  3. Client streaming RPC: the client sends a stream of messages and the server responds with a single message.
  4. Bidirectional streaming RPC: in bidirectional streaming, both client and server send a stream of messages.

Additionally, a gRPC RPC can be synchronous or asynchronous.

  • Synchronous: a client call waits for the server to respond.
  • Asynchronous: client makes non-blocking calls to the server, and the server returns the response asynchronously.

What is gRPC server streaming RPC? 

In the server streaming RPC, a gRPC client sends a single message, and in reply, the gRPC server sends a bunch of messages. The streams of messages are followed by a signal to notify the client about the end of the stream. This signal is sent by the server in the form of a onComplete() method call (more about this later).

gRPC Server Streaming

Server streaming service definition 

Let’s consider a use case where a client calls the server with a list of ids and the server returns the stream of messages instead of returning all responses in one go. For such use cases, you can define a service with stream keyword in return type as returns (stream <message>). One example of such service is shown below. 

With respect to the service definition, the only difference between unary and server-streaming is the use of the stream keyword, and the rest remains the same.


syntax = "proto3";
package dev.techdozo.product.api;

service ProductService {
  // A server side streaming RPC
  // streams product information for a given productId
  rpc ListProduct(ListProductRequest) returns (stream GetProductResponse);
}

message GetProductResponse {
  Product product = 1;
}

message ListProductRequest {
  repeated string productId = 1;
}

message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}

Protobuf code generation 

You can use the protoc compiler to generate the stubs for the client and server.

The protobuf gradle plugin is a Gradle plugin that compiles protobuf (.proto) files and generates client and server code in java.

Running command gradlew build generates source code in the directory build/generated/source/proto/main/grpc and build/generated/source/proto/main/java. The generated java source files should be added to the sourceSet so that they can be compiled along with Java sources.


sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

Code Example

The working code example of this article is listed on GitHub   . To run the example, clone repository, and import grpc-server-streaming-rpc as a project in your favorite IDE as Gradle project.

To build the project and generate client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.

Project Structure

Implementing server code

The gRPC server provides the implementation of service methods defined in the proto files and exposes those as RPC methods. Broadly, implementing server streaming code includes three steps.

  1. Generate server stub: the first step is to generate server stubs. For this, run the command .\gradlew clean build. The Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/.
  2. Implement service class: the next step is to implement RPC service by extending from autogenerated class ProductServiceGrpc.ProductServiceImplBase and override listProduct method (check service implemenation code below). You can delegate the call to business logic from service class. For example, the service method listService can call repository to fetch products from database.
  3. Implement server code: and the last step is to implement server code and register service. This can done as ServerBuilder.forPort(port).addService(new ProductService()).build().

Service Implementation


public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {

  private final ProductRepository productRepository;

  public ProductService() {
    this.productRepository = new ProductRepository();
  }

  @Override
  public void listProduct(
      Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {

    log.info("Fetching products..");

    var productIds = request.getProductIdList();

    for (var productId : productIds) {
      // Fetch Product information from repository
      Optional<Product> optionalProduct = productRepository.get(productId);
      // If found send stream response else send error code
      if (optionalProduct.isPresent()) {
        var product = optionalProduct.get();
        // If found build response
        var productResponse =
            Service.Product.newBuilder()
                .setName(product.getName())
                .setDescription(product.getDescription())
                .setPrice(product.getPrice())
                .build();
        var getProductResponse =
            GetProductResponse.newBuilder().setProduct(productResponse).build();

        responseObserver.onNext(getProductResponse);
        log.info("Called onNext for id {}", productId);
      } else {
        // Error code if product detail is not found
        responseObserver.onError(new StatusException(Status.NOT_FOUND));
      }
    }
    //Indicates that stream is done.
    responseObserver.onCompleted();
    log.info("Finished calling Product API service..");
  }
}

In the above code, the call to the responseObserver.onCompleted() indicates that the stream response has completed.

Server code


public class GrpcServer {
  private final int port;
  private final Server server;

  public GrpcServer(int port) {
    this.port = port;
    var productService = new ProductService();
    this.server = ServerBuilder.forPort(port).addService(productService).build();
  }

  public void start() throws IOException {
    log.info("Starting Server..");
    server.start();
    log.info("Server Started on port {} ", port);

    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    this.stop();
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                }));
  }

  private void stop() throws InterruptedException {
    log.info("Stopping Server..");
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutDown() throws InterruptedException {
    if (this.server != null) {
      server.awaitTermination();
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    var productServer = new GrpcServer(3000);
    productServer.start();
    productServer.blockUntilShutDown();
  }
}

Running gRPC server

You can start the gRPC server from IDE. For that, you can run the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.

Implementing gRPC client channel

A gRPC channel represents a virtual connection to an endpoint to perform RPC.

You can create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().


var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

Creation of a channel is expensive, so make sure to create a channel once and reuse it.

gRPC supports two types of client stubs:

  • blocking/synchronous stub: in this stub, RPC call waits for the server to respond.
  • non-blocking/asynchronous stub: client makes non-blocking calls to the server, where the response is returned asynchronously.

Implementing blocking client stub

To create a synchronous/blocking client stub, use the newBlockingStub static method of auto-generated ProductServiceGrpc.


var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);

var productRequest =
    ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();

var productResponse = productServiceBlockingStub.listProduct(productRequest);

try {
  while (productResponse.hasNext()) {
    log.info("Received Product from server, info {}", productResponse.next());
  }
} catch (StatusRuntimeException statusRuntimeException) {
  log.error("Error, message {}", statusRuntimeException.getMessage());
}

To run the blocking client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcBlockingClient from IDE. At the same time, make sure that the gRPC server is running. Once run, the client prints log like:


[INFO ] 2021-12-19 15:11:19.618 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

As you can infer from logs, both request and response are in the same thread [main]. In other words, the client blocks until the response are returned by the server.

Implementing asynchronous client stub

For most use cases a blocking RPC suffices. However, the blocking RPC waits for the server to return a response and thus wasting precious CPU cycles. You can use an asynchronous client stub to overcome this problem by registering a callback. This callback is called in a different thread, once the server sends the response.

To implement asynchronous client stubs, use the newStub static method of ProductServiceGrpc.


var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);

and register a callback as:


var productRequest =
    Service.ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();
productServiceAsyncStub.listProduct(productRequest, new ProductCallback());

where callback is defined as:


class ProductCallback implements StreamObserver<GetProductResponse> {

  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}

To run the asynchronous client example, run the main method of class dev.techdozo.order.client.UnaryGrpcAsynClient from IDE. Once run, the client prints log like:


[INFO ] 2021-12-19 15:16:11.798 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-12-19 15:16:14.046 [grpc-default-executor-1] UnaryGrpcAsynClient - Received product, product {
  name: "Apple iPhone 12 Pro Max (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Red"
  price: 1752.59
}

Did you notice that the callback happens in a different thread grpc-default-executor-1 than the main thread?

For the callback, gRPC uses a cached thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. If you want you can provide your own thread pool as:


var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
    ManagedChannelBuilder.forAddress(host, port)
        .executor(executorService)
        .usePlaintext()
        .build();

Handling error in stream response

If you recall, we sent the error response Status.NOT_FOUND in case the product is not found for a given id. Sending error response terminates the stream. This can be a problem if you want to read complete stream response even in case of error and handle error separately.


public void listProduct(
    Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {

  var productIds = request.getProductIdList();

  for (var productId : productIds) {
    
	Optional<Product> optionalProduct = productRepository.get(productId);
    
    if (optionalProduct.isPresent()) {
      var product = optionalProduct.get();
      // Prepare response
      ....
      responseObserver.onNext(getProductResponse);
    } else {
      // Error code if product detail is not found
      responseObserver.onError(new StatusException(Status.NOT_FOUND));
    }
  }
  // Indicates that stream is done.
  responseObserver.onCompleted();
}

Consider a scenario when you call server with product ids product-1, product- 2, product- 3 where product-2 is NOT_FOUND on the server and server responds as responseObserver.onError(new StatusException(Status.NOT_FOUND)). In this case, the server will throw an error for product-3 as “Stream was terminated by error, no further calls are allowed” and on the client-side, you will not get the response of product-3 even if you are interested.


SEVERE: Exception while executing runnable io.grpc.internal.Ser[email protected]42f8c429
java.lang.IllegalStateException: Stream was terminated by error, no further calls are allowed
	at com.google.common.base.Preconditions.checkState(Preconditions.java:511)

One important point to note here is that calling onError on the server just terminates the stream and it has no impact on the gRPC connection. If you want, you can reuse the same ManagedChannel, which represents a virtual connection to the server, and call the server again.

How to handle errors in gRPC stream?

As you know by now, sending an error response terminates the stream. So clearly, you can’t call the responseObserver.onError method on the gRPC server in the case of business errors such as NOT_FOUND. To handle such errors, you need to send the error response as part of the stream message itself. This is where the protobuf oneOf feature comes in handy.

Using the protobuf oneOf, you can specify that the server either sends an error response or a valid response as:


message GetProductResponse {
  oneof product_response {
    Product product = 1;
    google.rpc.Status error = 2;
  }
}

On the gRPC server, you need to make changes to send either a valid response or error response as:


for (var productId : productIds) {
  
  // Fetch Product information from repository
  Optional<Product> optionalProduct = productRepository.get(productId);
  // If found send stream response else send error code

  Service.GetProductResponse getProductResponse;
  if (optionalProduct.isPresent()) {
    var product = optionalProduct.get();
    // If found build response
    var productResponse =
        Service.Product.newBuilder()
            .setName(product.getName())
            .setDescription(product.getDescription())
            .setPrice(product.getPrice())
            .build();
    getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();

  } else {
    com.google.rpc.Status status =
        com.google.rpc.Status.newBuilder()
            .setCode(Code.NOT_FOUND.getNumber())
            .setMessage("Product id not found")
            .build();
    getProductResponse = GetProductResponse.newBuilder().setError(status).build();
  }
  responseObserver.onNext(getProductResponse);
}
// Indicates that stream is done.
responseObserver.onCompleted();

On gRPC client, you can take appropriate action as:


var productResponse = productServiceBlockingStub.listProduct(productRequest);

while (productResponse.hasNext()) {
  var getProductResponse = productResponse.next();
  var productResponseCase = getProductResponse.getProductResponseCase();

  if (productResponseCase == PRODUCT) {
    log.info("Received Product from server, info {}", getProductResponse);
  } else if (productResponseCase == ERROR)
    // Some business logic
    log.info("Received Error {}", getProductResponse.getError());
}

Summary

gRPC is a popular remote procedure call (RPC) framework. The gRPC supports both unary RPC and streaming RPC. In streaming RPC, a client sends a single request and receives a bunch of messages as the response.

A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.

Handling business errors using the normal construct of calling responseObserver.onError() doesn’t work. We can use protobuf oneOf to send either a valid response or an error response back to the client.

if you like this article, then please follow me on LinkedIn  for more tips on #software architecture.

Social Share !
Default image
Pankaj
Software Architect @ Schlumberger ``` Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python ```