1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/config/core/v3/config_source.pb.h"
6
#include "envoy/event/dispatcher.h"
7
#include "envoy/grpc/async_client_manager.h"
8
#include "envoy/singleton/instance.h"
9
#include "envoy/stats/scope.h"
10
#include "envoy/thread_local/thread_local.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/grpc/typed_async_client.h"
14
#include "source/common/http/utility.h"
15
#include "source/common/protobuf/utility.h"
16
#include "source/common/tracing/null_span_impl.h"
17
#include "source/extensions/access_loggers/common/grpc_access_logger_utils.h"
18

            
19
#include "absl/container/flat_hash_map.h"
20
#include "absl/types/optional.h"
21

            
22
namespace Envoy {
23
namespace Extensions {
24
namespace AccessLoggers {
25
namespace Common {
26

            
27
template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
28
public:
29
102
  virtual ~GrpcAccessLogClient() = default;
30
  virtual bool isConnected() PURE;
31
  virtual bool log(const LogRequest& request) PURE;
32

            
33
protected:
34
  GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
35
                      const Protobuf::MethodDescriptor& service_method,
36
                      OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
37
102
      : client_(client), service_method_(service_method),
38
102
        opts_(createRequestOptionsForRetry(retry_policy)) {}
39

            
40
  Grpc::AsyncClient<LogRequest, LogResponse> client_;
41
  const Protobuf::MethodDescriptor& service_method_;
42
  const Http::AsyncClient::RequestOptions opts_;
43

            
44
private:
45
  Http::AsyncClient::RequestOptions
46
102
  createRequestOptionsForRetry(OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy) {
47
102
    auto opt = Http::AsyncClient::RequestOptions();
48

            
49
102
    if (!retry_policy) {
50
100
      return opt;
51
100
    }
52

            
53
2
    const auto grpc_retry_policy =
54
2
        Http::Utility::convertCoreToRouteRetryPolicy(*retry_policy, "connect-failure");
55
2
    opt.setBufferBodyForRetry(true);
56
2
    opt.setRetryPolicy(grpc_retry_policy);
57
2
    return opt;
58
102
  }
59
};
60

            
61
template <typename LogRequest, typename LogResponse>
62
class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
63
public:
64
  using AsyncRequestCallbacksFactory = std::function<Grpc::AsyncRequestCallbacks<LogResponse>&()>;
65

            
66
  UnaryGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
67
                           const Protobuf::MethodDescriptor& service_method,
68
                           OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy,
69
                           AsyncRequestCallbacksFactory callback_factory)
70
22
      : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy),
71
22
        callbacks_factory_(callback_factory) {}
72

            
73
23
  bool isConnected() override { return false; }
74

            
75
23
  bool log(const LogRequest& request) override {
76
23
    GrpcAccessLogClient<LogRequest, LogResponse>::client_->send(
77
23
        GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, request,
78
23
        callbacks_factory_(), Tracing::NullSpan::instance(),
79
23
        GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
80
23
    return true;
81
23
  }
82

            
83
private:
84
  AsyncRequestCallbacksFactory callbacks_factory_;
85
};
86

            
87
template <typename LogRequest, typename LogResponse>
88
class StreamingGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
89
public:
90
  StreamingGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
91
                               const Protobuf::MethodDescriptor& service_method,
92
                               OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
93
80
      : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}
94

            
95
public:
96
  struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
97
40
    LocalStream(StreamingGrpcAccessLogClient& parent) : parent_(parent) {}
98

            
99
    // Grpc::AsyncStreamCallbacks
100
30
    void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
101
29
    void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
102
3
    void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
103
30
    void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
104
32
    void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
105
32
      ASSERT(parent_.stream_ != nullptr);
106
32
      if (parent_.stream_->stream_ != nullptr) {
107
        // Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
108
        // stream data in send().
109
31
        parent_.stream_.reset();
110
31
      }
111
32
    }
112

            
113
    StreamingGrpcAccessLogClient& parent_;
114
    Grpc::AsyncStream<LogRequest> stream_{};
115
  };
116

            
117
47
  bool isConnected() override { return stream_ != nullptr && stream_->stream_ != nullptr; }
118

            
119
47
  bool log(const LogRequest& request) override {
120
47
    if (!stream_) {
121
40
      stream_ = std::make_unique<LocalStream>(*this);
122
40
    }
123

            
124
47
    if (stream_->stream_ == nullptr) {
125
40
      stream_->stream_ = GrpcAccessLogClient<LogRequest, LogResponse>::client_->start(
126
40
          GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, *stream_,
127
40
          GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
128
40
    }
129

            
130
47
    if (stream_->stream_ != nullptr) {
131
45
      if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
132
2
        return false;
133
2
      }
134
43
      stream_->stream_->sendMessage(request, false);
135
43
    } else {
136
      // Clear out the stream data due to stream creation failure.
137
2
      stream_.reset();
138
2
    }
139
45
    return true;
140
47
  }
141

            
142
  std::unique_ptr<LocalStream> stream_;
143
};
144

            
145
} // namespace Common
146
} // namespace AccessLoggers
147
} // namespace Extensions
148
} // namespace Envoy