Line data Source code
1 : #include "source/extensions/access_loggers/grpc/grpc_access_log_impl.h" 2 : 3 : #include "envoy/data/accesslog/v3/accesslog.pb.h" 4 : #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" 5 : #include "envoy/grpc/async_client_manager.h" 6 : #include "envoy/local_info/local_info.h" 7 : 8 : #include "source/common/config/utility.h" 9 : #include "source/common/grpc/typed_async_client.h" 10 : 11 : const char GRPC_LOG_STATS_PREFIX[] = "access_logs.grpc_access_log."; 12 : 13 : namespace Envoy { 14 : namespace Extensions { 15 : namespace AccessLoggers { 16 : namespace GrpcCommon { 17 : 18 : GrpcAccessLoggerImpl::GrpcAccessLoggerImpl( 19 : const Grpc::RawAsyncClientSharedPtr& client, 20 : const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, 21 : Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) 22 : : GrpcAccessLogger(std::move(client), config, dispatcher, scope, GRPC_LOG_STATS_PREFIX, 23 : *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( 24 : "envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs")), 25 0 : log_name_(config.log_name()), local_info_(local_info) {} 26 : 27 0 : void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) { 28 0 : message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry)); 29 0 : } 30 : 31 0 : void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::TCPAccessLogEntry&& entry) { 32 0 : message_.mutable_tcp_logs()->mutable_log_entry()->Add(std::move(entry)); 33 0 : } 34 : 35 0 : bool GrpcAccessLoggerImpl::isEmpty() { 36 0 : return !message_.has_http_logs() && !message_.has_tcp_logs(); 37 0 : } 38 : 39 0 : void GrpcAccessLoggerImpl::initMessage() { 40 0 : auto* identifier = message_.mutable_identifier(); 41 0 : *identifier->mutable_node() = local_info_.node(); 42 0 : identifier->set_log_name(log_name_); 43 0 : } 44 : 45 : GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, 46 : Stats::Scope& scope, 47 : ThreadLocal::SlotAllocator& tls, 48 : const LocalInfo::LocalInfo& local_info) 49 0 : : GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {} 50 : 51 : GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger( 52 : const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, 53 0 : Event::Dispatcher& dispatcher) { 54 : // We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing 55 : // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster 56 : // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in 57 : // the main thread if necessary. 58 0 : auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true) 59 0 : ->createUncachedRawAsyncClient(); 60 0 : return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_, 61 0 : scope_); 62 0 : } 63 : 64 : } // namespace GrpcCommon 65 : } // namespace AccessLoggers 66 : } // namespace Extensions 67 : } // namespace Envoy