gRPC: synchronous and asynchronous unary RPC in Java

The gRPC is a remote procedure call (RPC) framework that’s used for inter microservices communication. The gRPC supports four types of RPC:

  1. Unary RPC: the client sends a single request and receives a single response.
  2. Server streaming RPC: the client sends a single request and in return, the server sends a stream of messages.
  3. Client streaming RPC: the client sends a stream of messages and the server responds with a single message.
  4. Bidirectional streaming RPC: in bidirectional streaming, both client and server send a stream of messages.

Additionally, a gRPC unary RPC can be synchronous or asynchronous.

  • Synchronous: a client call waits for the server to respond.
  • Asynchronous: client makes non-blocking calls to the server, and the server returns the response asynchronously.

In this article, we will see how to implement gRPC synchronous and asynchronous unary RPC in java.

Let’s get started:

What is unary RPC in gRPC?

In gRPC unary RPC, a client sends a single request and receives a single response.

gRPC Unary RPC

Like many RPC frameworks, gRPC is based on the idea of defining a remote service, specifying which remote methods can be called along with their parameters and return types. In gRPC, we define remote service and methods in protocol buffers (protobuf) format in .proto file(s).

For example, let’s consider a remote service ProductService that exposes remote methods to create and query product information. We can define a unary RPC, to get product information for a given id, as:


syntax = "proto3";
package dev.techdozo.product.api;

service ProductService {
  // A simple RPC.
  // returns Product for a given id
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}

message GetProductRequest {
  string product_id = 1;
}

message GetProductResponse {
  Product product = 1;
}

message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}

Let’s understand the above protobuf definition.

  • Service: service is a logical collection of remote methods that are exposed by the server. In protobuf, we define service by using the service keyword. Likewise, we can define an RPC method is by using rpc keyword. For example, rpc GetProduct(GetProductRequest) returns (GetProductResponse) defines an RPC method that takes a message GetProductRequest and returns GetProductResponse.
  • A protoc compiler takes the protobuf definition file (.proto) and generate the client and server stubs.
  • Message: message is a binary data structure that is exchanged between client and server. Field numbers, such as name = 1, are used to identify fields in the binary encoded data.

To simplify, the GetProduct RPC takes product_id and returns Product. You can read more about protocol buffers in the language guide.

Protobuf code generation

The protoc compiler, which supports code generation in many different languages, is used for generating client and server code. In this example, we will use the protobuf gradle plugin to generate client and server code in java.

The protocol buffer plugin assembles the protobuf compiler (protoc) command line and uses it to generate Java source files from the .proto files. The generated java source files should be added to the sourceSet so that they can be compiled along with Java sources.


sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

Running command gradlew build generates source code in the directory build/generated/source/proto/main/grpc and build/generated/source/proto/main/java.

Typically, in gRPC, the client and server share the same proto files. Thus, when you run gradlew build, the compiler generates stubs for both client and server.

gRPC RPC call flow

In gRPC, the server implements a set of methods/functions that can be invoked remotely.

gRPC RPC call

In a typical gRPC call, the following steps take place.

  1. It all starts with a client workflow initiating an RPC call.
  2. Once RPC starts, the client stub encodes the message in binary format. And, then it creates an HTTP POST request with the encoded message.
  3. Afterward, the encoded message is sent over the channel. A gRPC channel provides a connection to a gRPC server on a specified host and port.
  4. On the server-side, the server hands over the encoded message to the auto-generated server stub.
  5. After receiving the message, the server stub deserializes the message into a language-specific data structure.
  6. And finally, the server stub makes a call to the overridden service method and passes the parsed message.

Similarly, on the round trip, the response from the server is encoded and sent back to the client.

Code Example

The working code example of this article is listed on GitHub  . To run the example, clone repository, and import grpc-unary-rpc as a project in your favorite IDE as Gradle project.

To build the project and generate client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.

gRPC project structure

Implementing server code

The gRPC server implements service and rpc methods defined in the proto files and expose those as RPC methods. After you run .\gradlew clean build, Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/.

Autogenerated Server stub

To start the gRPC server and register service, you can create a gRPC server instance by calling ServerBuilder as ServerBuilder.forPort(port).addService(new ProductService()).build().

In the above code, the ProductService provides an implementation of gRPC service defined in protobuf as:


service ProductService {
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}

Complete code of starting the server and registering service:


public class GrpcServer {

  private final int port;
  private final Server server;

  public GrpcServer(int port) {
    this.port = port;
    this.server = ServerBuilder.forPort(port).addService(new ProductService()).build();
  }

