1
#include "source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h"
2

            
3
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
4
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
5
#include "envoy/grpc/async_client_manager.h"
6
#include "envoy/local_info/local_info.h"
7

            
8
#include "source/common/config/utility.h"
9
#include "source/common/grpc/typed_async_client.h"
10
#include "source/common/protobuf/utility.h"
11
#include "source/extensions/access_loggers/common/grpc_access_logger_clients.h"
12
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
13

            
14
#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
15
#include "opentelemetry/proto/common/v1/common.pb.h"
16
#include "opentelemetry/proto/logs/v1/logs.pb.h"
17
#include "opentelemetry/proto/resource/v1/resource.pb.h"
18

            
19
namespace Envoy {
20
namespace Extensions {
21
namespace AccessLoggers {
22
namespace OpenTelemetry {
23

            
24
namespace {
25
using opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
26
using opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse;
27
} // namespace
28

            
29
GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
30
    const Grpc::RawAsyncClientSharedPtr& client,
31
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
32
        config,
33
    Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
34
18
    : GrpcAccessLogger(
35
18
          config.common_config(), dispatcher, scope, std::nullopt,
36
18
          std::make_unique<Common::UnaryGrpcAccessLogClient<ExportLogsServiceRequest,
37
18
                                                            ExportLogsServiceResponse>>(
38
18
              client,
39
18
              *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
40
18
                  "opentelemetry.proto.collector.logs.v1.LogsService.Export"),
41
18
              GrpcCommon::optionalRetryPolicy(config.common_config()), genOTelCallbacksFactory())),
42
18
      stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(
43
18
          scope, absl::StrCat(OtlpAccessLogStatsPrefix, config.stat_prefix())))}) {
44
18
  root_ = initOtlpMessageRoot(message_, config, local_info);
45
18
}
46

            
47
std::function<GrpcAccessLoggerImpl::OTelLogRequestCallbacks&()>
48
18
GrpcAccessLoggerImpl::genOTelCallbacksFactory() {
49
19
  return [this]() -> OTelLogRequestCallbacks& {
50
17
    auto callback = std::make_unique<OTelLogRequestCallbacks>(
51
17
        this->stats_, this->batched_log_entries_, [this](OTelLogRequestCallbacks* p) {
52
17
          if (this->callbacks_.contains(p)) {
53
17
            this->callbacks_.erase(p);
54
17
          }
55
17
        });
56
17
    OTelLogRequestCallbacks* ptr = callback.get();
57
17
    this->batched_log_entries_ = 0;
58
17
    this->callbacks_.emplace(ptr, std::move(callback));
59
17
    return *ptr;
60
17
  };
61
18
}
62
17
void GrpcAccessLoggerImpl::addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) {
63
17
  batched_log_entries_++;
64
17
  root_->mutable_log_records()->Add(std::move(entry));
65
17
}
66

            
67
30
bool GrpcAccessLoggerImpl::isEmpty() { return root_->log_records().empty(); }
68

            
69
// The message is already initialized in the c'tor, and only the logs are cleared.
70
17
void GrpcAccessLoggerImpl::initMessage() {}
71

            
72
17
void GrpcAccessLoggerImpl::clearMessage() { root_->clear_log_records(); }
73

            
74
GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager,
75
                                                     Stats::Scope& scope,
76
                                                     ThreadLocal::SlotAllocator& tls,
77
                                                     const LocalInfo::LocalInfo& local_info)
78
11
    : GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {}
79

            
80
GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
81
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
82
        config,
83
15
    Event::Dispatcher& dispatcher) {
84
  // We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
85
  // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
86
  // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
87
  // the main thread if necessary to ensure it does not throw here.
88
15
  auto factory_or_error =
89
15
      async_client_manager_.factoryForGrpcService(getGrpcService(config), scope_, true);
90
15
  THROW_IF_NOT_OK_REF(factory_or_error.status());
91
15
  auto client = THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
92
15
                                      Grpc::RawAsyncClientPtr);
93
15
  return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_,
94
15
                                                scope_);
95
15
}
96

            
97
} // namespace OpenTelemetry
98
} // namespace AccessLoggers
99
} // namespace Extensions
100
} // namespace Envoy