Line data Source code
1 : #pragma once
2 :
3 : #include <memory>
4 :
5 : #include "envoy/config/core/v3/config_source.pb.h"
6 : #include "envoy/event/dispatcher.h"
7 : #include "envoy/grpc/async_client_manager.h"
8 : #include "envoy/singleton/instance.h"
9 : #include "envoy/stats/scope.h"
10 : #include "envoy/thread_local/thread_local.h"
11 :
12 : #include "source/common/common/assert.h"
13 : #include "source/common/grpc/typed_async_client.h"
14 : #include "source/common/http/utility.h"
15 : #include "source/common/protobuf/utility.h"
16 : #include "source/common/tracing/null_span_impl.h"
17 : #include "source/extensions/access_loggers/common/grpc_access_logger_utils.h"
18 :
19 : #include "absl/container/flat_hash_map.h"
20 : #include "absl/types/optional.h"
21 :
22 : namespace Envoy {
23 : namespace Extensions {
24 : namespace AccessLoggers {
25 : namespace Common {
26 :
/**
 * Logger flavor. Together with the hash of the logger configuration it forms the key under which
 * loggers are stored in the per-thread logger cache.
 */
enum class GrpcAccessLoggerType {
  TCP,
  HTTP,
};
28 :
29 : namespace Detail {
30 :
31 : /**
32 : * Fully specialized types of the interfaces below are available through the
33 : * `Common::GrpcAccessLogger::Interface` and `Common::GrpcAccessLoggerCache::Interface`
34 : * aliases.
35 : */
36 :
37 : /**
38 : * Interface for an access logger. The logger provides abstraction on top of gRPC stream, deals with
39 : * reconnects and performs batching.
40 : */
41 : template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {
42 : public:
43 : using SharedPtr = std::shared_ptr<GrpcAccessLogger>;
44 :
45 0 : virtual ~GrpcAccessLogger() = default;
46 :
47 : /**
48 : * Log http access entry.
49 : * @param entry supplies the access log to send.
50 : */
51 : virtual void log(HttpLogProto&& entry) PURE;
52 :
53 : /**
54 : * Log tcp access entry.
55 : * @param entry supplies the access log to send.
56 : */
57 : virtual void log(TcpLogProto&& entry) PURE;
58 : };
59 :
60 : /**
61 : * Interface for an access logger cache. The cache deals with threading and de-duplicates loggers
62 : * for the same configuration.
63 : */
64 : template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLoggerCache {
65 : public:
66 : using SharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
67 0 : virtual ~GrpcAccessLoggerCache() = default;
68 :
69 : /**
70 : * Get existing logger or create a new one for the given configuration.
71 : * @param config supplies the configuration for the logger.
72 : * @return GrpcAccessLoggerSharedPtr ready for logging requests.
73 : */
74 : virtual typename GrpcAccessLogger::SharedPtr
75 : getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) PURE;
76 : };
77 :
78 : template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
79 : public:
80 0 : virtual ~GrpcAccessLogClient() = default;
81 : virtual bool isConnected() PURE;
82 : virtual bool log(const LogRequest& request) PURE;
83 :
84 : protected:
85 : GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
86 : const Protobuf::MethodDescriptor& service_method,
87 : OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
88 : : client_(client), service_method_(service_method),
89 0 : opts_(createRequestOptionsForRetry(retry_policy)) {}
90 :
91 : Grpc::AsyncClient<LogRequest, LogResponse> client_;
92 : const Protobuf::MethodDescriptor& service_method_;
93 : const Http::AsyncClient::RequestOptions opts_;
94 :
95 : private:
96 : Http::AsyncClient::RequestOptions
97 0 : createRequestOptionsForRetry(OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy) {
98 0 : auto opt = Http::AsyncClient::RequestOptions();
99 :
100 0 : if (!retry_policy) {
101 0 : return opt;
102 0 : }
103 :
104 0 : const auto grpc_retry_policy =
105 0 : Http::Utility::convertCoreToRouteRetryPolicy(*retry_policy, "connect-failure");
106 0 : opt.setBufferBodyForRetry(true);
107 0 : opt.setRetryPolicy(grpc_retry_policy);
108 0 : return opt;
109 0 : }
110 : };
111 :
112 : template <typename LogRequest, typename LogResponse>
113 : class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
114 : public:
115 : UnaryGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
116 : const Protobuf::MethodDescriptor& service_method,
117 : OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
118 0 : : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}
119 :
120 0 : bool isConnected() override { return false; }
121 :
122 0 : bool log(const LogRequest& request) override {
123 0 : GrpcAccessLogClient<LogRequest, LogResponse>::client_->send(
124 0 : GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, request, request_cb_,
125 0 : Tracing::NullSpan::instance(), GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
126 0 : return true;
127 0 : }
128 :
129 : struct RequestCallbacks : public Grpc::AsyncRequestCallbacks<LogResponse> {
130 : // Grpc::AsyncRequestCallbacks
131 0 : void onSuccess(Grpc::ResponsePtr<LogResponse>&&, Tracing::Span&) override {}
132 0 : void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
133 0 : void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override {}
134 : };
135 :
136 : private:
137 : RequestCallbacks request_cb_;
138 : };
139 :
140 : template <typename LogRequest, typename LogResponse>
141 : class StreamingGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
142 : public:
143 : StreamingGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
144 : const Protobuf::MethodDescriptor& service_method,
145 : OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
146 0 : : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}
147 :
148 : public:
149 : struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
150 0 : LocalStream(StreamingGrpcAccessLogClient& parent) : parent_(parent) {}
151 :
152 : // Grpc::AsyncStreamCallbacks
153 0 : void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
154 0 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
155 0 : void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
156 0 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
157 0 : void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
158 0 : ASSERT(parent_.stream_ != nullptr);
159 0 : if (parent_.stream_->stream_ != nullptr) {
160 : // Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
161 : // stream data in send().
162 0 : parent_.stream_.reset();
163 0 : }
164 0 : }
165 :
166 : StreamingGrpcAccessLogClient& parent_;
167 : Grpc::AsyncStream<LogRequest> stream_{};
168 : };
169 :
170 0 : bool isConnected() override { return stream_ != nullptr && stream_->stream_ != nullptr; }
171 :
172 0 : bool log(const LogRequest& request) override {
173 0 : if (!stream_) {
174 0 : stream_ = std::make_unique<LocalStream>(*this);
175 0 : }
176 :
177 0 : if (stream_->stream_ == nullptr) {
178 0 : stream_->stream_ = GrpcAccessLogClient<LogRequest, LogResponse>::client_->start(
179 0 : GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, *stream_,
180 0 : GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
181 0 : }
182 :
183 0 : if (stream_->stream_ != nullptr) {
184 0 : if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
185 0 : return false;
186 0 : }
187 0 : stream_->stream_->sendMessage(request, false);
188 0 : } else {
189 : // Clear out the stream data due to stream creation failure.
190 0 : stream_.reset();
191 0 : }
192 0 : return true;
193 0 : }
194 :
195 : std::unique_ptr<LocalStream> stream_;
196 : };
197 :
198 : } // namespace Detail
199 :
200 : /**
201 : * All stats for the grpc access logger. @see stats_macros.h
202 : */
203 : #define ALL_GRPC_ACCESS_LOGGER_STATS(COUNTER) \
204 : COUNTER(logs_written) \
205 : COUNTER(logs_dropped)
206 :
207 : /**
208 : * Wrapper struct for the access log stats. @see stats_macros.h
209 : */
210 : struct GrpcAccessLoggerStats {
211 : ALL_GRPC_ACCESS_LOGGER_STATS(GENERATE_COUNTER_STRUCT)
212 : };
213 :
214 : /**
215 : * Base class for defining a gRPC logger with the `HttpLogProto` and `TcpLogProto` access log
216 : * entries and `LogRequest` and `LogResponse` gRPC messages.
217 : * The log entries and messages are distinct types to support batching of multiple access log
218 : * entries in a single gRPC messages that go on the wire.
219 : */
220 : template <typename HttpLogProto, typename TcpLogProto, typename LogRequest, typename LogResponse>
221 : class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto> {
222 : public:
223 : using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;
224 : GrpcAccessLogger(
225 : const Grpc::RawAsyncClientSharedPtr& client,
226 : const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
227 : Event::Dispatcher& dispatcher, Stats::Scope& scope, std::string access_log_prefix,
228 : const Protobuf::MethodDescriptor& service_method, bool stream = true)
229 : : buffer_flush_interval_msec_(
230 : PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
231 0 : flush_timer_(dispatcher.createTimer([this]() {
232 0 : flush();
233 0 : flush_timer_->enableTimer(buffer_flush_interval_msec_);
234 0 : })),
235 : max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)),
236 0 : stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {
237 0 : if (stream) {
238 0 : client_ = std::make_unique<Detail::StreamingGrpcAccessLogClient<LogRequest, LogResponse>>(
239 0 : client, service_method, GrpcCommon::optionalRetryPolicy(config));
240 0 : } else {
241 0 : client_ = std::make_unique<Detail::UnaryGrpcAccessLogClient<LogRequest, LogResponse>>(
242 0 : client, service_method, GrpcCommon::optionalRetryPolicy(config));
243 0 : }
244 0 : flush_timer_->enableTimer(buffer_flush_interval_msec_);
245 0 : }
246 :
247 0 : void log(HttpLogProto&& entry) override {
248 0 : if (!canLogMore()) {
249 0 : return;
250 0 : }
251 0 : approximate_message_size_bytes_ += entry.ByteSizeLong();
252 0 : addEntry(std::move(entry));
253 0 : if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
254 0 : flush();
255 0 : }
256 0 : }
257 :
258 0 : void log(TcpLogProto&& entry) override {
259 0 : approximate_message_size_bytes_ += entry.ByteSizeLong();
260 0 : addEntry(std::move(entry));
261 0 : if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
262 0 : flush();
263 0 : }
264 0 : }
265 :
266 : protected:
267 : std::unique_ptr<Detail::GrpcAccessLogClient<LogRequest, LogResponse>> client_;
268 : LogRequest message_;
269 :
270 : private:
271 : virtual bool isEmpty() PURE;
272 : virtual void initMessage() PURE;
273 : virtual void addEntry(HttpLogProto&& entry) PURE;
274 : virtual void addEntry(TcpLogProto&& entry) PURE;
275 0 : virtual void clearMessage() { message_.Clear(); }
276 :
277 0 : void flush() {
278 0 : if (isEmpty()) {
279 : // Nothing to flush.
280 0 : return;
281 0 : }
282 :
283 0 : if (!client_->isConnected()) {
284 0 : initMessage();
285 0 : }
286 :
287 0 : if (client_->log(message_)) {
288 : // Clear the message regardless of the success.
289 0 : approximate_message_size_bytes_ = 0;
290 0 : clearMessage();
291 0 : }
292 0 : }
293 :
294 0 : bool canLogMore() {
295 0 : if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) {
296 0 : stats_.logs_written_.inc();
297 0 : return true;
298 0 : }
299 0 : flush();
300 0 : if (approximate_message_size_bytes_ < max_buffer_size_bytes_) {
301 0 : stats_.logs_written_.inc();
302 0 : return true;
303 0 : }
304 0 : stats_.logs_dropped_.inc();
305 0 : return false;
306 0 : }
307 :
308 : const std::chrono::milliseconds buffer_flush_interval_msec_;
309 : const Event::TimerPtr flush_timer_;
310 : const uint64_t max_buffer_size_bytes_;
311 : uint64_t approximate_message_size_bytes_ = 0;
312 : GrpcAccessLoggerStats stats_;
313 : };
314 :
315 : /**
316 : * Class for defining logger cache with the `GrpcAccessLogger` interface and
317 : * `ConfigProto` configuration.
318 : */
319 : template <typename GrpcAccessLogger, typename ConfigProto>
320 : class GrpcAccessLoggerCache : public Singleton::Instance,
321 : public Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto> {
322 : public:
323 : using Interface = Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto>;
324 :
325 : GrpcAccessLoggerCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
326 : ThreadLocal::SlotAllocator& tls)
327 0 : : scope_(scope), async_client_manager_(async_client_manager), tls_slot_(tls.allocateSlot()) {
328 0 : tls_slot_->set([](Event::Dispatcher& dispatcher) {
329 0 : return std::make_shared<ThreadLocalCache>(dispatcher);
330 0 : });
331 0 : }
332 :
333 : typename GrpcAccessLogger::SharedPtr
334 0 : getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) override {
335 : // TODO(euroelessar): Consider cleaning up loggers.
336 0 : auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
337 0 : const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
338 0 : const auto it = cache.access_loggers_.find(cache_key);
339 0 : if (it != cache.access_loggers_.end()) {
340 0 : return it->second;
341 0 : }
342 :
343 0 : const auto logger = createLogger(config, cache.dispatcher_);
344 0 : cache.access_loggers_.emplace(cache_key, logger);
345 0 : return logger;
346 0 : }
347 :
348 : protected:
349 : Stats::Scope& scope_;
350 : Grpc::AsyncClientManager& async_client_manager_;
351 :
352 : private:
353 : /**
354 : * Per-thread cache.
355 : */
356 : struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
357 0 : ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
358 :
359 : Event::Dispatcher& dispatcher_;
360 : // Access loggers indexed by the hash of logger's configuration and logger type.
361 : absl::flat_hash_map<std::pair<std::size_t, Common::GrpcAccessLoggerType>,
362 : typename GrpcAccessLogger::SharedPtr>
363 : access_loggers_;
364 : };
365 :
366 : // Create the specific logger type for this cache.
367 : virtual typename GrpcAccessLogger::SharedPtr createLogger(const ConfigProto& config,
368 : Event::Dispatcher& dispatcher) PURE;
369 :
370 : ThreadLocal::SlotPtr tls_slot_;
371 : };
372 :
373 : } // namespace Common
374 : } // namespace AccessLoggers
375 : } // namespace Extensions
376 : } // namespace Envoy
|