LCOV - code coverage report
Current view: top level - source/extensions/access_loggers/common - grpc_access_logger.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 132 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 62 0.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : 
       5             : #include "envoy/config/core/v3/config_source.pb.h"
       6             : #include "envoy/event/dispatcher.h"
       7             : #include "envoy/grpc/async_client_manager.h"
       8             : #include "envoy/singleton/instance.h"
       9             : #include "envoy/stats/scope.h"
      10             : #include "envoy/thread_local/thread_local.h"
      11             : 
      12             : #include "source/common/common/assert.h"
      13             : #include "source/common/grpc/typed_async_client.h"
      14             : #include "source/common/http/utility.h"
      15             : #include "source/common/protobuf/utility.h"
      16             : #include "source/common/tracing/null_span_impl.h"
      17             : #include "source/extensions/access_loggers/common/grpc_access_logger_utils.h"
      18             : 
      19             : #include "absl/container/flat_hash_map.h"
      20             : #include "absl/types/optional.h"
      21             : 
      22             : namespace Envoy {
      23             : namespace Extensions {
      24             : namespace AccessLoggers {
      25             : namespace Common {
      26             : 
      27             : enum class GrpcAccessLoggerType { TCP, HTTP };
      28             : 
      29             : namespace Detail {
      30             : 
      31             : /**
      32             :  * Fully specialized types of the interfaces below are available through the
      33             :  * `Common::GrpcAccessLogger::Interface` and `Common::GrpcAccessLoggerCache::Interface`
      34             :  * aliases.
      35             :  */
      36             : 
      37             : /**
      38             :  * Interface for an access logger. The logger provides abstraction on top of gRPC stream, deals with
      39             :  * reconnects and performs batching.
      40             :  */
      41             : template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {
      42             : public:
      43             :   using SharedPtr = std::shared_ptr<GrpcAccessLogger>;
      44             : 
      45           0 :   virtual ~GrpcAccessLogger() = default;
      46             : 
      47             :   /**
      48             :    * Log http access entry.
      49             :    * @param entry supplies the access log to send.
      50             :    */
      51             :   virtual void log(HttpLogProto&& entry) PURE;
      52             : 
      53             :   /**
      54             :    * Log tcp access entry.
      55             :    * @param entry supplies the access log to send.
      56             :    */
      57             :   virtual void log(TcpLogProto&& entry) PURE;
      58             : };
      59             : 
      60             : /**
      61             :  * Interface for an access logger cache. The cache deals with threading and de-duplicates loggers
      62             :  * for the same configuration.
      63             :  */
      64             : template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLoggerCache {
      65             : public:
      66             :   using SharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
      67           0 :   virtual ~GrpcAccessLoggerCache() = default;
      68             : 
      69             :   /**
      70             :    * Get existing logger or create a new one for the given configuration.
      71             :    * @param config supplies the configuration for the logger.
      72             :    * @return GrpcAccessLoggerSharedPtr ready for logging requests.
      73             :    */
      74             :   virtual typename GrpcAccessLogger::SharedPtr
      75             :   getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) PURE;
      76             : };
      77             : 
      78             : template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
      79             : public:
      80           0 :   virtual ~GrpcAccessLogClient() = default;
      81             :   virtual bool isConnected() PURE;
      82             :   virtual bool log(const LogRequest& request) PURE;
      83             : 
      84             : protected:
      85             :   GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
      86             :                       const Protobuf::MethodDescriptor& service_method,
      87             :                       OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
      88             :       : client_(client), service_method_(service_method),
      89           0 :         opts_(createRequestOptionsForRetry(retry_policy)) {}
      90             : 
      91             :   Grpc::AsyncClient<LogRequest, LogResponse> client_;
      92             :   const Protobuf::MethodDescriptor& service_method_;
      93             :   const Http::AsyncClient::RequestOptions opts_;
      94             : 
      95             : private:
      96             :   Http::AsyncClient::RequestOptions
      97           0 :   createRequestOptionsForRetry(OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy) {
      98           0 :     auto opt = Http::AsyncClient::RequestOptions();
      99             : 
     100           0 :     if (!retry_policy) {
     101           0 :       return opt;
     102           0 :     }
     103             : 
     104           0 :     const auto grpc_retry_policy =
     105           0 :         Http::Utility::convertCoreToRouteRetryPolicy(*retry_policy, "connect-failure");
     106           0 :     opt.setBufferBodyForRetry(true);
     107           0 :     opt.setRetryPolicy(grpc_retry_policy);
     108           0 :     return opt;
     109           0 :   }
     110             : };
     111             : 
     112             : template <typename LogRequest, typename LogResponse>
     113             : class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
     114             : public:
     115             :   UnaryGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
     116             :                            const Protobuf::MethodDescriptor& service_method,
     117             :                            OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
     118           0 :       : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}
     119             : 
     120           0 :   bool isConnected() override { return false; }
     121             : 
     122           0 :   bool log(const LogRequest& request) override {
     123           0 :     GrpcAccessLogClient<LogRequest, LogResponse>::client_->send(
     124           0 :         GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, request, request_cb_,
     125           0 :         Tracing::NullSpan::instance(), GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
     126           0 :     return true;
     127           0 :   }
     128             : 
     129             :   struct RequestCallbacks : public Grpc::AsyncRequestCallbacks<LogResponse> {
     130             :     // Grpc::AsyncRequestCallbacks
     131           0 :     void onSuccess(Grpc::ResponsePtr<LogResponse>&&, Tracing::Span&) override {}
     132           0 :     void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
     133           0 :     void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override {}
     134             :   };
     135             : 
     136             : private:
     137             :   RequestCallbacks request_cb_;
     138             : };
     139             : 
     140             : template <typename LogRequest, typename LogResponse>
     141             : class StreamingGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
     142             : public:
     143             :   StreamingGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
     144             :                                const Protobuf::MethodDescriptor& service_method,
     145             :                                OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
     146           0 :       : GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}
     147             : 
     148             : public:
     149             :   struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
     150           0 :     LocalStream(StreamingGrpcAccessLogClient& parent) : parent_(parent) {}
     151             : 
     152             :     // Grpc::AsyncStreamCallbacks
     153           0 :     void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
     154           0 :     void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
     155           0 :     void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
     156           0 :     void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
     157           0 :     void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
     158           0 :       ASSERT(parent_.stream_ != nullptr);
     159           0 :       if (parent_.stream_->stream_ != nullptr) {
     160             :         // Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
     161             :         // stream data in send().
     162           0 :         parent_.stream_.reset();
     163           0 :       }
     164           0 :     }
     165             : 
     166             :     StreamingGrpcAccessLogClient& parent_;
     167             :     Grpc::AsyncStream<LogRequest> stream_{};
     168             :   };
     169             : 
     170           0 :   bool isConnected() override { return stream_ != nullptr && stream_->stream_ != nullptr; }
     171             : 
     172           0 :   bool log(const LogRequest& request) override {
     173           0 :     if (!stream_) {
     174           0 :       stream_ = std::make_unique<LocalStream>(*this);
     175           0 :     }
     176             : 
     177           0 :     if (stream_->stream_ == nullptr) {
     178           0 :       stream_->stream_ = GrpcAccessLogClient<LogRequest, LogResponse>::client_->start(
     179           0 :           GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, *stream_,
     180           0 :           GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
     181           0 :     }
     182             : 
     183           0 :     if (stream_->stream_ != nullptr) {
     184           0 :       if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
     185           0 :         return false;
     186           0 :       }
     187           0 :       stream_->stream_->sendMessage(request, false);
     188           0 :     } else {
     189             :       // Clear out the stream data due to stream creation failure.
     190           0 :       stream_.reset();
     191           0 :     }
     192           0 :     return true;
     193           0 :   }
     194             : 
     195             :   std::unique_ptr<LocalStream> stream_;
     196             : };
     197             : 
     198             : } // namespace Detail
     199             : 
     200             : /**
     201             :  * All stats for the grpc access logger. @see stats_macros.h
     202             :  */
     203             : #define ALL_GRPC_ACCESS_LOGGER_STATS(COUNTER)                                                      \
     204             :   COUNTER(logs_written)                                                                            \
     205             :   COUNTER(logs_dropped)
     206             : 
     207             : /**
     208             :  * Wrapper struct for the access log stats. @see stats_macros.h
     209             :  */
     210             : struct GrpcAccessLoggerStats {
     211             :   ALL_GRPC_ACCESS_LOGGER_STATS(GENERATE_COUNTER_STRUCT)
     212             : };
     213             : 
     214             : /**
     215             :  * Base class for defining a gRPC logger with the `HttpLogProto` and `TcpLogProto` access log
     216             :  * entries and `LogRequest` and `LogResponse` gRPC messages.
     217             :  * The log entries and messages are distinct types to support batching of multiple access log
     218             :  * entries in a single gRPC messages that go on the wire.
     219             :  */
     220             : template <typename HttpLogProto, typename TcpLogProto, typename LogRequest, typename LogResponse>
     221             : class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto> {
     222             : public:
     223             :   using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;
     224             :   GrpcAccessLogger(
     225             :       const Grpc::RawAsyncClientSharedPtr& client,
     226             :       const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
     227             :       Event::Dispatcher& dispatcher, Stats::Scope& scope, std::string access_log_prefix,
     228             :       const Protobuf::MethodDescriptor& service_method, bool stream = true)
     229             :       : buffer_flush_interval_msec_(
     230             :             PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
     231           0 :         flush_timer_(dispatcher.createTimer([this]() {
     232           0 :           flush();
     233           0 :           flush_timer_->enableTimer(buffer_flush_interval_msec_);
     234           0 :         })),
     235             :         max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)),
     236           0 :         stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {
     237           0 :     if (stream) {
     238           0 :       client_ = std::make_unique<Detail::StreamingGrpcAccessLogClient<LogRequest, LogResponse>>(
     239           0 :           client, service_method, GrpcCommon::optionalRetryPolicy(config));
     240           0 :     } else {
     241           0 :       client_ = std::make_unique<Detail::UnaryGrpcAccessLogClient<LogRequest, LogResponse>>(
     242           0 :           client, service_method, GrpcCommon::optionalRetryPolicy(config));
     243           0 :     }
     244           0 :     flush_timer_->enableTimer(buffer_flush_interval_msec_);
     245           0 :   }
     246             : 
     247           0 :   void log(HttpLogProto&& entry) override {
     248           0 :     if (!canLogMore()) {
     249           0 :       return;
     250           0 :     }
     251           0 :     approximate_message_size_bytes_ += entry.ByteSizeLong();
     252           0 :     addEntry(std::move(entry));
     253           0 :     if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
     254           0 :       flush();
     255           0 :     }
     256           0 :   }
     257             : 
     258           0 :   void log(TcpLogProto&& entry) override {
     259           0 :     approximate_message_size_bytes_ += entry.ByteSizeLong();
     260           0 :     addEntry(std::move(entry));
     261           0 :     if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
     262           0 :       flush();
     263           0 :     }
     264           0 :   }
     265             : 
     266             : protected:
     267             :   std::unique_ptr<Detail::GrpcAccessLogClient<LogRequest, LogResponse>> client_;
     268             :   LogRequest message_;
     269             : 
     270             : private:
     271             :   virtual bool isEmpty() PURE;
     272             :   virtual void initMessage() PURE;
     273             :   virtual void addEntry(HttpLogProto&& entry) PURE;
     274             :   virtual void addEntry(TcpLogProto&& entry) PURE;
     275           0 :   virtual void clearMessage() { message_.Clear(); }
     276             : 
     277           0 :   void flush() {
     278           0 :     if (isEmpty()) {
     279             :       // Nothing to flush.
     280           0 :       return;
     281           0 :     }
     282             : 
     283           0 :     if (!client_->isConnected()) {
     284           0 :       initMessage();
     285           0 :     }
     286             : 
     287           0 :     if (client_->log(message_)) {
     288             :       // Clear the message regardless of the success.
     289           0 :       approximate_message_size_bytes_ = 0;
     290           0 :       clearMessage();
     291           0 :     }
     292           0 :   }
     293             : 
     294           0 :   bool canLogMore() {
     295           0 :     if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) {
     296           0 :       stats_.logs_written_.inc();
     297           0 :       return true;
     298           0 :     }
     299           0 :     flush();
     300           0 :     if (approximate_message_size_bytes_ < max_buffer_size_bytes_) {
     301           0 :       stats_.logs_written_.inc();
     302           0 :       return true;
     303           0 :     }
     304           0 :     stats_.logs_dropped_.inc();
     305           0 :     return false;
     306           0 :   }
     307             : 
     308             :   const std::chrono::milliseconds buffer_flush_interval_msec_;
     309             :   const Event::TimerPtr flush_timer_;
     310             :   const uint64_t max_buffer_size_bytes_;
     311             :   uint64_t approximate_message_size_bytes_ = 0;
     312             :   GrpcAccessLoggerStats stats_;
     313             : };
     314             : 
     315             : /**
     316             :  * Class for defining logger cache with the `GrpcAccessLogger` interface and
     317             :  * `ConfigProto` configuration.
     318             :  */
     319             : template <typename GrpcAccessLogger, typename ConfigProto>
     320             : class GrpcAccessLoggerCache : public Singleton::Instance,
     321             :                               public Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto> {
     322             : public:
     323             :   using Interface = Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto>;
     324             : 
     325             :   GrpcAccessLoggerCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
     326             :                         ThreadLocal::SlotAllocator& tls)
     327           0 :       : scope_(scope), async_client_manager_(async_client_manager), tls_slot_(tls.allocateSlot()) {
     328           0 :     tls_slot_->set([](Event::Dispatcher& dispatcher) {
     329           0 :       return std::make_shared<ThreadLocalCache>(dispatcher);
     330           0 :     });
     331           0 :   }
     332             : 
     333             :   typename GrpcAccessLogger::SharedPtr
     334           0 :   getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) override {
     335             :     // TODO(euroelessar): Consider cleaning up loggers.
     336           0 :     auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
     337           0 :     const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
     338           0 :     const auto it = cache.access_loggers_.find(cache_key);
     339           0 :     if (it != cache.access_loggers_.end()) {
     340           0 :       return it->second;
     341           0 :     }
     342             : 
     343           0 :     const auto logger = createLogger(config, cache.dispatcher_);
     344           0 :     cache.access_loggers_.emplace(cache_key, logger);
     345           0 :     return logger;
     346           0 :   }
     347             : 
     348             : protected:
     349             :   Stats::Scope& scope_;
     350             :   Grpc::AsyncClientManager& async_client_manager_;
     351             : 
     352             : private:
     353             :   /**
     354             :    * Per-thread cache.
     355             :    */
     356             :   struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
     357           0 :     ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
     358             : 
     359             :     Event::Dispatcher& dispatcher_;
     360             :     // Access loggers indexed by the hash of logger's configuration and logger type.
     361             :     absl::flat_hash_map<std::pair<std::size_t, Common::GrpcAccessLoggerType>,
     362             :                         typename GrpcAccessLogger::SharedPtr>
     363             :         access_loggers_;
     364             :   };
     365             : 
     366             :   // Create the specific logger type for this cache.
     367             :   virtual typename GrpcAccessLogger::SharedPtr createLogger(const ConfigProto& config,
     368             :                                                             Event::Dispatcher& dispatcher) PURE;
     369             : 
     370             :   ThreadLocal::SlotPtr tls_slot_;
     371             : };
     372             : 
     373             : } // namespace Common
     374             : } // namespace AccessLoggers
     375             : } // namespace Extensions
     376             : } // namespace Envoy

Generated by: LCOV version 1.15