1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/config/core/v3/base.pb.h"
6
#include "envoy/config/core/v3/grpc_service.pb.h"
7
#include "envoy/config/route/v3/route_components.pb.h"
8
#include "envoy/grpc/async_client.h"
9
#include "envoy/stream_info/stream_info.h"
10

            
11
#include "source/common/common/linked_object.h"
12
#include "source/common/grpc/codec.h"
13
#include "source/common/grpc/typed_async_client.h"
14
#include "source/common/http/async_client_impl.h"
15
#include "source/common/router/header_parser.h"
16

            
17
namespace Envoy {
18
namespace Grpc {
19

            
20
// Forward declarations. AsyncClientImpl (below) names both classes as friends and stores
// AsyncStreamImplPtr values before either class is defined.
class AsyncRequestImpl;
class AsyncStreamImpl;

// Owning pointer to an AsyncStreamImpl.
using AsyncStreamImplPtr = std::unique_ptr<AsyncStreamImpl>;
24

            
25
class AsyncClientImpl final : public RawAsyncClient {
26
public:
27
  static absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
28
  create(const envoy::config::core::v3::GrpcService& config,
29
         Server::Configuration::CommonFactoryContext& context);
30
  ~AsyncClientImpl() override;
31

            
32
  // Grpc::AsyncClient
33
  AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
34
                        Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
35
                        Tracing::Span& parent_span,
36
                        const Http::AsyncClient::RequestOptions& options) override;
37
  RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
38
                           RawAsyncStreamCallbacks& callbacks,
39
                           const Http::AsyncClient::StreamOptions& options) override;
40
3
  absl::string_view destination() override { return remote_cluster_name_; }
41

            
42
2451
  const Router::RetryPolicyConstSharedPtr& retryPolicy() { return retry_policy_; }
43

            
44
protected:
45
  AsyncClientImpl(const envoy::config::core::v3::GrpcService& config,
46
                  Server::Configuration::CommonFactoryContext& context,
47
                  absl::Status& creation_status);
48

            
49
private:
50
  const uint32_t max_recv_message_length_;
51
  const bool skip_envoy_headers_;
52
  Upstream::ClusterManager& cm_;
53
  const std::string remote_cluster_name_;
54
  // The host header value in the http transport.
55
  const std::string host_name_;
56
  std::list<AsyncStreamImplPtr> active_streams_;
57
  TimeSource& time_source_;
58
  Router::HeaderParserPtr metadata_parser_;
59
  // Default per service retry policy.
60
  Router::RetryPolicyConstSharedPtr retry_policy_;
61

            
62
  friend class AsyncRequestImpl;
63
  friend class AsyncStreamImpl;
64
};
65

            
66
class AsyncStreamImpl : public RawAsyncStream,
67
                        Http::AsyncClient::StreamCallbacks,
68
                        public Event::DeferredDeletable,
69
                        public LinkedObject<AsyncStreamImpl> {
70
public:
71
  AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
72
                  absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
73
                  const Http::AsyncClient::StreamOptions& options);
74
  ~AsyncStreamImpl() override;
75

            
76
  virtual void initialize(bool buffer_body_for_retry);
77

            
78
  // Http::AsyncClient::StreamCallbacks
79
  void onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
80
  void onData(Buffer::Instance& data, bool end_stream) override;
81
  void onTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
82
  void onComplete() override;
83
  void onReset() override;
84
  void waitForRemoteCloseAndDelete() override;
85

            
86
  // Grpc::AsyncStream
87
  void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
88
  void closeStream() override;
89
  void resetStream() override;
90
23
  bool isAboveWriteBufferHighWatermark() const override {
91
23
    return stream_ && stream_->isAboveWriteBufferHighWatermark();
92
23
  }
93

            
94
2618
  bool hasResetStream() const { return http_reset_; }
95
59
  const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }
96
3351
  StreamInfo::StreamInfo& streamInfo() override { return stream_->streamInfo(); }
97

            
98
1
  void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
99
1
    stream_->setWatermarkCallbacks(callbacks);
100
1
  }
101

            
102
415
  void removeWatermarkCallbacks() override {
103
415
    if (options_.sidestream_watermark_callbacks != nullptr) {
104
415
      stream_->removeWatermarkCallbacks();
105
415
      options_.sidestream_watermark_callbacks = nullptr;
106
415
    }
107
415
  }
108

            
109
protected:
110
  Upstream::ClusterInfoConstSharedPtr cluster_info_;
111

            
112
private:
113
  void streamError(Status::GrpcStatus grpc_status, const std::string& message);
114
1113
  void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); }
115

            
116
  void cleanup();
117
  void trailerResponse(absl::optional<Status::GrpcStatus> grpc_status,
118
                       const std::string& grpc_message);
119

            
120
  // Deliver notification and update span when the connection closes.
121
  void notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message);
122

            
123
protected:
124
  Event::Dispatcher* dispatcher_{};
125
  Http::RequestMessagePtr headers_message_;
126
  AsyncClientImpl& parent_;
127
  std::string service_full_name_;
128
  std::string method_name_;
129
  Tracing::SpanPtr current_span_;
130

            
131
  RawAsyncStreamCallbacks& callbacks_;
132
  Http::AsyncClient::StreamOptions options_;
133
  bool http_reset_{};
134
  bool waiting_to_delete_on_remote_close_{};
135
  Http::AsyncClient::Stream* stream_{};
136
  Decoder decoder_;
137
  // This is a member to avoid reallocation on every onData().
138
  std::vector<Frame> decoded_frames_;
139
  Event::TimerPtr remote_close_timer_;
140

            
141
  friend class AsyncClientImpl;
142
};
143

            
144
class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncStreamCallbacks {
145
public:
146
  AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
147
                   absl::string_view method_name, Buffer::InstancePtr&& request,
148
                   RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
149
                   const Http::AsyncClient::RequestOptions& options);
150

            
151
  void initialize(bool buffer_body_for_retry) override;
152

            
153
  // Grpc::AsyncRequest
154
  void cancel() override;
155
  const StreamInfo::StreamInfo& streamInfo() const override;
156
  void detach() override;
157

            
158
private:
159
  using AsyncStreamImpl::streamInfo;
160
  // Grpc::AsyncStreamCallbacks
161
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
162
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
163
  bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
164
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
165
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
166

            
167
  Buffer::InstancePtr request_;
168
  RawAsyncRequestCallbacks& callbacks_;
169
  Tracing::SpanPtr current_span_;
170
  Buffer::InstancePtr response_;
171
};
172

            
173
} // namespace Grpc
174
} // namespace Envoy