#pragma once

#include <chrono>
#include <memory>
#include <string>
#include <utility>

#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/singleton/instance.h"
#include "envoy/stats/scope.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/common/assert.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/utility.h"
#include "source/common/tracing/null_span_impl.h"
#include "source/extensions/access_loggers/common/grpc_access_logger_clients.h"
#include "source/extensions/access_loggers/common/grpc_access_logger_utils.h"

#include "absl/container/flat_hash_map.h"
#include "absl/types/optional.h"
22

            
23
namespace Envoy {
24
namespace Extensions {
25
namespace AccessLoggers {
26
namespace Common {
27

            
28
/**
 * Kind of traffic an access logger is created for. Passed to
 * `GrpcAccessLoggerCache::getOrCreateLogger()`, where it forms part of the cache key together with
 * the hash of the logger configuration.
 */
enum class GrpcAccessLoggerType {
  TCP,
  HTTP,
};
29

            
30
namespace Detail {
31

            
32
/**
 * Fully specialized types of the interfaces below are available through the
 * `Common::GrpcAccessLogger::Interface` and `Common::GrpcAccessLoggerCache::Interface`
 * aliases.
 */
37

            
38
/**
39
 * Interface for an access logger. The logger provides abstraction on top of gRPC stream, deals with
40
 * reconnects and performs batching.
41
 */
42
template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {
43
public:
44
  using SharedPtr = std::shared_ptr<GrpcAccessLogger>;
45

            
46
114
  virtual ~GrpcAccessLogger() = default;
47

            
48
  /**
49
   * Log http access entry.
50
   * @param entry supplies the access log to send.
51
   */
52
  virtual void log(HttpLogProto&& entry) PURE;
53

            
54
  /**
55
   * Log tcp access entry.
56
   * @param entry supplies the access log to send.
57
   */
58
  virtual void log(TcpLogProto&& entry) PURE;
59
};
60

            
61
/**
62
 * Interface for an access logger cache. The cache deals with threading and de-duplicates loggers
63
 * for the same configuration.
64
 */
65
template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLoggerCache {
66
public:
67
  using SharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
68
58
  virtual ~GrpcAccessLoggerCache() = default;
69

            
70
  /**
71
   * Get existing logger or create a new one for the given configuration.
72
   * @param config supplies the configuration for the logger.
73
   * @return GrpcAccessLoggerSharedPtr ready for logging requests.
74
   */
75
  virtual typename GrpcAccessLogger::SharedPtr
76
  getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) PURE;
77
};
78

            
79
} // namespace Detail
80

            
81
/**
 * All stats for the grpc access logger. @see stats_macros.h
 *
 * Applies COUNTER to each stat name in turn:
 *  - logs_written: incremented when `GrpcAccessLogger::canLogMore()` accepts an entry.
 *  - logs_dropped: incremented when `GrpcAccessLogger::canLogMore()` rejects an entry.
 */
#define ALL_GRPC_ACCESS_LOGGER_STATS(COUNTER)                                                      \
  COUNTER(logs_written)                                                                            \
  COUNTER(logs_dropped)
87

            
88
/**
89
 * Wrapper struct for the access log stats. @see stats_macros.h
90
 */
91
struct GrpcAccessLoggerStats {
92
  ALL_GRPC_ACCESS_LOGGER_STATS(GENERATE_COUNTER_STRUCT)
93
};
94

            
95
/**
96
 * Base class for defining a gRPC logger with the `HttpLogProto` and `TcpLogProto` access log
97
 * entries and `LogRequest` and `LogResponse` gRPC messages.
98
 * The log entries and messages are distinct types to support batching of multiple access log
99
 * entries in a single gRPC messages that go on the wire.
100
 */
101
template <typename HttpLogProto, typename TcpLogProto, typename LogRequest, typename LogResponse>
102
class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto> {
103
public:
104
  using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;
105
  GrpcAccessLogger(
106
      const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
107
      Event::Dispatcher& dispatcher, Stats::Scope& scope,
108
      absl::optional<std::string> access_log_prefix,
109
      std::unique_ptr<GrpcAccessLogClient<LogRequest, LogResponse>> client)
110
102
      : client_(std::move(client)), buffer_flush_interval_msec_(PROTOBUF_GET_MS_OR_DEFAULT(
111
102
                                        config, buffer_flush_interval, 1000)),
112
118
        flush_timer_(dispatcher.createTimer([this]() {
113
94
          flush();
114
94
          flush_timer_->enableTimer(buffer_flush_interval_msec_);
115
94
        })),
116
102
        max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)) {
117
102
    flush_timer_->enableTimer(buffer_flush_interval_msec_);
118
102
    if (access_log_prefix.has_value()) {
119
84
      stats_ = std::make_unique<GrpcAccessLoggerStats>(GrpcAccessLoggerStats{
120
84
          ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix.value()))});
121
84
    }
122
102
  }
123

            
124
56
  void log(HttpLogProto&& entry) override {
125
56
    if (!canLogMore()) {
126
1
      return;
127
1
    }
128
55
    approximate_message_size_bytes_ += entry.ByteSizeLong();
129
55
    addEntry(std::move(entry));
130
55
    if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
131
23
      flush();
132
23
    }
133
55
  }
134

            
135
29
  void log(TcpLogProto&& entry) override {
136
29
    approximate_message_size_bytes_ += entry.ByteSizeLong();
137
29
    addEntry(std::move(entry));
138
29
    if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
139
4
      flush();
140
4
    }
