Techdozo
  • Microservices
    • All
    • GraphQL
    • gRPC
    • Spring Boot
    GraphQL Error Handling

    GraphQL Error Handling

    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring for GraphQL: Interfaces and Unions

    gRPC Bidirectional Streaming with Code Example

    gRPC Bidirectional Streaming with Code Example

    gRPC Client Streaming

    gRPC Client Streaming

    Distributed transaction in microservices using Saga

    Distributed Transactions in Microservices: implementing Saga with Temporal

  • Spring Boot
    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring Boot GraphQL service

    Getting started with Spring Boot GraphQL service

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    RESTful Microservices with Spring Boot and Kubernetes

    RESTful Microservices with Spring Boot and Kubernetes

    RESTful API Gateway with gRPC

    RESTful API Gateway with gRPC

  • gRPC
    gRPC Bidirectional Streaming with Code Example

    gRPC Bidirectional Streaming with Code Example

    gRPC Client Streaming

    gRPC Client Streaming

    gRPC Interceptor: unary interceptor with code example

    gRPC Interceptor: unary interceptor with code example

    gRPC: synchronous and asynchronous Server streaming RPC

    gRPC: synchronous and asynchronous Server streaming RPC

    Photo by Ramón Salinero on Unsplash

    gRPC: synchronous and asynchronous unary RPC in Java

    Microservices inter-process communication using gRPC

    gRPC for microservices communication

  • GraphQL
    GraphQL Error Handling

    GraphQL Error Handling

    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring for GraphQL: Interfaces and Unions

    GraphQL Directive

    GraphQL Directive

    Spring for GraphQL mutation

    Spring for GraphQL: Mutation

    Spring for GraphQL: How to solve the N+1 Problem?

    Spring for GraphQL: How to solve the N+1 Problem?

    Spring GraphQL with @Controller, @SchemaMapping and @QueryMapping

    Spring for GraphQL : @SchemaMapping and @QueryMapping

    Spring Boot GraphQL service

    Getting started with Spring Boot GraphQL service

  • Kubernetes
    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Components of Kubernetes Architecture

    Components of Kubernetes Architecture

    Helm Chart: quick start your app deployment on Kubernetes

    Helm Chart: quick start your app deployment on Kubernetes

    gRPC load balancing on Kubernetes (using Headless Service)

    gRPC load balancing on Kubernetes (using Headless Service)

    Getting started with Kind: quick start a multi-node local Kubernetes cluster

    Getting started with Kind: quick start a multi-node local Kubernetes cluster

    Getting started with Minikube: deploying application on local Kubernetes cluster

    Getting started with Minikube: deploying application on local Kubernetes cluster

  • Java
    Java Streams: Stream Operation with Examples

    Java Streams: Stream Operation with Examples

    Java Streams: stream creation with examples

    Java Streams: stream creation with examples

    Garbage Collection

    Super Fast Garbage Collectors in Java

    Calculus

    Functional Programming in Java

No Result
View All Result
  • Login
  • Microservices
    • All
    • GraphQL
    • gRPC
    • Spring Boot
    GraphQL Error Handling

    GraphQL Error Handling

    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring for GraphQL: Interfaces and Unions

    gRPC Bidirectional Streaming with Code Example

    gRPC Bidirectional Streaming with Code Example

    gRPC Client Streaming

    gRPC Client Streaming

    Distributed transaction in microservices using Saga

    Distributed Transactions in Microservices: implementing Saga with Temporal

  • Spring Boot
    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring Boot GraphQL service

    Getting started with Spring Boot GraphQL service

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    RESTful Microservices with Spring Boot and Kubernetes

    RESTful Microservices with Spring Boot and Kubernetes

    RESTful API Gateway with gRPC

    RESTful API Gateway with gRPC

  • gRPC
    gRPC Bidirectional Streaming with Code Example

    gRPC Bidirectional Streaming with Code Example

    gRPC Client Streaming

    gRPC Client Streaming

    gRPC Interceptor: unary interceptor with code example

    gRPC Interceptor: unary interceptor with code example

    gRPC: synchronous and asynchronous Server streaming RPC

    gRPC: synchronous and asynchronous Server streaming RPC

    Photo by Ramón Salinero on Unsplash

    gRPC: synchronous and asynchronous unary RPC in Java

    Microservices inter-process communication using gRPC

    gRPC for microservices communication

  • GraphQL
    GraphQL Error Handling

    GraphQL Error Handling

    Pages

    Spring for GraphQL: Pagination with Code Example

    Spring for GraphQL: Interfaces and Unions

    GraphQL Directive

    GraphQL Directive

    Spring for GraphQL mutation

    Spring for GraphQL: Mutation

    Spring for GraphQL: How to solve the N+1 Problem?

    Spring for GraphQL: How to solve the N+1 Problem?

    Spring GraphQL with @Controller, @SchemaMapping and @QueryMapping

    Spring for GraphQL : @SchemaMapping and @QueryMapping

    Spring Boot GraphQL service

    Getting started with Spring Boot GraphQL service

  • Kubernetes
    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Deploying a RESTful Spring Boot Microservice on Kubernetes

    Components of Kubernetes Architecture

    Components of Kubernetes Architecture

    Helm Chart: quick start your app deployment on Kubernetes

    Helm Chart: quick start your app deployment on Kubernetes

    gRPC load balancing on Kubernetes (using Headless Service)

    gRPC load balancing on Kubernetes (using Headless Service)

    Getting started with Kind: quick start a multi-node local Kubernetes cluster

    Getting started with Kind: quick start a multi-node local Kubernetes cluster

    Getting started with Minikube: deploying application on local Kubernetes cluster

    Getting started with Minikube: deploying application on local Kubernetes cluster

  • Java
    Java Streams: Stream Operation with Examples

    Java Streams: Stream Operation with Examples

    Java Streams: stream creation with examples

    Java Streams: stream creation with examples

    Garbage Collection

    Super Fast Garbage Collectors in Java

    Calculus

    Functional Programming in Java

