1
#pragma once
2

            
3
#include <memory>
4
#include <vector>
5

            
6
#include "envoy/access_log/access_log.h"
7
#include "envoy/config/core/v3/http_service.pb.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
10
#include "envoy/server/factory_context.h"
11
#include "envoy/singleton/instance.h"
12
#include "envoy/thread_local/thread_local.h"
13
#include "envoy/upstream/cluster_manager.h"
14

            
15
#include "source/common/common/logger.h"
16
#include "source/common/http/async_client_impl.h"
17
#include "source/common/http/async_client_utility.h"
18
#include "source/common/http/http_service_headers.h"
19
#include "source/common/http/message_impl.h"
20
#include "source/common/http/utility.h"
21
#include "source/common/protobuf/protobuf.h"
22
#include "source/common/tracing/custom_tag_impl.h"
23
#include "source/extensions/access_loggers/common/access_log_base.h"
24
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
25
#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h"
26

            
27
#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
28
#include "opentelemetry/proto/common/v1/common.pb.h"
29
#include "opentelemetry/proto/logs/v1/logs.pb.h"
30
#include "opentelemetry/proto/resource/v1/resource.pb.h"
31

            
32
namespace Envoy {
33
namespace Extensions {
34
namespace AccessLoggers {
35
namespace OpenTelemetry {
36

            
37
/**
38
 * HTTP access logger that exports OTLP logs over HTTP.
39
 * Follows the same pattern as OpenTelemetryHttpTraceExporter.
40
 */
41
class HttpAccessLoggerImpl : public Logger::Loggable<Logger::Id::misc>,
42
                             public Http::AsyncClient::Callbacks {
43
public:
44
  HttpAccessLoggerImpl(
45
      Upstream::ClusterManager& cluster_manager,
46
      const envoy::config::core::v3::HttpService& http_service,
47
      std::shared_ptr<const Http::HttpServiceHeadersApplicator> headers_applicator,
48
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
49
          config,
50
      Event::Dispatcher& dispatcher, Server::Configuration::ServerFactoryContext& server_context);
51

            
52
  using SharedPtr = std::shared_ptr<HttpAccessLoggerImpl>;
53

            
54
  /**
55
   * Log a single log entry. Batches entries and flushes periodically.
56
   */
57
  void log(opentelemetry::proto::logs::v1::LogRecord&& entry);
58

            
59
  // Http::AsyncClient::Callbacks.
60
  void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override;
61
  void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override;
62
11
  void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {}
63

            
64
private:
65
  void flush();
66

            
67
  Upstream::ClusterManager& cluster_manager_;
68
  envoy::config::core::v3::HttpService http_service_;
69
  // Track active HTTP requests to be able to cancel them on destruction.
70
  Http::AsyncClientRequestTracker active_requests_;
71
  std::shared_ptr<const Http::HttpServiceHeadersApplicator> headers_applicator_;
72

            
73
  // Message structure: ExportLogsServiceRequest -> ResourceLogs -> ScopeLogs -> LogRecord.
74
  opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message_;
75
  opentelemetry::proto::logs::v1::ScopeLogs* root_;
76

            
77
  // Batching timer.
78
  Event::TimerPtr flush_timer_;
79
  const std::chrono::milliseconds buffer_flush_interval_;
80
  const uint64_t max_buffer_size_bytes_;
81
  uint64_t approximate_message_size_bytes_ = 0;
82

            
83
  OtlpAccessLogStats stats_;
84
  uint32_t batched_log_entries_ = 0;
85
  uint32_t in_flight_log_entries_ = 0;
86
};
87

            
88
/**
89
 * Cache for HTTP access loggers. Creates one logger per unique configuration.
90
 */
91
class HttpAccessLoggerCacheImpl : public Singleton::Instance,
92
                                  public Logger::Loggable<Logger::Id::misc>,
93
                                  public std::enable_shared_from_this<HttpAccessLoggerCacheImpl> {
94
public:
95
  HttpAccessLoggerCacheImpl(Server::Configuration::ServerFactoryContext& server_context);
96

            
97
  HttpAccessLoggerImpl::SharedPtr getOrCreateLogger(
98
      const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
99
          config,
100
      const envoy::config::core::v3::HttpService& http_service,
101
      std::shared_ptr<const Http::HttpServiceHeadersApplicator> headers_applicator);
102

            
103
  std::shared_ptr<const Http::HttpServiceHeadersApplicator>
104
  getOrCreateApplicator(const envoy::config::core::v3::HttpService& http_service,
105
                        Server::Configuration::ServerFactoryContext& server_context);
106

            
107
private:
108
  struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
109
14
    ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
110
    Event::Dispatcher& dispatcher_;
111
    absl::flat_hash_map<std::size_t, HttpAccessLoggerImpl::SharedPtr> access_loggers_;
112
  };
113

            
114
  ThreadLocal::SlotPtr tls_slot_;
115
  Server::Configuration::ServerFactoryContext& server_context_;
116

            
117
  // Cache of headers applicators, keyed by HttpService config hash. Protected by
118
  // applicator_mutex_ because the custom deleter on the shared_ptr may run on any thread.
119
  absl::Mutex applicator_mutex_;
120
  absl::flat_hash_map<std::size_t, std::weak_ptr<const Http::HttpServiceHeadersApplicator>>
121
      applicators_ ABSL_GUARDED_BY(applicator_mutex_);
122
};
123

            
124
using HttpAccessLoggerCacheSharedPtr = std::shared_ptr<HttpAccessLoggerCacheImpl>;
125

            
126
/**
127
 * Access log instance that streams logs over HTTP.
128
 */
129
class HttpAccessLog : public Common::ImplBase {
130
public:
131
  HttpAccessLog(
132
      ::Envoy::AccessLog::FilterPtr&& filter,
133
      envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config,
134
      HttpAccessLoggerCacheSharedPtr access_logger_cache,
135
      Server::Configuration::ServerFactoryContext& server_context,
136
      const std::vector<Formatter::CommandParserPtr>& commands);
137

            
138
private:
139
  /**
140
   * Per-thread cached logger.
141
   */
142
  struct ThreadLocalLogger : public ThreadLocal::ThreadLocalObject {
143
    ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger);
144

            
145
    const HttpAccessLoggerImpl::SharedPtr logger_;
146
  };
147

            
148
  // Common::ImplBase
149
  void emitLog(const Formatter::Context& context, const StreamInfo::StreamInfo& info) override;
150

            
151
  const ThreadLocal::SlotPtr tls_slot_;
152
  const HttpAccessLoggerCacheSharedPtr access_logger_cache_;
153
  const envoy::config::core::v3::HttpService http_service_;
154
  std::unique_ptr<OpenTelemetryFormatter> body_formatter_;
155
  std::unique_ptr<OpenTelemetryFormatter> attributes_formatter_;
156
  const std::vector<std::string> filter_state_objects_to_log_;
157
  const std::vector<Tracing::CustomTagConstSharedPtr> custom_tags_;
158
};
159

            
160
using HttpAccessLogPtr = std::unique_ptr<HttpAccessLog>;
161

            
162
} // namespace OpenTelemetry
163
} // namespace AccessLoggers
164
} // namespace Extensions
165
} // namespace Envoy