141
29
  }
142

            
143
protected:
144
  std::unique_ptr<GrpcAccessLogClient<LogRequest, LogResponse>> client_;
145
  LogRequest message_;
146

            
147
private:
148
  virtual bool isEmpty() PURE;
149
  virtual void initMessage() PURE;
150
  virtual void addEntry(HttpLogProto&& entry) PURE;
151
  virtual void addEntry(TcpLogProto&& entry) PURE;
152
35
  virtual void clearMessage() { message_.Clear(); }
153

            
154
123
  void flush() {
155
123
    if (isEmpty()) {
156
      // Nothing to flush.
157
53
      return;
158
53
    }
159

            
160
70
    if (!client_->isConnected()) {
161
63
      initMessage();
162
63
    }
163

            
164
70
    if (client_->log(message_)) {
165
      // Clear the message regardless of the success.
166
68
      approximate_message_size_bytes_ = 0;
167
68
      clearMessage();
168
68
    }
169
70
  }
170

            
171
  // `canLogMore()` is only for streaming gRPC client only which could run into
172
  // AboveWriteBufferHighWatermark during `flush()` so that we tracks the log entries dropped caused
173
  // by that.
174
  //
175
  // For unary gRPC client, `canLogMore` always returns True[1][2] so `stats_` here is meaningless.
176
  //
177
  // [1]https://github.com/envoyproxy/envoy/blob/cd5ef906026160ec2cd766d8d18217e668c256d8/source/extensions/access_loggers/common/grpc_access_logger.h#L287.
178
  // [2]https://github.com/envoyproxy/envoy/blob/cd5ef906026160ec2cd766d8d18217e668c256d8/source/extensions/access_loggers/common/grpc_access_logger.h#L126
179
56
  bool canLogMore() {
180
56
    if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) {
181
54
      incLogsWrittenStats();
182
54
      return true;
183
54
    }
184
2
    flush();
185
2
    if (approximate_message_size_bytes_ < max_buffer_size_bytes_) {
186
1
      incLogsWrittenStats();
187
1
      return true;
188
1
    }
189
1
    incLogsDroppedStats();
190
1
    return false;
191
2
  }
192

            
193
1
  void incLogsDroppedStats() {
194
1
    if (stats_) {
195
1
      stats_->logs_dropped_.inc();
196
1
    }
197
1
  }
198

            
199
55
  void incLogsWrittenStats() {
200
55
    if (stats_) {
201
38
      stats_->logs_written_.inc();
202
38
    }
203
55
  }
204

            
205
  const std::chrono::milliseconds buffer_flush_interval_msec_;
206
  const Event::TimerPtr flush_timer_;
207
  const uint64_t max_buffer_size_bytes_;
208
  uint64_t approximate_message_size_bytes_ = 0;
209
  std::unique_ptr<GrpcAccessLoggerStats> stats_ = nullptr;
210
};
211

            
212
/**
213
 * Class for defining logger cache with the `GrpcAccessLogger` interface and
214
 * `ConfigProto` configuration.
215
 */
216
template <typename GrpcAccessLogger, typename ConfigProto>
217
class GrpcAccessLoggerCache : public Singleton::Instance,
218
                              public Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto> {
219
public:
220
  using Interface = Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto>;
221

            
222
  GrpcAccessLoggerCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
223
                        ThreadLocal::SlotAllocator& tls)
224
44
      : scope_(scope), async_client_manager_(async_client_manager), tls_slot_(tls.allocateSlot()) {
225
76
    tls_slot_->set([](Event::Dispatcher& dispatcher) {
226
76
      return std::make_shared<ThreadLocalCache>(dispatcher);
227
76
    });
228
44
  }
229

            
230
  typename GrpcAccessLogger::SharedPtr
231
101
  getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) override {
232
    // TODO(euroelessar): Consider cleaning up loggers.
233
101
    auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
234
101
    const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
235
101
    const auto it = cache.access_loggers_.find(cache_key);
236
101
    if (it != cache.access_loggers_.end()) {
237
14
      return it->second;
238
14
    }
239

            
240
87
    const auto logger = createLogger(config, cache.dispatcher_);
241
87
    cache.access_loggers_.emplace(cache_key, logger);
242
87
    return logger;
243
101
  }
244

            
245
protected:
246
  Stats::Scope& scope_;
247
  Grpc::AsyncClientManager& async_client_manager_;
248

            
249
private:
250
  /**
251
   * Per-thread cache.
252
   */
253
  struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
254
76
    ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
255

            
256
    Event::Dispatcher& dispatcher_;
257
    // Access loggers indexed by the hash of logger's configuration and logger type.
258
    absl::flat_hash_map<std::pair<std::size_t, Common::GrpcAccessLoggerType>,
259
                        typename GrpcAccessLogger::SharedPtr>
260
        access_loggers_;
261
  };
262

            
263
  // Create the specific logger type for this cache.
264
  virtual typename GrpcAccessLogger::SharedPtr createLogger(const ConfigProto& config,
265
                                                            Event::Dispatcher& dispatcher) PURE;
266

            
267
  ThreadLocal::SlotPtr tls_slot_;
268
};
269

            
270
} // namespace Common
271
} // namespace AccessLoggers
272
} // namespace Extensions
273
} // namespace Envoy