No Result
View All Result
Techdozo
No Result
View All Result
Home Microservices gRPC

gRPC Client Streaming

Pankaj by Pankaj
January 20, 2023
in gRPC, Microservices
Reading Time: 14 mins read
0
A A
0
gRPC Client Streaming
ADVERTISEMENT

While REST is the de facto standard for public API, gRPC is increasingly becoming one of the most popular choices for inter-process microservices communication. gRPC is a strongly typed, polyglot, efficient, and fast RPC framework that supports unary (request and reply) and streaming (both client and server-side) communication patterns.

The gRPC client streaming RPC allows a client to send a stream of messages to a server. This is different from the unary RPC, where the client sends a single request, and the server sends back a single response. With client streaming, the client sends multiple messages to the server, and the server can process them as they arrive before sending a single response back to the client. This is useful for scenarios where the client has a lot of data to send to the server and wants to stream it in chunks rather than sending it all at once.

ADVERTISEMENT

In this article

  • What is gRPC Client Streaming RPC?
  • Client Streaming Use Cases
  • Client Streaming Service Definition
  • Code Generation
  • gRPC Server
    • Implementing Service Definition
    • Registering Service
  • Implementing gRPC Client Stub
  • Code Example
  • Summary

In this article, we will understand how to implement gRPC client streaming RPC in Java.

Let’s get started!

Want to know more about the gRPC? Check other posts:

  • gRPC: synchronous and asynchronous unary RPC in Java
  • gRPC: synchronous and asynchronous Server streaming RPC
  • gRPC Interceptor: unary interceptor with code example
  • gRPC for microservices communication

What is gRPC Client Streaming RPC?

In the client streaming RPC, a gRPC client sends a stream of messages, and the gRPC server responds with a single message. The streams of messages can be followed by a signal to notify the server about the end of the stream. The client sends this signal as an onComplete()method call (more about this later).

gRPC Client Streaming

Client Streaming Use Cases

The gRPC client streaming can be used to address all streaming scenarios, such as:

  1. Real-time location data sharing.
  2. Multiplayer games.
  3. When a client needs to send lots of data to the server.
  4. Real-time driver location in ride-hailing apps.

The gRPC client streaming is used by Lyft to share real-time driver locations with the user.

ADVERTISEMENT

Client Streaming Service Definition

Let’s consider a use case of a ride-hailing app. In this example, as soon as the driver starts the trip, the client application (mobile app used by the driver) sends the real-time location to the server. When the trip finishes, the server summarises and sends the total distance covered and the charge.

For the above use case, we can define the gRPC service with stream keyword in request as:


syntax = "proto3";
package ride_sharing.trip;

option java_package = "dev.techdozo.ride_sharing.trip";

message TripDataRequest {
  RideType ride_type = 1;
  double latitude = 2;
  double longitude = 3;
}

message RideType {
  string driver_id = 1;
  string ride_id = 2;
}

message TripSummaryResponse {
  double distance = 1;
  double charge = 2;
}

service TripService {
  rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse);
}

Code Generation

For the above protobuf service definition, we can generate client and server stubs using the potoc compiler. The protoc compiler supports code generation in many different languages.

For our example, we will use Protobuf Gradle Plugin to generate source code in Java. The protocol buffer plugin assembles the Protobuf Compiler (protoc) command line and uses it to generate Java source files from the proto files. The generated java source files should be added to the sourceSet , in the build.gradle, so they can be compiled along with Java classes.


sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}


Running the command gradlew build generates source code in the directory build/generated/source/proto/main/grpc and build/generated/source/proto/main/java.

