Line data Source code
1 : #include "source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h" 2 : 3 : #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" 4 : #include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h" 5 : #include "envoy/grpc/async_client_manager.h" 6 : #include "envoy/local_info/local_info.h" 7 : 8 : #include "source/common/config/utility.h" 9 : #include "source/common/grpc/typed_async_client.h" 10 : 11 : #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" 12 : #include "opentelemetry/proto/common/v1/common.pb.h" 13 : #include "opentelemetry/proto/logs/v1/logs.pb.h" 14 : #include "opentelemetry/proto/resource/v1/resource.pb.h" 15 : 16 : const char GRPC_LOG_STATS_PREFIX[] = "access_logs.open_telemetry_access_log."; 17 : 18 : namespace Envoy { 19 : namespace Extensions { 20 : namespace AccessLoggers { 21 : namespace OpenTelemetry { 22 : 23 : GrpcAccessLoggerImpl::GrpcAccessLoggerImpl( 24 : const Grpc::RawAsyncClientSharedPtr& client, 25 : const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& 26 : config, 27 : Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) 28 : : GrpcAccessLogger(client, config.common_config(), dispatcher, scope, GRPC_LOG_STATS_PREFIX, 29 : *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( 30 : "opentelemetry.proto.collector.logs.v1.LogsService.Export"), 31 0 : false) { 32 0 : initMessageRoot(config, local_info); 33 0 : } 34 : 35 : namespace { 36 : 37 : opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key, 38 0 : const std::string& value) { 39 0 : opentelemetry::proto::common::v1::KeyValue keyValue; 40 0 : keyValue.set_key(key); 41 0 : keyValue.mutable_value()->set_string_value(value); 42 0 : return keyValue; 43 0 : } 44 : 45 : } // namespace 46 : 47 : // See comment about the structure of repeated fields in the header file. 48 : void GrpcAccessLoggerImpl::initMessageRoot( 49 : const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& 50 : config, 51 0 : const LocalInfo::LocalInfo& local_info) { 52 0 : auto* resource_logs = message_.add_resource_logs(); 53 0 : root_ = resource_logs->add_scope_logs(); 54 0 : auto* resource = resource_logs->mutable_resource(); 55 0 : if (!config.disable_builtin_labels()) { 56 0 : *resource->add_attributes() = getStringKeyValue("log_name", config.common_config().log_name()); 57 0 : *resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName()); 58 0 : *resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName()); 59 0 : *resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName()); 60 0 : } 61 : 62 0 : for (const auto& pair : config.resource_attributes().values()) { 63 0 : *resource->add_attributes() = pair; 64 0 : } 65 0 : } 66 : 67 0 : void GrpcAccessLoggerImpl::addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) { 68 0 : root_->mutable_log_records()->Add(std::move(entry)); 69 0 : } 70 : 71 0 : bool GrpcAccessLoggerImpl::isEmpty() { return root_->log_records().empty(); } 72 : 73 : // The message is already initialized in the c'tor, and only the logs are cleared. 74 0 : void GrpcAccessLoggerImpl::initMessage() {} 75 : 76 0 : void GrpcAccessLoggerImpl::clearMessage() { root_->clear_log_records(); } 77 : 78 : GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, 79 : Stats::Scope& scope, 80 : ThreadLocal::SlotAllocator& tls, 81 : const LocalInfo::LocalInfo& local_info) 82 0 : : GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {} 83 : 84 : GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger( 85 : const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& 86 : config, 87 0 : Event::Dispatcher& dispatcher) { 88 : // We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing 89 : // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster 90 : // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in 91 : // the main thread if necessary. 92 0 : auto client = async_client_manager_ 93 0 : .factoryForGrpcService(config.common_config().grpc_service(), scope_, true) 94 0 : ->createUncachedRawAsyncClient(); 95 0 : return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_, 96 0 : scope_); 97 0 : } 98 : 99 : } // namespace OpenTelemetry 100 : } // namespace AccessLoggers 101 : } // namespace Extensions 102 : } // namespace Envoy