1
#pragma once
2

            
3
#include <cstdint>
#include <memory>
#include <utility>

#include "envoy/event/dispatcher.h"
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/protobuf/protobuf.h"
#include "source/extensions/access_loggers/common/grpc_access_logger.h"

#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/logs/v1/logs.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"
19

            
20
namespace Envoy {
21
namespace Extensions {
22
namespace AccessLoggers {
23
namespace OpenTelemetry {
24

            
25
// Note: OpenTelemetry protos are extra flexible and used also in the OT collector for batching and
26
// so forth. As a result, some fields are repeated, but for our use case we assume the following
27
// structure:
28
// ExportLogsServiceRequest -> (single) ResourceLogs -> (single) ScopeLogs ->
29
// (repeated) LogRecord.
30
class GrpcAccessLoggerImpl
31
    : public Common::GrpcAccessLogger<
32
          opentelemetry::proto::logs::v1::LogRecord,
33
          // OpenTelemetry logging uses LogRecord for both HTTP and TCP, so protobuf::Empty is used
34
          // as an empty placeholder for the non-used addEntry method.
35
          // TODO(itamarkam): Don't cache OpenTelemetry loggers by type (HTTP/TCP).
36
          Protobuf::Empty, opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest,
37
          opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> {
38
public:
39
  GrpcAccessLoggerImpl(
40
      const Grpc::RawAsyncClientSharedPtr& client,
41
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
42
          config,
43
      Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);
44

            
45
private:
46
  class OTelLogRequestCallbacks
47
      : public Grpc::AsyncRequestCallbacks<
48
            opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> {
49
  public:
50
    OTelLogRequestCallbacks(Common::GrpcAccessLoggerStats& stats, uint32_t sending_log_entries,
51
                            std::function<void(OTelLogRequestCallbacks*)> deletion)
52
17
        : stats_(stats), sending_log_entries_(sending_log_entries), deletion_(deletion) {}
53

            
54
8
    void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
55

            
56
    void onSuccess(Grpc::ResponsePtr<
57
                       opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse>&& resp,
58
16
                   Tracing::Span&) override {
59
16
      const uint32_t partial_rejected_log_entries =
60
16
          (resp && resp->has_partial_success()) ? resp->partial_success().rejected_log_records()
61
16
                                                : 0;
62
      // For the unexpected case where partial rejected log entries are more than
63
      // sending log entries, we just regard all of them are dropped.
64
16
      if (sending_log_entries_ < partial_rejected_log_entries) {
65
        stats_.logs_dropped_.add(sending_log_entries_);
66
16
      } else {
67
16
        stats_.logs_dropped_.add(partial_rejected_log_entries);
68
16
        stats_.logs_written_.add(sending_log_entries_ - partial_rejected_log_entries);
69
16
      }
70

            
71
16
      deletion_(this);
72
16
    }
73

            
74
1
    void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override {
75
1
      stats_.logs_dropped_.add(sending_log_entries_);
76
1
      deletion_(this);
77
1
    }
78

            
79
    Common::GrpcAccessLoggerStats& stats_;
80
    const uint32_t sending_log_entries_;
81
    std::function<void(OTelLogRequestCallbacks*)> deletion_;
82
  };
83

            
84
  // Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
85
  void addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) override;
86
  // Non used addEntry method (the above is used for both TCP and HTTP).
87
1
  void addEntry(Protobuf::Empty&& entry) override { (void)entry; };
88
  bool isEmpty() override;
89
  void initMessage() override;
90
  void clearMessage() override;
91

            
92
  std::function<OTelLogRequestCallbacks&()> genOTelCallbacksFactory();
93

            
94
  opentelemetry::proto::logs::v1::ScopeLogs* root_;
95
  Common::GrpcAccessLoggerStats stats_;
96

            
97
  // Hold the ownership of `OTelLogRequestCallbacks` and `OTelLogRequestCallbacks.deletion_` called
98
  // in the callback time will be responsible to remove itself from map for deletion. If
99
  // `GrpcAccessLoggerImpl` get deleted, it will cancel all the requests on the flight. Therefore,
100
  // we guarantee the GrpcAccessLoggerImpl is always alive when `OTelLogRequestCallbacks`'s
101
  // callbacks get called.
102
  absl::flat_hash_map<OTelLogRequestCallbacks*, std::unique_ptr<OTelLogRequestCallbacks>>
103
      callbacks_;
104
  uint32_t batched_log_entries_ = 0;
105
};
106

            
107
class GrpcAccessLoggerCacheImpl
108
    : public Common::GrpcAccessLoggerCache<
109
          GrpcAccessLoggerImpl,
110
          envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig> {
111
public:
112
  GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
113
                            ThreadLocal::SlotAllocator& tls,
114
                            const LocalInfo::LocalInfo& local_info);
115

            
116
private:
117
  // Common::GrpcAccessLoggerCache
118
  GrpcAccessLoggerImpl::SharedPtr createLogger(
119
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
120
          config,
121
      Event::Dispatcher& dispatcher) override;
122

            
123
  const LocalInfo::LocalInfo& local_info_;
124
};
125

            
126
/**
127
 * Aliases for class interfaces for mock definitions.
128
 */
129
using GrpcAccessLogger = GrpcAccessLoggerImpl::Interface;
130
using GrpcAccessLoggerSharedPtr = GrpcAccessLogger::SharedPtr;
131

            
132
using GrpcAccessLoggerCache = GrpcAccessLoggerCacheImpl::Interface;
133
using GrpcAccessLoggerCacheSharedPtr = GrpcAccessLoggerCache::SharedPtr;
134

            
135
} // namespace OpenTelemetry
136
} // namespace AccessLoggers
137
} // namespace Extensions
138
} // namespace Envoy