Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/config/core/v3/base.pb.h" 6 : #include "envoy/config/core/v3/grpc_service.pb.h" 7 : #include "envoy/grpc/async_client.h" 8 : #include "envoy/stream_info/stream_info.h" 9 : 10 : #include "source/common/common/linked_object.h" 11 : #include "source/common/grpc/codec.h" 12 : #include "source/common/grpc/typed_async_client.h" 13 : #include "source/common/http/async_client_impl.h" 14 : #include "source/common/router/header_parser.h" 15 : 16 : namespace Envoy { 17 : namespace Grpc { 18 : 19 : class AsyncRequestImpl; 20 : 21 : class AsyncStreamImpl; 22 : using AsyncStreamImplPtr = std::unique_ptr<AsyncStreamImpl>; 23 : 24 : class AsyncClientImpl final : public RawAsyncClient { 25 : public: 26 : AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config, 27 : TimeSource& time_source); 28 : ~AsyncClientImpl() override; 29 : 30 : // Grpc::AsyncClient 31 : AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name, 32 : Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks, 33 : Tracing::Span& parent_span, 34 : const Http::AsyncClient::RequestOptions& options) override; 35 : RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name, 36 : RawAsyncStreamCallbacks& callbacks, 37 : const Http::AsyncClient::StreamOptions& options) override; 38 0 : absl::string_view destination() override { return remote_cluster_name_; } 39 : 40 : private: 41 : Upstream::ClusterManager& cm_; 42 : const std::string remote_cluster_name_; 43 : // The host header value in the http transport. 44 : const std::string host_name_; 45 : std::list<AsyncStreamImplPtr> active_streams_; 46 : TimeSource& time_source_; 47 : Router::HeaderParserPtr metadata_parser_; 48 : 49 : friend class AsyncRequestImpl; 50 : friend class AsyncStreamImpl; 51 : }; 52 : 53 : class AsyncStreamImpl : public RawAsyncStream, 54 : Http::AsyncClient::StreamCallbacks, 55 : public Event::DeferredDeletable, 56 : public LinkedObject<AsyncStreamImpl> { 57 : public: 58 : AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name, 59 : absl::string_view method_name, RawAsyncStreamCallbacks& callbacks, 60 : const Http::AsyncClient::StreamOptions& options); 61 : 62 : virtual void initialize(bool buffer_body_for_retry); 63 : 64 : // Http::AsyncClient::StreamCallbacks 65 : void onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; 66 : void onData(Buffer::Instance& data, bool end_stream) override; 67 : void onTrailers(Http::ResponseTrailerMapPtr&& trailers) override; 68 : void onComplete() override; 69 : void onReset() override; 70 : 71 : // Grpc::AsyncStream 72 : void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override; 73 : void closeStream() override; 74 : void resetStream() override; 75 0 : bool isAboveWriteBufferHighWatermark() const override { 76 0 : return stream_ && stream_->isAboveWriteBufferHighWatermark(); 77 0 : } 78 : 79 68 : bool hasResetStream() const { return http_reset_; } 80 122 : const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); } 81 : 82 : private: 83 : void streamError(Status::GrpcStatus grpc_status, const std::string& message); 84 28 : void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); } 85 : 86 : void cleanup(); 87 : void trailerResponse(absl::optional<Status::GrpcStatus> grpc_status, 88 : const std::string& grpc_message); 89 : 90 : Event::Dispatcher* dispatcher_{}; 91 : Http::RequestMessagePtr headers_message_; 92 : AsyncClientImpl& parent_; 93 : std::string service_full_name_; 94 : std::string method_name_; 95 : RawAsyncStreamCallbacks& callbacks_; 96 : Http::AsyncClient::StreamOptions options_; 97 : bool http_reset_{}; 98 : Http::AsyncClient::Stream* stream_{}; 99 : Decoder decoder_; 100 : // This is a member to avoid reallocation on every onData(). 101 : std::vector<Frame> decoded_frames_; 102 : 103 : friend class AsyncClientImpl; 104 : }; 105 : 106 : class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncStreamCallbacks { 107 : public: 108 : AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name, 109 : absl::string_view method_name, Buffer::InstancePtr&& request, 110 : RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span, 111 : const Http::AsyncClient::RequestOptions& options); 112 : 113 : void initialize(bool buffer_body_for_retry) override; 114 : 115 : // Grpc::AsyncRequest 116 : void cancel() override; 117 : 118 : private: 119 : // Grpc::AsyncStreamCallbacks 120 : void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override; 121 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override; 122 : bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override; 123 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override; 124 : void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; 125 : 126 : Buffer::InstancePtr request_; 127 : RawAsyncRequestCallbacks& callbacks_; 128 : Tracing::SpanPtr current_span_; 129 : Buffer::InstancePtr response_; 130 : }; 131 : 132 : } // namespace Grpc 133 : } // namespace Envoy