Techdozo
gRPC: synchronous and asynchronous Server streaming RPC

by Pankaj
December 30, 2021
in gRPC, Microservices

gRPC is one of the most popular RPC frameworks for inter-process communication between microservices. It supports both unary and streaming RPC. Unlike unary RPC, in server streaming RPC a client sends a single request and, in return, the server sends a stream of messages.

In this article, we will see how to implement server streaming RPC and how to handle errors in a streaming response.

Want to know more about gRPC? Check the posts below.

Why gRPC

Unary gRPC

gRPC RPC Type

gRPC supports four types of RPC:

  1. Unary RPC: the client sends a single request and receives a single response.
  2. Server streaming RPC: the client sends a single request; in return, the server sends a stream of messages.
  3. Client streaming RPC: the client sends a stream of messages, and the server responds with a single message.
  4. Bidirectional streaming RPC: in bidirectional streaming, both the client and server send a stream of messages.

Additionally, a gRPC RPC can be synchronous or asynchronous.

  • Synchronous: a client call waits for the server to respond.
  • Asynchronous: the client makes non-blocking calls to the server, and the server returns the response asynchronously.

What is gRPC Server Streaming RPC?

In server streaming RPC, a gRPC client sends a single message and, in reply, the gRPC server sends a stream of messages. The stream is followed by a signal that notifies the client about the end of the stream. The server sends this signal by calling the onCompleted() method (more about this later).
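The order of events described above can be sketched in plain Java, without any gRPC dependency. This is only a model under stated assumptions: the nested Observer interface is a hypothetical stand-in that borrows the method names of gRPC's StreamObserver, and listProduct here is an ordinary method, not a generated RPC handler.

```java
import java.util.ArrayList;
import java.util.List;

class ServerStreamingSketch {

  // Hypothetical stand-in for a stream observer; not the gRPC type itself.
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable cause);

    void onCompleted();
  }

  // Plays the role of a server-streaming handler: one request in, many messages out.
  static void listProduct(List<String> productIds, Observer<String> observer) {
    for (String id : productIds) {
      observer.onNext("product:" + id); // one stream message per id
    }
    observer.onCompleted(); // end-of-stream signal, sent once after the last message
  }

  // Records every event the "client" sees, in arrival order.
  static List<String> collectEvents(List<String> productIds) {
    List<String> events = new ArrayList<>();
    listProduct(
        productIds,
        new Observer<String>() {
          @Override
          public void onNext(String value) {
            events.add("onNext(" + value + ")");
          }

          @Override
          public void onError(Throwable cause) {
            events.add("onError");
          }

          @Override
          public void onCompleted() {
            events.add("onCompleted");
          }
        });
    return events;
  }

  public static void main(String[] args) {
    collectEvents(List.of("apple-123", "apple-124")).forEach(System.out::println);
  }
}
```

For two ids, the recorded events are two onNext entries followed by a single onCompleted entry.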

gRPC Server Streaming

Server Streaming Service Definition

Let’s consider a use case where a client calls the server with a list of ids, and the server returns a stream of messages instead of returning all responses in one go. For such use cases, you can define a service with the stream keyword in the return type, as returns (stream <message>). One example of such a service is shown below.

As far as the service definition is concerned, the only difference between unary and server-streaming is the use of the stream keyword, and the rest remains the same.


syntax = "proto3";
package dev.techdozo.product.api;

service ProductService {
  // A server side streaming RPC
  // streams product information for a given productId
  rpc ListProduct(ListProductRequest) returns (stream GetProductResponse);
}

message GetProductResponse {
  Product product = 1;
}

message ListProductRequest {
  repeated string productId = 1;
}

message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}

Protobuf Code Generation 

You can use the protoc compiler to generate the stubs for the client and server.

The protobuf Gradle plugin compiles protobuf (.proto) files and generates client and server code in Java.

Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java. The generated Java source files should be added to the sourceSets so that they are compiled along with the other Java sources.


sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

Code Example

The working code example of this article is listed on GitHub. To run the example, clone the repository and import grpc-server-streaming-rpc into your favourite IDE as a Gradle project.

To build the project and generate client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in your IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.

Project Structure

Implementing Server Code

The gRPC server provides the implementation of service methods defined in the proto files and exposes those as RPC methods. Broadly, implementing server streaming code includes three steps.

  1. Generate server stub: the first step is to generate server stubs. For this, run the command .\gradlew clean build. The Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/.
  2. Implement service class: the next step is to implement the RPC service by extending the autogenerated class ProductServiceGrpc.ProductServiceImplBase and overriding the listProduct method (check the service implementation code below). You can delegate the call to business logic from the service class. For example, the service method listProduct can call a repository to fetch products from the database.
  3. Implement server code: the last step is to implement the server code and register the service. This can be done as ServerBuilder.forPort(port).addService(new ProductService()).build().


Service Implementation

public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {

  private final ProductRepository productRepository;

  public ProductService() {
    this.productRepository = new ProductRepository();
  }

  @Override
  public void listProduct(
      Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {

    log.info("Fetching products..");

    var productIds = request.getProductIdList();

    for (var productId : productIds) {
      // Fetch Product information from repository
      Optional<Product> optionalProduct = productRepository.get(productId);
      // If found send stream response else send error code
      if (optionalProduct.isPresent()) {
        var product = optionalProduct.get();
        // If found build response
        var productResponse =
            Service.Product.newBuilder()
                .setName(product.getName())
                .setDescription(product.getDescription())
                .setPrice(product.getPrice())
                .build();
        var getProductResponse =
            GetProductResponse.newBuilder().setProduct(productResponse).build();

        responseObserver.onNext(getProductResponse);
        log.info("Called onNext for id {}", productId);
      } else {
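        // Error code if product detail is not found.
        // Note: onError terminates the stream, so any later onNext or onCompleted
        // call fails (see "Handling Errors in Stream Response").
        responseObserver.onError(new StatusException(Status.NOT_FOUND));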
      }
    }
    //Indicates that stream is done.
    responseObserver.onCompleted();
    log.info("Finished calling Product API service..");
  }
}

In the above code, the call to the responseObserver.onCompleted() indicates that the stream response has completed.


Server code

public class GrpcServer {
  private final int port;
  private final Server server;

  public GrpcServer(int port) {
    this.port = port;
    var productService = new ProductService();
    this.server = ServerBuilder.forPort(port).addService(productService).build();
  }

  public void start() throws IOException {
    log.info("Starting Server..");
    server.start();
    log.info("Server Started on port {} ", port);

    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    this.stop();
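                  } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while stopping server", e);
                  }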
                }));
  }

  private void stop() throws InterruptedException {
    log.info("Stopping Server..");
    if (server != null) {
      server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutDown() throws InterruptedException {
    if (this.server != null) {
      server.awaitTermination();
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    var productServer = new GrpcServer(3000);
    productServer.start();
    productServer.blockUntilShutDown();
  }
}

Running gRPC Server

You can start the gRPC server from your IDE by running the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.

Implementing gRPC Client Channel

A gRPC channel represents a virtual connection to an endpoint to perform RPC.

You can create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().


var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

Creation of a channel is expensive, so make sure to create a channel once and reuse it.

gRPC supports two types of client stubs:

  • blocking/synchronous stub: the RPC call waits for the server to respond.
  • non-blocking/asynchronous stub: the client makes non-blocking calls to the server, and the response is returned asynchronously.

Implementing Blocking Client Stub

To create a synchronous/blocking client stub, use the newBlockingStub static method of auto-generated ProductServiceGrpc.


var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);

var productRequest =
    ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();

var productResponse = productServiceBlockingStub.listProduct(productRequest);

try {
  while (productResponse.hasNext()) {
    log.info("Received Product from server, info {}", productResponse.next());
  }
} catch (StatusRuntimeException statusRuntimeException) {
  log.error("Error, message {}", statusRuntimeException.getMessage());
}

To run the blocking client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcBlockingClient from your IDE, making sure that the gRPC server is running. Once run, the client prints logs like:


[INFO ] 2021-12-19 15:11:19.618 [main] UnaryGrpcBlockingClient - Received Product from server, info product {
  name: "Apple iPhone 12 Pro (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Graphite"
  price: 1617.29
}

As you can infer from the logs, the request is sent and the response is received on the same thread, [main]. In other words, the client blocks until the server returns the response.
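To make the blocking behaviour concrete, here is a plain-Java model of an iterator whose hasNext() waits for the next message. It is a sketch under stated assumptions, not gRPC's implementation: BlockingStream, emit, complete and the fake-server thread are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingIteratorSketch {

  // Hypothetical model of a blocking response stream: a producer thread adds
  // messages, and the consumer's hasNext() waits until one arrives or the stream ends.
  static final class BlockingStream implements Iterator<String> {
    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object buffered; // element taken from the queue but not yet returned

    void emit(String message) {
      queue.add(message);
    }

    void complete() {
      queue.add(END);
    }

    @Override
    public boolean hasNext() {
      if (buffered == null) {
        try {
          buffered = queue.take(); // blocks the calling thread until something arrives
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
      }
      return buffered != END;
    }

    @Override
    public String next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      String value = (String) buffered;
      buffered = null;
      return value;
    }
  }

  // Starts a fake "server" thread, then drains the stream on the caller's thread.
  static List<String> readAll(List<String> messages) {
    BlockingStream stream = new BlockingStream();
    Thread server =
        new Thread(
            () -> {
              for (String message : messages) {
                stream.emit(message);
              }
              stream.complete();
            },
            "fake-server");
    server.start();

    List<String> received = new ArrayList<>();
    while (stream.hasNext()) { // the caller waits here between messages
      received.add(stream.next());
    }
    return received;
  }
}
```

The loop in readAll has the same shape as the client loop above: the thread that issued the call is also the thread that waits for and reads every message.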

Implementing Asynchronous Client Stub

For most use cases, a blocking RPC suffices. However, a blocking RPC keeps the calling thread waiting until the server returns a response, so that thread can do no other work in the meantime. You can use an asynchronous client stub to overcome this problem by registering a callback. This callback is called on a different thread once the server sends the response.

To implement asynchronous client stubs, use the newStub static method of ProductServiceGrpc.


var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);

and register a callback as:


var productRequest =
    Service.ListProductRequest.newBuilder()
        .addAllProductId(List.of("apple-123", "apple-124", "apple-125"))
        .build();
productServiceAsyncStub.listProduct(productRequest, new ProductCallback());

where callback is defined as:


class ProductCallback implements StreamObserver<GetProductResponse> {

  @Override
  public void onNext(GetProductResponse value) {
    log.info("Received product, {}", value);
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}

To run the asynchronous client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcAsynClient from your IDE. Once run, the client prints logs like this:


[INFO ] 2021-12-19 15:16:11.798 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-12-19 15:16:14.046 [grpc-default-executor-1] UnaryGrpcAsynClient - Received product, product {
  name: "Apple iPhone 12 Pro Max (128GB)"
  description: "Apple iPhone 12 Pro (128GB) - Red"
  price: 1752.59
}

Did you notice that the callback happens in a different thread grpc-default-executor-1 than the main thread?

For the callback, gRPC uses a cached thread pool that creates new threads as needed but reuses previously constructed threads when they are available. If you want, you can provide your own thread pool as:


var executorService = Executors.newFixedThreadPool(10);
var managedChannel =
    ManagedChannelBuilder.forAddress(host, port)
        .executor(executorService)
        .usePlaintext()
        .build();
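Because callbacks run on an executor thread, the thread that started the call returns immediately, and a short-lived main method may exit before the stream finishes. One common way to wait is a CountDownLatch that is released in onCompleted and onError. The sketch below shows the idea in plain Java; Observer and fakeAsyncCall are hypothetical stand-ins for the stream observer and the asynchronous stub call, not gRPC APIs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCompletionSketch {

  // Hypothetical stand-in for a stream observer; not the gRPC type itself.
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable cause);

    void onCompleted();
  }

  // Hypothetical async call: returns immediately and delivers events on the executor.
  static void fakeAsyncCall(List<String> ids, Observer<String> observer, ExecutorService executor) {
    executor.execute(
        () -> {
          for (String id : ids) {
            observer.onNext(id);
          }
          observer.onCompleted();
        });
  }

  // Returns the names of the threads on which onNext ran, once the stream has finished.
  static List<String> callAndWait(List<String> ids) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CountDownLatch done = new CountDownLatch(1);
    List<String> callbackThreads = new CopyOnWriteArrayList<>();
    try {
      fakeAsyncCall(
          ids,
          new Observer<String>() {
            @Override
            public void onNext(String value) {
              callbackThreads.add(Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable cause) {
              done.countDown(); // the stream ended with an error
            }

            @Override
            public void onCompleted() {
              done.countDown(); // the stream ended normally
            }
          },
          executor);
      // Wait on the caller's thread until the stream ends, with an upper bound.
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Stream did not finish in time");
      }
      return callbackThreads;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the stream", e);
    } finally {
      executor.shutdown();
    }
  }
}
```

The five-second timeout is an arbitrary value chosen for the sketch; a real client would pick a bound that matches its own latency expectations.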

Handling Errors in Stream Response

If you recall, we sent the error response Status.NOT_FOUND in case the product is not found for a given id. Sending an error response terminates the stream. This can be a problem if you want to read the complete stream response even in case of an error and handle the error separately.


public void listProduct(
    Service.ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {

  var productIds = request.getProductIdList();

  for (var productId : productIds) {
    
    Optional<Product> optionalProduct = productRepository.get(productId);
    
    if (optionalProduct.isPresent()) {
      var product = optionalProduct.get();
      // Prepare response
      ....
      responseObserver.onNext(getProductResponse);
    } else {
      // Error code if product detail is not found
      responseObserver.onError(new StatusException(Status.NOT_FOUND));
    }
  }
  // Indicates that stream is done.
  responseObserver.onCompleted();
}

Consider a scenario where you call the server with product ids product-1, product-2 and product-3, where product-2 is not found on the server and the server responds with responseObserver.onError(new StatusException(Status.NOT_FOUND)). In this case, the next call on the response observer, made for product-3, fails on the server with “Stream was terminated by error, no further calls are allowed”, and the client never receives the response for product-3, even if it is interested in it.


SEVERE: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@42f8c429
java.lang.IllegalStateException: Stream was terminated by error, no further calls are allowed
	at com.google.common.base.Preconditions.checkState(Preconditions.java:511)

One important point to note here is that calling onError on the server terminates the stream but has no impact on the gRPC connection. If you want, you can reuse the same ManagedChannel, which represents a virtual connection to the server, and call the server again.
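The failure can be reproduced with a small plain-Java model. This is a simplified sketch, not gRPC's code: GuardedObserver is a hypothetical class that only imitates the rule that no call is allowed after onError, and streamWithoutReturn mirrors the shape of the loop shown earlier.

```java
import java.util.ArrayList;
import java.util.List;

class TerminatedStreamSketch {

  // Hypothetical observer that rejects any call made after onError,
  // reusing the message from the server log above.
  static final class GuardedObserver {
    final List<String> delivered = new ArrayList<>();
    private boolean aborted;

    void onNext(String value) {
      checkNotAborted();
      delivered.add(value);
    }

    void onError(String status) {
      checkNotAborted();
      aborted = true; // from here on the stream is closed
      delivered.add("ERROR:" + status);
    }

    void onCompleted() {
      checkNotAborted();
    }

    private void checkNotAborted() {
      if (aborted) {
        throw new IllegalStateException(
            "Stream was terminated by error, no further calls are allowed");
      }
    }
  }

  // Same shape as the loop above: it keeps iterating after onError,
  // so the call made for the following id fails.
  static void streamWithoutReturn(List<String> ids, String missingId, GuardedObserver observer) {
    for (String id : ids) {
      if (id.equals(missingId)) {
        observer.onError("NOT_FOUND");
      } else {
        observer.onNext(id);
      }
    }
    observer.onCompleted();
  }
}
```

Calling streamWithoutReturn with product-1, product-2 and product-3, where product-2 is the missing id, delivers product-1 and the error, then throws IllegalStateException when the loop reaches product-3.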

How to Handle Errors in gRPC Stream?

As you know by now, sending an error response terminates the stream. So clearly, you can’t call the responseObserver.onError method on the gRPC server for business errors such as NOT_FOUND if the client should still receive the rest of the stream. To handle such errors, you need to send the error response as part of the stream message itself. This is where the protobuf oneof feature comes in handy.

Using the protobuf oneof, you can specify that the server either sends an error response or a valid response as:


// google.rpc.Status is defined in google/rpc/status.proto
import "google/rpc/status.proto";

message GetProductResponse {
  oneof product_response {
    Product product = 1;
    google.rpc.Status error = 2;
  }
}

On the gRPC server, you need to make changes to send either a valid response or an error response as:


for (var productId : productIds) {
  
  // Fetch Product information from repository
  Optional<Product> optionalProduct = productRepository.get(productId);
  // If found send stream response else send error code

  Service.GetProductResponse getProductResponse;
  if (optionalProduct.isPresent()) {
    var product = optionalProduct.get();
    // If found build response
    var productResponse =
        Service.Product.newBuilder()
            .setName(product.getName())
            .setDescription(product.getDescription())
            .setPrice(product.getPrice())
            .build();
    getProductResponse = GetProductResponse.newBuilder().setProduct(productResponse).build();

  } else {
    com.google.rpc.Status status =
        com.google.rpc.Status.newBuilder()
            .setCode(Code.NOT_FOUND.getNumber())
            .setMessage("Product id not found")
            .build();
    getProductResponse = GetProductResponse.newBuilder().setError(status).build();
  }
  responseObserver.onNext(getProductResponse);
}
// Indicates that stream is done.
responseObserver.onCompleted();

On the gRPC client, you can take appropriate action as:


var productResponse = productServiceBlockingStub.listProduct(productRequest);

while (productResponse.hasNext()) {
  var getProductResponse = productResponse.next();
  var productResponseCase = getProductResponse.getProductResponseCase();

  // PRODUCT and ERROR are constants of the generated ProductResponseCase enum
  if (productResponseCase == PRODUCT) {
    log.info("Received Product from server, info {}", getProductResponse);
  } else if (productResponseCase == ERROR) {
    // Some business logic
    log.info("Received Error {}", getProductResponse.getError());
  }
}
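The effect of carrying errors inside the stream can be sketched end to end in plain Java. The example below models the oneof with a small hand-written class instead of generated protobuf code, so Response, Kind, stream and summarize are hypothetical names, and the catalog map stands in for the product repository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OneofHandlingSketch {

  enum Kind {
    PRODUCT,
    ERROR
  }

  // Hand-written stand-in for a oneof message: each response is either a product or an error.
  static final class Response {
    final Kind kind;
    final String value; // product name or error message, depending on kind

    private Response(Kind kind, String value) {
      this.kind = kind;
      this.value = value;
    }

    static Response product(String name) {
      return new Response(Kind.PRODUCT, name);
    }

    static Response error(String message) {
      return new Response(Kind.ERROR, message);
    }
  }

  // "Server": emits one response per id and never aborts the stream on a missing product.
  static List<Response> stream(List<String> ids, Map<String, String> catalog) {
    List<Response> out = new ArrayList<>();
    for (String id : ids) {
      String name = catalog.get(id);
      out.add(name != null ? Response.product(name) : Response.error(id + " not found"));
    }
    return out;
  }

  // "Client": reads the whole stream and separates products from errors.
  static String summarize(List<Response> responses) {
    List<String> products = new ArrayList<>();
    List<String> errors = new ArrayList<>();
    for (Response response : responses) {
      if (response.kind == Kind.PRODUCT) {
        products.add(response.value);
      } else {
        errors.add(response.value);
      }
    }
    return "products=" + products + " errors=" + errors;
  }
}
```

With ids product-1, product-2 and product-3 and a catalog that lacks product-2, summarize reports both found products and one error, so the missing id no longer cuts the stream short.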

Summary

gRPC is a popular remote procedure call (RPC) framework. It supports both unary RPC and streaming RPC. In server streaming RPC, a client sends a single request and receives a stream of messages as the response.

A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.

Reporting business errors by calling responseObserver.onError() terminates the stream, so it does not work when the client needs the remaining messages. Instead, you can use a protobuf oneof to send either a valid response or an error response back to the client.

Pankaj

Software Architect @ Schlumberger
Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python

© 2023 Techdozo.
