Techdozo

gRPC Bidirectional Streaming with Code Example

by Pankaj
February 17, 2023
in gRPC, Microservices

The gRPC bidirectional streaming RPC allows a client to send a stream of messages to a server, and the server responds with a stream of messages. With bidirectional streaming, the client sends multiple messages to the server, and the server can choose to respond immediately, as soon as it receives metadata, or wait for the client to start streaming. The client and server can read and write messages in any order, as the two streams are independent.

This article will explain how to implement gRPC bidirectional streaming RPC in Java.


In this article

  • What is gRPC Bidirectional Streaming RPC?
  • Bidirectional Streaming Use Cases
  • Bidirectional Streaming Issues
  • Bidirectional Streaming Service Definition
  • Implementing Bidirectional Streaming Service
  • Generating Client and Server stub
  • Implementing gRPC Server
    • Implementing Service Definition
    • Registering Service
  • Implementing gRPC Client Stub
  • Code Example
  • Summary

Let’s get started!

Want to know more about gRPC? Check out these other posts:

  • gRPC: synchronous and asynchronous unary RPC in Java
  • gRPC: synchronous and asynchronous Server streaming RPC
  • gRPC Interceptor: unary interceptor with code example
  • gRPC for microservices communication
  • gRPC Client Streaming

What is gRPC Bidirectional Streaming RPC?

In a bidirectional streaming RPC, a gRPC client sends a stream of messages, and the gRPC server responds with a stream of messages. The client can send a stream of messages and, optionally, a signal that tells the server the stream has ended. The client sends this signal by calling the onCompleted() method (more about this later). Similarly, the server can send a stream of messages and, optionally, a signal about the end of the stream.

It’s important to note that the client and server streams are completely independent. Therefore, it is completely up to the client and server to decide their communication pattern once the initial connection is established. Also, streams of messages are ordered, i.e. messages sent from the client or server are received in the same order.

[Figure: gRPC bidirectional streaming]

Bidirectional Streaming Use Cases

You can use bidirectional streaming for advanced use cases, such as:

  • Full-duplex communication between the client and the server.
  • Bulk upload, where a client sends chunks of data and the server responds with an acknowledgement or a resumption key.
  • Application-level flow control, where a client or server asks for more data as and when it needs it.
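
To make the bulk upload case a little more concrete, here is a minimal sketch of a handler that acknowledges every chunk with the number of bytes stored so far, which a client could use as a resumption offset. It is not taken from the example repository: the class UploadAckSketch and its methods are hypothetical, and the nested StreamObserver interface is a local stand-in with the same three methods as io.grpc.stub.StreamObserver so that the sketch compiles without the gRPC library.

```java
import java.util.ArrayList;
import java.util.List;

final class UploadAckSketch {

  // Local stand-in mirroring io.grpc.stub.StreamObserver (assumption for self-containment).
  interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // Hypothetical bidirectional handler: receives chunks, answers with a resume offset.
  static StreamObserver<byte[]> upload(StreamObserver<Long> acks) {
    return new StreamObserver<byte[]>() {
      private long stored; // bytes accepted so far

      @Override
      public void onNext(byte[] chunk) {
        stored += chunk.length; // a real server would persist the chunk here
        acks.onNext(stored);    // acknowledge with the next offset the client should send
      }

      @Override
      public void onError(Throwable t) {
        // The stream broke; 'stored' is the offset a client could resume from.
      }

      @Override
      public void onCompleted() {
        acks.onCompleted();
      }
    };
  }

  // Drives the handler by hand, playing the role of the client.
  static List<Long> demo() {
    List<Long> received = new ArrayList<>();
    StreamObserver<byte[]> requests = upload(new StreamObserver<Long>() {
      @Override public void onNext(Long offset) { received.add(offset); }
      @Override public void onError(Throwable t) { }
      @Override public void onCompleted() { }
    });
    requests.onNext(new byte[3]);
    requests.onNext(new byte[2]);
    requests.onNext(new byte[4]);
    requests.onCompleted();
    return received;
  }
}
```

Each acknowledgement is written from inside onNext, so the response stream advances in step with the request stream. A server could just as well batch acknowledgements, since the two streams are independent.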

Bidirectional Streaming Issues

The bidirectional streaming RPC is a powerful feature allowing full duplex communication between the client and server. However, it requires careful handling to avoid issues such as race conditions or deadlocks. Additionally, bidirectional streaming may not be necessary for simpler use cases.
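
One common source of race conditions is writing to the same stream from several threads. In grpc-java, a StreamObserver is not required to be thread-safe, so concurrent writers have to synchronise their calls. The sketch below shows one possible way to do that with a small wrapper. The names SynchronizedObserverSketch, SynchronizedObserver and Counter are hypothetical, and the nested StreamObserver interface is a local stand-in for io.grpc.stub.StreamObserver so the code runs on its own.

```java
final class SynchronizedObserverSketch {

  // Local stand-in mirroring io.grpc.stub.StreamObserver (assumption for self-containment).
  interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // Serialises all calls to the delegate and drops calls made after termination.
  static final class SynchronizedObserver<T> implements StreamObserver<T> {
    private final StreamObserver<T> delegate;
    private boolean terminated;

    SynchronizedObserver(StreamObserver<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public synchronized void onNext(T value) {
      if (!terminated) delegate.onNext(value);
    }

    @Override
    public synchronized void onError(Throwable t) {
      if (!terminated) {
        terminated = true;
        delegate.onError(t);
      }
    }

    @Override
    public synchronized void onCompleted() {
      if (!terminated) {
        terminated = true;
        delegate.onCompleted();
      }
    }
  }

  // Deliberately not thread-safe: a plain int counter.
  static final class Counter implements StreamObserver<Integer> {
    int count;
    @Override public void onNext(Integer value) { count++; }
    @Override public void onError(Throwable t) { }
    @Override public void onCompleted() { }
  }

  // Four threads write 1000 messages each through the wrapper.
  static int demo() {
    Counter counter = new Counter();
    StreamObserver<Integer> safe = new SynchronizedObserver<>(counter);
    Thread[] writers = new Thread[4];
    for (int i = 0; i < writers.length; i++) {
      writers[i] = new Thread(() -> {
        for (int n = 0; n < 1000; n++) safe.onNext(n);
      });
      writers[i].start();
    }
    try {
      for (Thread writer : writers) writer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    safe.onCompleted();
    safe.onNext(-1); // ignored, the stream is already completed
    return counter.count;
  }
}
```

The wrapper only protects a single observer. It does not prevent deadlocks that arise when the client and the server each wait for the other side to send first, which still has to be ruled out by the communication pattern itself.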

Bidirectional Streaming Service Definition

In gRPC, a bidirectional streaming RPC is defined with the stream keyword on both the request and the response:


rpc BidirectionalRPC(stream Request) returns (stream Response);

Whereas other RPCs are defined as:


rpc UnaryRPC(Request) returns (Response);
rpc ClientStreamingRPC(stream Request) returns (Response);
rpc ServerStreamingRPC(Request) returns (stream Response);
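
On the Java side, the position of the stream keyword changes the shape of the handler method. When the request is not streamed, the request arrives as a method parameter; when the request is streamed, the handler returns an observer that receives the request messages. The following is a rough sketch of those two shapes, not generated code: all names are hypothetical, and the nested StreamObserver interface is a local stand-in for io.grpc.stub.StreamObserver.

```java
import java.util.ArrayList;
import java.util.List;

final class RpcShapesSketch {

  // Local stand-in mirroring io.grpc.stub.StreamObserver (assumption for self-containment).
  interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // Unary and server streaming: the single request is a parameter.
  interface SingleRequestHandler<Q, R> {
    void handle(Q request, StreamObserver<R> responseObserver);
  }

  // Client streaming and bidirectional streaming: the handler returns
  // an observer that will receive the stream of requests.
  interface StreamingRequestHandler<Q, R> {
    StreamObserver<Q> handle(StreamObserver<R> responseObserver);
  }

  // Collects everything written to it, standing in for the network.
  static final class Recorder<T> implements StreamObserver<T> {
    final List<T> values = new ArrayList<>();
    boolean completed;
    @Override public void onNext(T value) { values.add(value); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onCompleted() { completed = true; }
  }

  // A bidirectional handler that answers every request as soon as it arrives.
  static StreamingRequestHandler<String, String> upperCaseEcho() {
    return responses -> new StreamObserver<String>() {
      @Override public void onNext(String request) { responses.onNext(request.toUpperCase()); }
      @Override public void onError(Throwable t) { /* the stream is already broken */ }
      @Override public void onCompleted() { responses.onCompleted(); }
    };
  }

  // Plays the client: sends two requests, then completes the stream.
  static List<String> demo() {
    Recorder<String> recorder = new Recorder<>();
    StreamObserver<String> requests = upperCaseEcho().handle(recorder);
    requests.onNext("a");
    requests.onNext("b");
    requests.onCompleted();
    return recorder.values;
  }
}
```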

Implementing a Bidirectional Streaming Service

Let’s consider a use case of a ride-hailing app. In our example use case, when a driver starts a trip, the client application (mobile app used by the driver) sends the real-time location to the server. The server periodically responds by sending an estimated charge, distance covered and time to complete the trip. When the trip finishes, the server summarises and sends the total time and distance covered and the final charge.

For the above use case, we can define the gRPC service as:


syntax = "proto3";
package ride_sharing.trip;

option java_package = "dev.techdozo.ride_sharing.trip";

message TripDataRequest {
  RideType ride_type = 1;
  double latitude = 2;
  double longitude = 3;
}

message RideType {
  string driver_id = 1;
  string ride_id = 2;
}

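message TripSummaryResponse {
  double distance = 1;
  double charge = 2;
  double time = 3;
  // Assumed field: the server implementation later in this article calls
  // setStatus(ESTIMATED) and setStatus(FINAL), so the response needs a
  // status field (1 = estimated, 2 = final).
  int32 status = 4;
}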

service TripService {
  rpc SendTripData(stream TripDataRequest) returns (stream TripSummaryResponse);
}


Generating Client and Server stub

For the above protobuf service definition, we can generate client and server stubs using the protoc compiler. The protoc compiler supports code generation in many different languages.

For our example, we will use the Protobuf Gradle Plugin to generate source code in Java. The plugin assembles the Protobuf Compiler (protoc) command line and uses it to generate Java source files from the proto files. The generated Java source files should be added to sourceSets in build.gradle so that they are compiled and packaged along with the other Java classes.


sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}


Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java.

Implementing gRPC Server

The gRPC server implements services defined in the proto files and exposes those as RPC API.

In our case, the gRPC server has one RPC method, rpc SendTripData(stream TripDataRequest) returns (stream TripSummaryResponse). Let’s see how to implement the server-side gRPC API for bidirectional streaming.

Implementing Service Definition

The bidirectional streaming server implementation is not very different from the client streaming implementation.

As in the client streaming implementation, we need to override the sendTripData method of the autogenerated abstract class TripServiceGrpc.TripServiceImplBase and return a StreamObserver object:


public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {

    return new StreamObserver<TripDataRequest>() {
      private String rideId;

      @Override
      public void onNext(TripDataRequest request) {
        // called when a client sends a message
        //....
      }

      @Override
      public void onError(Throwable t) {
        // called when the request stream terminates with an error (e.g. the client cancels)
        //....
      }

      @Override
      public void onCompleted() {
        // called when the client completes
        //....
      }
    };
  }
}


In the above code,

  1. The StreamObserver’s onNext method is called each time the client sends a message on the stream. In this call, we can store the streaming data in the database for trip summary calculation and send a response back to the client with an estimated distance and charge.
  2. The StreamObserver’s onError method is called when the client’s stream terminates with an error, for example when the client cancels the call.
  3. The StreamObserver’s onCompleted method is called when the client completes the streaming call by calling tripDataRequestStreamObserver.onCompleted(). In this method, the server sends the trip summary data to the client and completes its own stream.

Server Implementation

// Assumes `log` is an SLF4J logger (for example via Lombok's @Slf4j) and that
// TripRepository, TripSummaryService and RideData are application classes.
public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

  public static final int ESTIMATED = 1;
  public static final int FINAL = 2;
  private final TripRepository tripRepository;
  private final TripSummaryService tripSummaryService;

  // The final fields need to be initialised; a constructor is one way to do it.
  public RideSharingAPI(TripRepository tripRepository, TripSummaryService tripSummaryService) {
    this.tripRepository = tripRepository;
    this.tripSummaryService = tripSummaryService;
  }

  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {

    return new StreamObserver<TripDataRequest>() {
      // Remembered from the latest message so onCompleted can build the final summary
      private String rideId;

      @Override
      public void onNext(TripDataRequest request) {
        // Save ride data
        tripRepository.saveTripData(
            new RideData(
                request.getRideType().getDriverId(),
                request.getRideType().getRideId(),
                request.getLatitude(),
                request.getLongitude()));

        // Send a response to the client after every message.
        // We may decide to batch the server responses instead.
        this.rideId = request.getRideType().getRideId();
        var tripSummary = tripSummaryService.getTripSummary(rideId);
        responseObserver.onNext(
            TripSummaryResponse.newBuilder()
                .setDistance(tripSummary.getDistance())
                .setCharge(tripSummary.getCharge())
                .setTime((int) tripSummary.getTime())
                .setStatus(ESTIMATED)
                .build());
      }

      @Override
      public void onError(Throwable t) {
        // Pass the throwable so the cause ends up in the log
        log.error("Error while processing request", t);
      }

      @Override
      public void onCompleted() {
        // Once the trip is completed, generate the final trip summary.
        // rideId is null if the client completed without sending any message.
        if (rideId != null) {
          var tripSummary = tripSummaryService.getTripSummary(rideId);
          responseObserver.onNext(
              TripSummaryResponse.newBuilder()
                  .setDistance(tripSummary.getDistance())
                  .setCharge(tripSummary.getCharge())
                  .setTime((int) tripSummary.getTime())
                  .setStatus(FINAL)
                  .build());
        }
        responseObserver.onCompleted();
        log.info("Request completed");
      }
    };
  }
}
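
The article does not show how TripSummaryService derives the distance from the stored locations. As an illustration only, and not the implementation used in the example repository, the distance covered could be approximated by summing great-circle (haversine) distances between consecutive points. The class TripDistanceSketch and its methods are hypothetical, and the mean Earth radius of 6371 km is an approximation.

```java
final class TripDistanceSketch {

  // Mean Earth radius in kilometres (approximation).
  private static final double EARTH_RADIUS_KM = 6371.0;

  // Great-circle distance between two points given in degrees.
  static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
    double dLat = Math.toRadians(lat2 - lat1);
    double dLon = Math.toRadians(lon2 - lon1);
    double a =
        Math.sin(dLat / 2) * Math.sin(dLat / 2)
            + Math.cos(Math.toRadians(lat1))
                * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2)
                * Math.sin(dLon / 2);
    return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
  }

  // Total length of a path; each element is {latitude, longitude} in degrees.
  static double pathKm(double[][] points) {
    double total = 0;
    for (int i = 1; i < points.length; i++) {
      total += haversineKm(
          points[i - 1][0], points[i - 1][1],
          points[i][0], points[i][1]);
    }
    return total;
  }
}
```

A service built this way could call pathKm with the points saved by the repository for a given ride each time the server needs an estimate.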


Registering Service

To expose the RPC service RideSharingAPI, we need to create a gRPC server instance and register the service by calling the addService method. The server listens on the specified port and dispatches all requests to the relevant service.


public RideSharingServer(int port) {
  this.port = port;
  var rideSharingAPI = new RideSharingAPI(...);
  this.server = ServerBuilder.forPort(port).addService(rideSharingAPI).build();
}


Implementing gRPC Client Stub

The first thing we need to do to implement a gRPC client is to generate client stubs using the proto files and then create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().

The channel represents a virtual connection to an endpoint to perform RPC.

We can create the client stub from the managed channel as:


var managedChannel = ManagedChannelBuilder.forAddress(host, port)
    .usePlaintext()
    .build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);


Similar to client streaming, the bidirectional streaming RPC is always async. So, we need to create an async stub as TripServiceGrpc.newStub(managedChannel).


public class RideSharingClient {
  private final String host;
  private final int port;

  public RideSharingClient(String host, int port) {
    this.host = host;
    this.port = port;
  }

  public void callServer() {

    log.info("Calling Server..");
    var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
    //...
    //...

  }

}


Then, we can register a callback as:


StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
        tripServiceStub.sendTripData(new TripSummaryCallback());


TripSummaryCallback is of type StreamObserver, which has three methods – onNext, onError and onCompleted. The callback methods are called when the server needs to send some data. For example, the onNext method is called when the server calls the responseObserver.onNext() method.


TripSummaryCallback Implementation

private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {

  @Override
  public void onNext(TripSummaryResponse tripSummaryResponse) {
    DecimalFormat df = new DecimalFormat("0.00");
    log.info(
        "Trip Summary : distance {}, charge {}, time {} ",
        df.format(tripSummaryResponse.getDistance()),
        df.format(tripSummaryResponse.getCharge()),
        df.format((double) tripSummaryResponse.getTime() / 60));
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}


In the above code,

  1. The onNext method of this TripSummaryCallback is called each time the server sends a TripSummaryResponse via responseObserver.onNext().
  2. The onError method is called when the server sends an error. For example, the server can signal an error if it detects invalid latitude or longitude values.
  3. The onCompleted method is called when the server calls responseObserver.onCompleted().
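
The latitude and longitude check mentioned in point 2 is not part of the server code shown earlier. A minimal sketch of how such a check might look follows, under a few assumptions: the names are hypothetical, locations are passed as {latitude, longitude} arrays instead of TripDataRequest messages, and the nested StreamObserver interface is a local stand-in for io.grpc.stub.StreamObserver. With real gRPC, the error passed to onError would typically be built from a status, for example Status.INVALID_ARGUMENT.withDescription(...).asRuntimeException(), rather than a plain IllegalArgumentException.

```java
import java.util.ArrayList;
import java.util.List;

final class CoordinateValidationSketch {

  // Local stand-in mirroring io.grpc.stub.StreamObserver (assumption for self-containment).
  interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // NaN fails every comparison, so it is rejected as well.
  static boolean isValid(double latitude, double longitude) {
    return latitude >= -90 && latitude <= 90 && longitude >= -180 && longitude <= 180;
  }

  // Hypothetical handler: accepts valid locations, terminates the call on the first invalid one.
  static StreamObserver<double[]> locations(StreamObserver<String> responses) {
    return new StreamObserver<double[]>() {
      private boolean failed;

      @Override
      public void onNext(double[] latLon) {
        if (failed) return; // the call has already been terminated
        if (!isValid(latLon[0], latLon[1])) {
          failed = true;
          responses.onError(
              new IllegalArgumentException("invalid coordinates: " + latLon[0] + ", " + latLon[1]));
          return;
        }
        responses.onNext("accepted");
      }

      @Override
      public void onError(Throwable t) {
        failed = true;
      }

      @Override
      public void onCompleted() {
        if (!failed) responses.onCompleted();
      }
    };
  }

  // Plays the client and records which callbacks fire on the response side.
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    StreamObserver<double[]> requests = locations(new StreamObserver<String>() {
      @Override public void onNext(String value) { events.add(value); }
      @Override public void onError(Throwable t) { events.add("error"); }
      @Override public void onCompleted() { events.add("completed"); }
    });
    requests.onNext(new double[] {12.97, 77.59});
    requests.onNext(new double[] {123.0, 77.59}); // latitude out of range
    requests.onNext(new double[] {12.98, 77.60}); // ignored after the error
    requests.onCompleted();
    return events;
  }
}
```

After onError, the response stream is finished, so the handler neither sends further responses nor calls onCompleted.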

Client Implementation

public void callServer() throws InterruptedException {

  log.info("Calling Server..");
  var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
  TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);

  StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
      tripServiceStub.sendTripData(new TripSummaryCallback());

  // Simulate 100 location updates with random latitude and longitude, 100 ms apart

  IntStream.range(0, 100)
      .mapToObj(
          n -> {
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              // Restore the interrupt flag before giving up
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            }
            return Service.TripDataRequest.newBuilder()
                .setRideType(
                    Service.RideType.newBuilder()
                        .setDriverId("Driver_1")
                        .setRideId("Ride_1")
                        .build())
                .setLatitude(ThreadLocalRandom.current().nextDouble(-90, 90))
                .setLongitude(ThreadLocalRandom.current().nextDouble(-180, 180))
                .build();
          })
      .forEach(tripDataRequestStreamObserver::onNext);

  log.info("Calling complete..");
  tripDataRequestStreamObserver.onCompleted();
  // Crude wait so the asynchronous responses can arrive before the program exits
  Thread.sleep(30000);
  managedChannel.shutdown();
}

private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {

  @Override
  public void onNext(TripSummaryResponse tripSummaryResponse) {
    DecimalFormat df = new DecimalFormat("0.00");
    log.info(
        "Trip Summary : distance {}, charge {}, time {} ",
        df.format(tripSummaryResponse.getDistance()),
        df.format(tripSummaryResponse.getCharge()),
        df.format((double) tripSummaryResponse.getTime() / 60));
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}


In the above code, IntStream.range(0, 100) simulates 100 calls to the server with random latitude and longitude values and a delay of 100 ms between requests. At the end, the client calls tripDataRequestStreamObserver.onCompleted() to tell the server that the stream is complete. We have added Thread.sleep(30000) to give the server time to respond; otherwise, the program would exit before the server sends any response.
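
A fixed sleep either waits too long or not long enough. One alternative, sketched here as an assumption and not as what the example repository does, is to let the response callback count down a java.util.concurrent.CountDownLatch when the stream ends, and to have the caller wait on that latch with a timeout. The names LatchCallbackSketch and LatchedCallback are hypothetical, and the nested StreamObserver interface is a local stand-in for io.grpc.stub.StreamObserver.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchCallbackSketch {

  // Local stand-in mirroring io.grpc.stub.StreamObserver (assumption for self-containment).
  interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // Response callback that releases the waiting caller when the stream ends.
  static final class LatchedCallback<T> implements StreamObserver<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicInteger received = new AtomicInteger();

    @Override
    public void onNext(T value) {
      received.incrementAndGet();
    }

    @Override
    public void onError(Throwable t) {
      done.countDown(); // an error also ends the stream
    }

    @Override
    public void onCompleted() {
      done.countDown();
    }

    // Returns true if the stream ended within the timeout.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
      return done.await(timeout, unit);
    }

    int received() {
      return received.get();
    }
  }

  static boolean demo() {
    LatchedCallback<String> callback = new LatchedCallback<>();
    // Stand-in for the gRPC thread that delivers the server's responses.
    Thread server = new Thread(() -> {
      callback.onNext("estimate");
      callback.onNext("final");
      callback.onCompleted();
    });
    server.start();
    try {
      return callback.await(5, TimeUnit.SECONDS) && callback.received() == 2;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

In the client above, the same idea would mean giving TripSummaryCallback a latch and replacing Thread.sleep(30000) with a bounded wait on it.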

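When you run the application from an IDE, the client logs a "Trip Summary" line with the distance, charge and time for each response it receives, followed by "Stream completed" once the server closes the stream.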

Code Example

The working code example for this article is available on GitHub. To run the example, clone the repository and import grpc-bidirectonal-streaming-rpc as a Gradle project in your favourite IDE.

Summary

Bidirectional streaming is a powerful feature of gRPC that lets the client and the server each send a stream of messages over a single call, independently of one another. It provides an efficient way to implement real-time, interactive applications that require continuous communication between client and server.

Tags: grpc, microservices
Pankaj

Software Architect @ Schlumberger. Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python