gRPC Server

The gRPC server implements services defined in the proto files and expose those as RPC API.

In our case, the gRPC server has one RPC method rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse). Let’s, see how to implement server-side service gRPC API for client streaming.

Implementing Service Definition

Once the server stub is generated, we need to override sendTripData method in the autogenerated abstract class TripServiceGrpc.TripServiceImplBase and return StreamObserver object as:


public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {

    return new StreamObserver<TripDataRequest>() {
      private String rideId;

      @Override
      public void onNext(TripDataRequest request) {
        // called when a client sends a message
        //....
      }

      @Override
      public void onError(Throwable t) {
        // called when a client sends an error
        //....
      }

      @Override
      public void onCompleted() {
        // called when client completes 
        //....
      }
    };
  }
}

In the above code,

  1. The StreamObserver’s onNext method is called every time client makes a streaming request. Here, we can store the streaming data in the database for trip summary calculation.
  2. The StreamObserver’s onError method is when a client sends an error.
  3. The StreamObserver’s onCompleted method is when a client completes the streaming call by calling tripDataRequestStreamObserver.onCompleted(). In this method, the server sends trip summary data to the client.

Server Implementation

public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

  private final TripRepository tripRepository;
  private final TripSummaryService tripSummaryService;

  @Override
  public StreamObserver<TripDataRequest> sendTripData(
      StreamObserver<TripSummaryResponse> responseObserver) {

    return new StreamObserver<TripDataRequest>() {
      private String rideId;

      @Override
      public void onNext(TripDataRequest request) {
        // Save Ride data
        tripRepository.saveTripData(
            new RideData(
                request.getRideType().getDriverId(),
                request.getRideType().getRideId(),
                request.getLatitude(),
                request.getLongitude()));
        log.info("Driver Id {} - object {}", request.getRideType().getDriverId(), this);
        this.rideId = request.getRideType().getRideId();
      }

      @Override
      public void onError(Throwable t) {
        log.error("Error while processing request ");
      }

      @Override
      public void onCompleted() {
        // Once the trip is completed then, generate trip summary
        var tripSummary = tripSummaryService.getTripSummary(rideId);
        responseObserver.onNext(
            TripSummaryResponse.newBuilder()
                .setDistance(tripSummary.getDistance())
                .setCharge(tripSummary.getCharge())
                .build());
        log.info("Request completed");
      }
    };
  }
}


Registering Service

To expose the RPC service RideSharingAPI, we need to create a gRPC server instance and register the service by calling the addService method. The server listens to the specified port and dispatches all requests to the relevant service.


public RideSharingServer(int port) {
  this.port = port;
  var rideSharingAPI = new RideSharingAPI(...);
  this.server = ServerBuilder.forPort(port).addService(rideSharingAPI).build();
}

Implementing gRPC Client Stub

The first thing we need to do to implement a gRPC client is to generate client stubs using the proto files and then create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().

The channel represents a virtual connection to an endpoint to perform RPC.

You can create the client stub using the newly created channel:


var managedChannel = ManagedChannelBuilder.forAddress(host, port)
              .usePlaintext().build();
 TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc
              .newStub(managedChannel);

There are two types of client stubs:

  • Blocking: The BlockingStub, which waits until it receives a server response.
  • Non-Blocking: The NonBlockingStub doesn’t wait for a server response but instead registers an observer to receive the response.

As the client streaming API is always async, we need to create an async stub as TripServiceGrpc.newStub(managedChannel).


public class RideSharingClient {
  private final String host;
  private final int port;

  public RideSharingClient(String host, int port) {
    this.host = host;
    this.port = port;
  }

  public void callServer() {

    log.info("Calling Server..");
    var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
    //...
    //...

  }

}

Similar to the unary asynchronous client stub, we can register a callback as:


StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
        tripServiceStub.sendTripData(new TripSummaryCallback());

TripSummaryCallback is of type StreamObserver, which has three methods – onNext, onError and onCompleted. The callback methods are called when the server needs to send some data. For example, the onNext method is called when the server calls the onCompleted method.


TripSummaryCallback Implementation

private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {

  @Override
  public void onNext(TripSummaryResponse tripSummaryResponse) {
    log.info(
        "Trip Summary : distance {}, charge {} ",
        tripSummaryResponse.getDistance(),
        tripSummaryResponse.getCharge());
  }

  @Override
  public void onError(Throwable cause) {
    log.error("Error occurred, cause {}", cause.getMessage());
  }

  @Override
  public void onCompleted() {
    log.info("Stream completed");
  }
}

In the above code,

  1. The onNext method of this TripSummaryCallback is called once the server sends TripSummary via the call-back method onCompleted.
  2. The onError method is called when the server sends an error. For example, the server can throw an error if it detects the wrong latitude or longitude values.
  3. In the case of client streaming, the onCompleted method is never called. As the same API is used for bidirectional and client streaming, we can leave onCompleted unimplemented.

