The gRPC is a remote procedure call (RPC) framework for inter-microservices communication. The gRPC supports four types of RPC:
- Unary RPC: the client sends a single request and receives a single response.
- Server streaming RPC: the client sends a single request; in return, the server sends a stream of messages.
- Client streaming RPC: the client sends a stream of messages, and the server responds with a single message.
- Bidirectional streaming RPC: in bidirectional streaming, both the client and server send a stream of messages.
Additionally, a gRPC unary RPC can be synchronous or asynchronous.
- Synchronous: a client call waits for the server to respond.
- Asynchronous: client makes non-blocking calls to the server, and the server returns the response asynchronously.
In this article, we will see how to implement gRPC synchronous and asynchronous unary RPC in java.
Let’s get started:
What is Unary RPC in gRPC?
In gRPC unary RPC, a client sends a single request and receives a single response.
Like many RPC frameworks, gRPC is based on the idea of defining a remote service, specifying which remote methods can be called along with their parameters and return types. In gRPC, we define remote service and methods in protocol buffers (protobuf) format in .proto
file(s).
For example, let’s consider a remote service ProductService
that exposes remote methods to create and query product information. We can define a unary RPC to get product information for a given id as:
syntax = "proto3";
package dev.techdozo.product.api;
service ProductService {
// A simple RPC.
// returns Product for a given id
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}
message GetProductRequest {
string product_id = 1;
}
message GetProductResponse {
Product product = 1;
}
message Product {
string name = 1;
string description = 2;
double price = 3;
}
Let’s understand the above protobuf definition.
- Service: service is a logical collection of remote methods the server exposes. In protobuf, we define service by using the service keyword. Likewise, we can define an RPC method by using
rpc
keyword. For example,rpc GetProduct(GetProductRequest) returns (GetProductResponse)
defines an RPC method that takes a messageGetProductRequest
and returnsGetProductResponse
. A protoc compiler takes the protobuf definition file (.proto
) and generate the client and server stubs. - Message: the message is a binary data structure that is exchanged between the client and server. Field numbers, such as
name = 1
, are used to identify fields in the binary encoded data.
To simplify, the GetProduct
RPC takes product_id
and returns the Product
. You can read more about protocol buffers in the language guide.
Protobuf Code Generation
The protoc compiler, which supports code generation in many languages, generates client and server code. In this example, we will use the protobuf gradle plugin to generate client and server code in java.
The protocol buffer plugin assembles the protobuf compiler (protoc
) command line and uses it to generate Java source files from the .proto
files. The generated java source files should be added to the sourceSet so they can be compiled along with Java sources.
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
Running command gradlew build
generates source code in the directory build/generated/source/proto/main/grpc
and build/generated/source/proto/main/java
.
Typically, in gRPC, the client and server share the same proto files. Thus, when you run gradlew build
command, the compiler generates stubs for both client and server.
gRPC RPC Call Flow
In gRPC, the server implements a set of methods/functions that can be invoked remotely.
In a typical gRPC call, the following steps take place.
- It all starts with a client workflow initiating an RPC call.
- Once RPC starts, the client stub encodes the message in binary format. And, then it creates an HTTP POST request with the encoded message.
- Afterwards, the encoded message is sent over the channel. A gRPC channel provides a connection to a gRPC server on a specified host and port.
- On the server side, the server hands over the encoded message to the auto-generated server stub.
- After receiving the message, the server stub deserializes the message into a language-specific data structure.
- And finally, the server stub calls the overridden service method and passes the parsed message.
Similarly, on the round trip, the response from the server is encoded and sent back to the client.
Code Example
The working code example of this article is listed on GitHub . To run the example, clone repository, and import grpc-unary-rpc as a project in your favourite IDE as a Gradle project.
To build the project and generate client and server stubs, run the command .\gradlew clean build
. You can start the gRPC server in IDE by running the main method of the class GrpcServer
. The gRPC server runs on localhost:3000.
Implementing Server Code
The gRPC server implements service
and rpc
methods defined in the proto files and expose those as RPC methods. After you run .\gradlew clean build
, Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/
.
To start the gRPC server and register service, you can create a gRPC server instance by calling ServerBuilder
as ServerBuilder.forPort(port).addService(new ProductService()).build()
.
In the above code, the ProductService
provides an implementation of gRPC service defined in protobuf as:
service ProductService {
rpc GetProduct(GetProductRequest) returns (GetProductResponse);
}
Code for starting the server and registering the service:
public class GrpcServer {
private final int port;
private final Server server;
public GrpcServer(int port) {
this.port = port;
this.server = ServerBuilder.forPort(port).addService(new ProductService()).build();
}
public void start() throws IOException {
server.start();
log.info("Server Started on port {} ", port);
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
try {
this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutDown() throws InterruptedException {
if (this.server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
var productServer = new GrpcServer(8081);
productServer.start();
productServer.blockUntilShutDown();
}
}
If you have already adopted Spring Boot as a microservices technology stack, running the gRPC server as a Spring Boot application makes sense. You get all benefits of Spring Boot, such as dependency injection, security, sleuth, etc., and a much cleaner code. Furthermore, defining a service is as simple as adding annotation @GrpcService
.
Implementing Service Definition
There are four main steps associated with implementing the RPC method rpc GetProduct(GetProductRequest) returns (GetProductResponse)
:
- Implement class, say
ProductService
, that extends from autogeneratedabstract
classProductServiceGrpc.ProductServiceImplBase
. - Override
getProduct
method and implement business logic. - Once the business logic call completes, call
responseObserver.onNext(getProductResponse)
to pass the response back to the client. Finally, call responseObserver.onCompleted(). - In case of error, call
responseObserver.onError(new StatusException(Status.NOT_FOUND))
.
We have tried to simplify error handling here, but you must be aware that handling error in gRPC is not very straightforward.
Service implementation:
public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {
private final ProductRepository productRepository;
public ProductService() {
this.productRepository = new ProductRepository();
}
@Override
public void getProduct(
GetProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
var productId = request.getProductId();
//Fetch Product information from repository
Optional<Product> optionalProduct = productRepository.get(productId);
if (optionalProduct.isPresent()) {
var product = optionalProduct.get();
//If found build response
var productResponse =
Service.Product.newBuilder()
.setName(product.getName())
.setDescription(product.getDescription())
.setPrice(product.getPrice())
.build();
var getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();
responseObserver.onNext(getProductResponse);
responseObserver.onCompleted();
} else {
responseObserver.onError(new StatusException(Status.NOT_FOUND));
}
log.info("Finished calling Product API service..");
}
}
Running gRPC Server
Since this application is meant for understanding concepts only, we can start with running the gRPC server from IDE. For that, you can run the main method of dev.techdozo.product.GrpcServer
. However, for production, you may want to deploy the server as a container or as a stand-alone application.
Implementing Client Code
You can generate the client stubs using the proto files on the client side. Once client stubs are generated, you need to implement a channel. A channel represents a connection to the server. After the channel is created, you need to create a blocking or non-blocking client stub, and then you can call the server by passing a request message.
Implementing the gRPC Client Channel
You can create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
. The channel represents a virtual connection to an endpoint to perform RPC.
Creating a channel is expensive, so make sure to create a channel once and reuse it.
You can create the client stub using the newly created channel:
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
gRPC supports two types of client stubs:
- Blocking/synchronous stub: The RPC call waits for the server to respond in this stub.
- non-blocking/asynchronous stub: client makes non-blocking calls to the server, where the response is returned asynchronously.
Implementing Blocking Client Stub
To create a synchronous/blocking client stub, use the newBlockingStub
static method of ProductServiceGrpc
.
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
var productResponse = productServiceBlockingStub.getProduct(productRequest);
To run the blocking client example, run the main method of class dev.techdozo.order.client.UnaryGrpcBlockingClient
from IDE. At the same time, make sure that the gRPC server is running. Once run, the client prints log like:
[INFO ] 2021-09-07 20:32:53.899 [main] UnaryGrpcBlockingClient - Calling Server..
[INFO ] 2021-09-07 20:32:56.485 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
As you can infer from logs, both request and response are in the same thread [main
]. In other words, the client blocks until the server return the response.
Implementing Asynchronous Client Stub
For most use cases, a blocking operation suffices. But, as you can see, blocking RPC waits for the server to return a response, thus wasting CPU cycles. An asynchronous client stub solves this problem by registering a callback. This callback is called, in a different thread, once the server sends to send the response. At the same time, the client can continue doing other work.
To implement asynchronous client stubs, use the newStub
static method of ProductServiceGrpc
.
var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);
and register a callback as:
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
productServiceAsyncStub.getProduct(productRequest, new ProductCallback());
where callback is defined as:
class ProductCallback implements StreamObserver<GetProductResponse> {
@Override
public void onNext(GetProductResponse value) {
log.info("Received product, {}", value);
}
@Override
public void onError(Throwable cause) {
log.error("Error occurred, cause {}", cause.getMessage());
}
@Override
public void onCompleted() {
log.info("Stream completed");
}
}
To run the asynchronous client example, run the main method of class dev.techdozo.order.client.UnaryGrpcAsynClient
from IDE. Once run, the client prints logs like this:
[INFO ] 2021-09-10 15:05:40.188 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-09-10 15:05:42.300 [grpc-default-executor-0] UnaryGrpcAsynClient - Received product, product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
[INFO ] 2021-09-10 15:05:42.301 [grpc-default-executor-0] UnaryGrpcAsynClient - Stream completed
Did you notice that the callback happens in a different thread grpc-default-executor-0 than the main thread?
For the callback, gRPC uses a cached thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. If you want, you can provide your thread pool as:
var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
ManagedChannelBuilder.forAddress(host, port)
.executor(executorService)
.usePlaintext()
.build();
Implementing Asynchronous Future Stub
Another asynchronous RPC option is to use the Future
stub. To define future sub, call the static method newFutureStub(Channel)
of ProductServiceGrpc
as:
// Create a new future stub
var productServiceFutureStub = ProductServiceGrpc.newFutureStub(managedChannel);
Like asynchronous stub, you can register a callback using Futures.addCallback(..) as:
var productRequest = GetProductRequest.newBuilder().setProductId("apple-123").build();
ListenableFuture<GetProductResponse> listenableFuture =
productServiceFutureStub.getProduct(productRequest);
Futures.addCallback(listenableFuture, new ProductCallback(), fixedThreadPool);
Alternatively, you can register a runnable as:
listenableFuture.addListener(this::notifyListener, fixedThreadPool);
Unlike Futures
, the Runnable
does not return anything. So, this can be useful when an RPC method returns an empty response, and you want to notify a subscriber.
For example, you may want to notify a subscriber with a ProductDeleted event when deleting a product.
rpc DeleteProduct(DeleteProductRequest) returns (google.protobuf.Empty);
To run the asynchronous client example, run the main method of class dev.techdozo.order.client.UnaryGrpcFutureClient
from IDE. Once run, the client prints logs like this:
[INFO ] 2021-09-10 15:15:37.754 [main] UnaryGrpcFutureClient - Calling Server..
[INFO ] 2021-09-10 15:15:39.474 [pool-2-thread-1] UnaryGrpcFutureClient - Received product f1 product {
name: "Apple iPhone 12 Pro (128GB)"
description: "Apple iPhone 12 Pro (128GB) - Graphite"
price: 1617.29
}
Summary
gRPC, a remote procedure call (RPC) framework, is used for inter-microservices communication. The gRPC supports both unary RPC and streaming RPC. In gRPC unary RPC, a client sends a single request and receives a single response. Additionally, an RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC, the server returns the response asynchronously.
Discussion about this post