  public void start() throws IOException {
    server.start();
    log.info("Server Started on port {} ", port);
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    this.stop();
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                }));
  }

  private void stop() throws InterruptedException {
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutDown() throws InterruptedException {
    if (this.server != null) {
      server.awaitTermination();
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    var productServer = new GrpcServer(8081);
    productServer.start();
    productServer.blockUntilShutDown();
  }
}

If you have already adopted Spring Boot as a microservices technology stack, then it makes sense to run the gRPC server as a Spring Boot application. Not only do you get all benefits of Spring Boot, such as dependency injection, security, sleuth, etc, but also you get a much cleaner code. Furthermore, defining a service is as simple as adding annotation @GrpcService.

Implementing service definition

There are four main steps associated with implementing the RPC method rpc GetProduct(GetProductRequest) returns (GetProductResponse):

  1. Implement class, say ProductService, that extends from autogenerated abstract class ProductServiceGrpc.ProductServiceImplBase.
  2. Override getProduct method and implement business logic.
  3. Once the business logic call completes, call responseObserver.onNext(getProductResponse) to pass the response back to the client. Finally, call responseObserver.onCompleted().
  4. In case of error, call responseObserver.onError(new StatusException(Status.NOT_FOUND)).

We have tried to simplify error handling here, but you must be aware that handling error in gRPC is not very straightforward.

Service implementation:


public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {

  private final ProductRepository productRepository;

  public ProductService() {
    this.productRepository = new ProductRepository();
  }

  @Override
  public void getProduct(
      GetProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
    var productId = request.getProductId();
    //Fetch Product information from repository
    Optional<Product> optionalProduct = productRepository.get(productId);

    if (optionalProduct.isPresent()) {
      var product = optionalProduct.get();
      //If found build response
      var productResponse =
          Service.Product.newBuilder()
              .setName(product.getName())
              .setDescription(product.getDescription())
              .setPrice(product.getPrice())
              .build();
      var getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();

      responseObserver.onNext(getProductResponse);
      responseObserver.onCompleted();
    } else {
      responseObserver.onError(new StatusException(Status.NOT_FOUND));
    }
    log.info("Finished calling Product API service..");
  }
}

Running gRPC server

Since this application is meant for understanding concepts only, we can start with running the gRPC server from IDE. For that, you can run the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.

Implementing client code

On the client side, you need to start with the generation of the client stubs by using the proto files. Once client stubs are generated, you need to implement a channel. A channel represents a connection to the server. After the channel is created, you need to create a blocking or non-blocking client stub, and then you can call the server passing a request message.

Implementing gRPC client channel

You can create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(). The channel represents a virtual connection to an endpoint to perform RPC.

The creation of a channel is expensive, so make sure to create a channel once and reuse it.

You can create the client stub using the newly created channel as:


var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

gRPC supports two types of client stubs:

  • blocking/synchronous stub: in this stub, RPC call waits for the server to respond.
  • non-blocking/asynchronous stub: client makes non-blocking calls to the server, where the response is returned asynchronously.

Implementing blocking client stub

To create a synchronous/blocking client stub, use the newBlockingStub static method of ProductServiceGrpc.


var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();

var productResponse = productServiceBlockingStub.getProduct(productRequest);

To run the blocking client example, run the main method of class dev.techdozo.order.client.UnaryGrpcBlockingClient from IDE. At the same time, make sure that the gRPC server is running. Once run, the client prints log like:


[INFO ] 2021-09-07 20:32:53.899 [main] UnaryGrpcBlockingClient - Calling Server..
[INFO ] 2021-09-07 20:32:56.485 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

As you can infer from logs, both request and response are in the same thread [main]. In other words, the client blocks until the response are returned by the server.

Implementing asynchronous client stub

For most use cases a blocking operation suffices. But, as you can see blocking RPC waits for the server to return a response and thus wasting CPU cycles. Asynchronous client stub solves this problem by registering a callback. This callback is called, in a different thread, once the server sends to send the response. At the same time, the client can continue doing other work.

To implement asynchronous client stubs, use the newStub static method of ProductServiceGrpc.


var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);

and register a callback as:


var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
productServiceAsyncStub.getProduct(productRequest, new ProductCallback());

where callback is defined as:


class ProductCallback implements StreamObserver<GetProductResponse> {

  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}

To run the asynchronous client example, run the main method of class dev.techdozo.order.client.UnaryGrpcAsynClient from IDE. Once run, the client prints log like:


[INFO ] 2021-09-10 15:05:40.188 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-09-10 15:05:42.300 [grpc-default-executor-0] UnaryGrpcAsynClient - Received product, product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

[INFO ] 2021-09-10 15:05:42.301 [grpc-default-executor-0] UnaryGrpcAsynClient - Stream completed

Did you notice that the callback happens in a different thread grpc-default-executor-0 than the main thread?

For the callback, gRPC uses a cached thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. If you want you can provide your own thread pool as:


var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
    ManagedChannelBuilder.forAddress(host, port)
        .executor(executorService)
        .usePlaintext()
        .build();

Implementing asynchronous future stub

Another asynchronous RPC option is to use the Future stub. To define future sub, call the static method newFutureStub(Channel) of ProductServiceGrpc as:


// Create a new future stub
var productServiceFutureStub = ProductServiceGrpc.newFutureStub(managedChannel);

Like asynchronous stub, you can register a callback using Futures.addCallback(..) as:


var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
ListenableFuture<GetProductResponse> listenableFuture =
  productServiceFutureStub.getProduct(productRequest);
Futures.addCallback(listenableFuture, new ProductCallback(), fixedThreadPool);

Alternatively, you can register a runnable as:


listenableFuture.addListener(this::notifyListener, fixedThreadPool);

Unlike Futures, the Runnable does return anything. So, this can be useful when an RPC method returns an empty response and you want to notify a subscriber.

For example, in the case of deleting a product, you may want to notify a subscriber with a ProductDeleted event.


rpc DeleteProduct(DeleteProductRequest) returns (google.protobuf.Empty);

To run the asynchronous client example, run the main method of class dev.techdozo.order.client.UnaryGrpcFutureClient from IDE. Once run, the client prints log like:


[INFO ] 2021-09-10 15:15:37.754 [main] UnaryGrpcFutureClient - Calling Server..
[INFO ] 2021-09-10 15:15:39.474 [pool-2-thread-1] UnaryGrpcFutureClient - Received product f1 product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

Summary

gRPC, a remote procedure call (RPC) framework, is used for inter microservices communication. The gRPC supports both unary RPC and streaming RPC. In gRPC unary RPC, a client sends a single request and receives a single response. Additionally, an RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.

Social Share !
Default image
Pankaj
Software Architect @ Schlumberger ``` Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python ```