1
#pragma once
2

            
3
#include <memory>
4
#include <vector>
5

            
6
#include "envoy/access_log/access_log.h"
7
#include "envoy/config/core/v3/http_service.pb.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
10
#include "envoy/local_info/local_info.h"
11
#include "envoy/singleton/instance.h"
12
#include "envoy/thread_local/thread_local.h"
13
#include "envoy/upstream/cluster_manager.h"
14

            
15
#include "source/common/common/logger.h"
16
#include "source/common/http/async_client_impl.h"
17
#include "source/common/http/async_client_utility.h"
18
#include "source/common/http/headers.h"
19
#include "source/common/http/message_impl.h"
20
#include "source/common/http/utility.h"
21
#include "source/common/protobuf/protobuf.h"
22
#include "source/common/tracing/custom_tag_impl.h"
23
#include "source/extensions/access_loggers/common/access_log_base.h"
24
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
25
#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h"
26

            
27
#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
28
#include "opentelemetry/proto/common/v1/common.pb.h"
29
#include "opentelemetry/proto/logs/v1/logs.pb.h"
30
#include "opentelemetry/proto/resource/v1/resource.pb.h"
31

            
32
namespace Envoy {
33
namespace Extensions {
34
namespace AccessLoggers {
35
namespace OpenTelemetry {
36

            
37
/**
38
 * HTTP access logger that exports OTLP logs over HTTP.
39
 * Follows the same pattern as OpenTelemetryHttpTraceExporter.
40
 */
41
class HttpAccessLoggerImpl : public Logger::Loggable<Logger::Id::misc>,
42
                             public Http::AsyncClient::Callbacks {
43
public:
44
  HttpAccessLoggerImpl(
45
      Upstream::ClusterManager& cluster_manager,
46
      const envoy::config::core::v3::HttpService& http_service,
47
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
48
          config,
49
      Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);
50

            
51
  using SharedPtr = std::shared_ptr<HttpAccessLoggerImpl>;
52

            
53
  /**
54
   * Log a single log entry. Batches entries and flushes periodically.
55
   */
56
  void log(opentelemetry::proto::logs::v1::LogRecord&& entry);
57

            
58
  // Http::AsyncClient::Callbacks.
59
  void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override;
60
  void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override;
61
9
  void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {}
62

            
63
private:
64
  void flush();
65

            
66
  Upstream::ClusterManager& cluster_manager_;
67
  envoy::config::core::v3::HttpService http_service_;
68
  // Track active HTTP requests to be able to cancel them on destruction.
69
  Http::AsyncClientRequestTracker active_requests_;
70
  std::vector<std::pair<const Http::LowerCaseString, const std::string>> parsed_headers_to_add_;
71

            
72
  // Message structure: ExportLogsServiceRequest -> ResourceLogs -> ScopeLogs -> LogRecord.
73
  opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message_;
74
  opentelemetry::proto::logs::v1::ScopeLogs* root_;
75

            
76
  // Batching timer.
77
  Event::TimerPtr flush_timer_;
78
  const std::chrono::milliseconds buffer_flush_interval_;
79
  const uint64_t max_buffer_size_bytes_;
80
  uint64_t approximate_message_size_bytes_ = 0;
81

            
82
  OtlpAccessLogStats stats_;
83
  uint32_t batched_log_entries_ = 0;
84
  uint32_t in_flight_log_entries_ = 0;
85
};
86

            
87
/**
88
 * Cache for HTTP access loggers. Creates one logger per unique configuration.
89
 */
90
class HttpAccessLoggerCacheImpl : public Singleton::Instance,
91
                                  public Logger::Loggable<Logger::Id::misc> {
92
public:
93
  HttpAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
94
                            ThreadLocal::SlotAllocator& tls,
95
                            const LocalInfo::LocalInfo& local_info);
96

            
97
  HttpAccessLoggerImpl::SharedPtr getOrCreateLogger(
98
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
99
          config,
100
      const envoy::config::core::v3::HttpService& http_service);
101

            
102
private:
103
  struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
104
12
    ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
105
    Event::Dispatcher& dispatcher_;
106
    absl::flat_hash_map<std::size_t, HttpAccessLoggerImpl::SharedPtr> access_loggers_;
107
  };
108

            
109
  Upstream::ClusterManager& cluster_manager_;
110
  Stats::Scope& scope_;
111
  ThreadLocal::SlotPtr tls_slot_;
112
  const LocalInfo::LocalInfo& local_info_;
113
};
114

            
115
// Shared-ownership handle to the HTTP access logger cache.
using HttpAccessLoggerCacheSharedPtr = std::shared_ptr<HttpAccessLoggerCacheImpl>;
116

            
117
/**
118
 * Access log instance that streams logs over HTTP.
119
 */
120
class HttpAccessLog : public Common::ImplBase {
121
public:
122
  HttpAccessLog(
123
      ::Envoy::AccessLog::FilterPtr&& filter,
124
      envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config,
125
      ThreadLocal::SlotAllocator& tls, HttpAccessLoggerCacheSharedPtr access_logger_cache,
126
      const std::vector<Formatter::CommandParserPtr>& commands);
127

            
128
private:
129
  /**
130
   * Per-thread cached logger.
131
   */
132
  struct ThreadLocalLogger : public ThreadLocal::ThreadLocalObject {
133
    ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger);
134

            
135
    const HttpAccessLoggerImpl::SharedPtr logger_;
136
  };
137

            
138
  // Common::ImplBase
139
  void emitLog(const Formatter::Context& context, const StreamInfo::StreamInfo& info) override;
140

            
141
  const ThreadLocal::SlotPtr tls_slot_;
142
  const HttpAccessLoggerCacheSharedPtr access_logger_cache_;
143
  const envoy::config::core::v3::HttpService http_service_;
144
  std::unique_ptr<OpenTelemetryFormatter> body_formatter_;
145
  std::unique_ptr<OpenTelemetryFormatter> attributes_formatter_;
146
  const std::vector<std::string> filter_state_objects_to_log_;
147
  const std::vector<Tracing::CustomTagConstSharedPtr> custom_tags_;
148
};
149

            
150
// Unique-ownership handle to an HttpAccessLog instance.
using HttpAccessLogPtr = std::unique_ptr<HttpAccessLog>;
151

            
152
} // namespace OpenTelemetry
153
} // namespace AccessLoggers
154
} // namespace Extensions
155
} // namespace Envoy