Client Implementation

public void callServer() {

  log.info("Calling Server..");
  var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
  TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);

  StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
      tripServiceStub.sendTripData(new TripSummaryCallback());

  IntStream.range(0, 1000)
      .mapToObj(
          n ->
              Service.TripDataRequest.newBuilder()
                  .setRideType(
                      Service.RideType.newBuilder()
                          .setDriverId("Driver_1")
                          .setRideId("Ride_" + n)
                          .build())
                  .setLatitude(ThreadLocalRandom.current().nextDouble(-90, 90))
                  .setLongitude(ThreadLocalRandom.current().nextDouble(-180, 180))
                  .build())
      .forEach(tripDataRequestStreamObserver::onNext);

  log.info("Calling complete..");
  tripDataRequestStreamObserver.onCompleted();
  Thread.sleep(30000);
}

  1. We have used IntStream.range(0, 1000)...forEach(tripDataRequestStreamObserver::onNext) to demonstrate the client streaming scenario. In real life, this call will be made by a mobile client by reading GPS data.
  2. Once the trip completes, the client indicates to the server by calling tripDataRequestStreamObserver.onCompleted(). Then the server computes the trip summary and calls the callback method onCompleted, and the client receives the trip summary in onNext method of TripSummaryCallback.
  3. Thread.sleep(30000) so the program doesn’t exit before the client receives the callback.

Code Example

The working code example of this article is listed on GitHub . To run the example, clone the repository and import grpc-client-streaming as a project in your favourite IDE as a Gradle project.

Summary

gRPC has built-in streaming support. The gRPC client streaming can be used for many use cases, such as real-time location data sharing, multiplayer games, when a client needs to send lots of data to the server, etc.

Tags: grpcmicroservices
Previous Post

Distributed Transactions in Microservices: implementing Saga with Temporal

Next Post

gRPC Bidirectional Streaming with Code Example

Pankaj

Pankaj

Software Architect @ Schlumberger ``` Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python ```

Related Posts

GraphQL Error Handling
GraphQL

GraphQL Error Handling

August 9, 2023
Pages
GraphQL

Spring for GraphQL: Pagination with Code Example

July 26, 2023
GraphQL

Spring for GraphQL: Interfaces and Unions

May 17, 2023
gRPC Bidirectional Streaming with Code Example
gRPC

gRPC Bidirectional Streaming with Code Example

February 17, 2023

Recent Articles

GraphQL Error Handling

GraphQL Error Handling

August 9, 2023
Pages

Spring for GraphQL: Pagination with Code Example

July 26, 2023

Spring for GraphQL: Interfaces and Unions

May 17, 2023
gRPC Bidirectional Streaming with Code Example

gRPC Bidirectional Streaming with Code Example

February 17, 2023
  • Trending
  • Comments
  • Latest
gRPC Bidirectional Streaming with Code Example

gRPC Bidirectional Streaming with Code Example

February 17, 2023
Deploying a RESTful Spring Boot Microservice on Kubernetes

Deploying a RESTful Spring Boot Microservice on Kubernetes

August 18, 2021
gRPC Interceptor: unary interceptor with code example

gRPC Interceptor: unary interceptor with code example

April 30, 2022
Temporal Workflow Orchestration

Workflow Orchestration with Temporal and Spring Boot

October 29, 2022
Calculus

Functional Programming in Java

0
Java Streams: stream creation with examples

Java Streams: stream creation with examples

0
Garbage Collection

Super Fast Garbage Collectors in Java

0
Java Streams: Stream Operation with Examples

Java Streams: Stream Operation with Examples

0
GraphQL Error Handling

GraphQL Error Handling

August 9, 2023
Pages

Spring for GraphQL: Pagination with Code Example

July 26, 2023

Spring for GraphQL: Interfaces and Unions

May 17, 2023
gRPC Bidirectional Streaming with Code Example

gRPC Bidirectional Streaming with Code Example

February 17, 2023
Facebook Twitter Pinterest

TECHDOZO

Simplifying modern tech stack!

Browse by Category

  • Bitesize
  • GraphQL
  • gRPC
  • Java
  • Kubernetes
  • Microservices
  • Spring Boot

Recent Articles

GraphQL Error Handling

GraphQL Error Handling

August 9, 2023
Pages

Spring for GraphQL: Pagination with Code Example

July 26, 2023

© 2023 Techdozo.

Welcome Back!

Sign In with Google
OR

Login to your account below

Forgotten Password?

Retrieve your password

Please enter your username or email address to reset your password.

Log In
No Result
View All Result
  • Home
  • gRPC
  • Kubernetes
  • Microservices
  • GraphQL

© 2023 Techdozo.