1
#include "source/extensions/access_loggers/open_telemetry/http_access_log_impl.h"
2

            
3
#include <memory>
4
#include <string>
5
#include <vector>
6

            
7
#include "envoy/config/core/v3/base.pb.h"
8
#include "envoy/data/accesslog/v3/accesslog.pb.h"
9
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
10

            
11
#include "source/common/common/enum_to_int.h"
12
#include "source/common/common/logger.h"
13
#include "source/common/config/utility.h"
14
#include "source/common/http/headers.h"
15
#include "source/common/protobuf/utility.h"
16
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
17
#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h"
18

            
19
namespace Envoy {
20
namespace Extensions {
21
namespace AccessLoggers {
22
namespace OpenTelemetry {
23

            
24
// Creates an OTLP/HTTP access logger that accumulates LogRecords in `message_` and exports them
// to the cluster referenced by `http_service`.
//
// @param cluster_manager resolves the export cluster each time the buffer is flushed.
// @param http_service URI, cluster and timeout used for every export request.
// @param headers_applicator applies the configured custom headers to each export request.
// @param config provides the flush interval, the buffer size limit and the stat prefix.
// @param dispatcher creates the periodic flush timer.
// @param server_context provides the stats scope and the local info for the message root.
HttpAccessLoggerImpl::HttpAccessLoggerImpl(
    Upstream::ClusterManager& cluster_manager,
    const envoy::config::core::v3::HttpService& http_service,
    std::shared_ptr<const Http::HttpServiceHeadersApplicator> headers_applicator,
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
        config,
    Event::Dispatcher& dispatcher, Server::Configuration::ServerFactoryContext& server_context)
    : cluster_manager_(cluster_manager), http_service_(http_service),
      headers_applicator_(std::move(headers_applicator)),
      buffer_flush_interval_(getBufferFlushInterval(config)),
      max_buffer_size_bytes_(getBufferSizeBytes(config)),
      stats_({ALL_OTLP_ACCESS_LOG_STATS(
          POOL_COUNTER_PREFIX(server_context.serverScope(),
                              absl::StrCat(OtlpAccessLogStatsPrefix, config.stat_prefix())))}) {
  // `root_` is the container inside `message_` that log() appends records to.
  root_ = initOtlpMessageRoot(message_, config, server_context.localInfo());

  // Every tick drains the buffer and re-arms the timer, so pending records are exported at
  // least once per flush interval even if the size threshold in log() is never reached.
  const auto on_flush_tick = [this]() {
    flush();
    flush_timer_->enableTimer(buffer_flush_interval_);
  };
  flush_timer_ = dispatcher.createTimer(on_flush_tick);
  flush_timer_->enableTimer(buffer_flush_interval_);
}
48

            
49
16
// Appends `entry` to the pending export batch. The batch is flushed immediately once its
// approximate serialized size reaches the configured byte limit.
void HttpAccessLoggerImpl::log(opentelemetry::proto::logs::v1::LogRecord&& entry) {
  // Measure the entry before it is moved into the batch.
  const auto entry_size_bytes = entry.ByteSizeLong();
  root_->mutable_log_records()->Add(std::move(entry));
  approximate_message_size_bytes_ += entry_size_bytes;
  ++batched_log_entries_;

  const bool buffer_limit_reached = approximate_message_size_bytes_ >= max_buffer_size_bytes_;
  if (buffer_limit_reached) {
    flush();
  }
}
58

            
59
33
void HttpAccessLoggerImpl::flush() {
60
33
  if (root_->log_records().empty()) {
61
17
    return;
62
17
  }
63

            
64
16
  std::string request_body;
65
16
  const auto ok = message_.SerializeToString(&request_body);
66
16
  if (!ok) {
67
    ENVOY_LOG(warn, "Error while serializing the binary proto ExportLogsServiceRequest.");
68
    root_->clear_log_records();
69
    approximate_message_size_bytes_ = 0;
70
    return;
71
  }
72

            
73
16
  const auto thread_local_cluster =
74
16
      cluster_manager_.getThreadLocalCluster(http_service_.http_uri().cluster());
75
16
  if (thread_local_cluster == nullptr) {
76
1
    ENVOY_LOG(error, "OTLP HTTP access log exporter failed: [cluster = {}] is not configured",
77
1
              http_service_.http_uri().cluster());
78
1
    root_->clear_log_records();
79
1
    approximate_message_size_bytes_ = 0;
80
1
    return;
81
1
  }
82

            
83
15
  Http::RequestMessagePtr message = Http::Utility::prepareHeaders(http_service_.http_uri());
84

            
85
  // The request follows the OTLP HTTP specification:
86
  // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.9.0/docs/specification.md#otlphttp.
87
15
  message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
88
15
  message->headers().setReferenceContentType(Http::Headers::get().ContentTypeValues.Protobuf);
89

            
90
  // User-Agent header follows the OTLP specification.
91
15
  message->headers().setReferenceUserAgent(getOtlpUserAgentHeader());
92

            
93
  // Adds all custom headers to the request.
94
15
  headers_applicator_->apply(message->headers());
95

            
96
15
  message->body().add(request_body);
97

            
98
15
  const auto options =
99
15
      Http::AsyncClient::RequestOptions()
100
15
          .setTimeout(std::chrono::milliseconds(
101
15
              DurationUtil::durationToMilliseconds(http_service_.http_uri().timeout())))
102
15
          .setDiscardResponseBody(true);
103

            
104
15
  Http::AsyncClient::Request* in_flight_request =
105
15
      thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options);
106

            
107
15
  if (in_flight_request != nullptr) {
108
14
    active_requests_.add(*in_flight_request);
109
14
    in_flight_log_entries_ = batched_log_entries_;
110
14
  } else {
111
1
    stats_.logs_dropped_.add(batched_log_entries_);
112
1
  }
113

            
114
15
  root_->clear_log_records();
115
15
  approximate_message_size_bytes_ = 0;
116
15
  batched_log_entries_ = 0;
117
15
}
118

            
119
// Settles the outstanding export when an HTTP response arrives. An HTTP 200 counts the in-flight
// entries as written; any other status code logs an error and counts them as dropped.
void HttpAccessLoggerImpl::onSuccess(const Http::AsyncClient::Request& request,
                                     Http::ResponseMessagePtr&& http_response) {
  active_requests_.remove(request);

  const auto status = Http::Utility::getResponseStatus(http_response->headers());
  const bool accepted = status == enumToInt(Http::Code::OK);
  if (!accepted) {
    ENVOY_LOG(error,
              "OTLP HTTP access log exporter received a non-success status code: {} while "
              "exporting the OTLP message",
              status);
  }

  auto& outcome_counter = accepted ? stats_.logs_written_ : stats_.logs_dropped_;
  outcome_counter.add(in_flight_log_entries_);
  in_flight_log_entries_ = 0;
}
134

            
135
// Settles the outstanding export when the async client reports a failure instead of a response.
// All in-flight entries are counted as dropped.
void HttpAccessLoggerImpl::onFailure(const Http::AsyncClient::Request& request,
                                     Http::AsyncClient::FailureReason reason) {
  active_requests_.remove(request);

  const auto dropped_entries = in_flight_log_entries_;
  in_flight_log_entries_ = 0;

  ENVOY_LOG(warn, "OTLP HTTP access log export request failed. Failure reason: {}",
            enumToInt(reason));
  stats_.logs_dropped_.add(dropped_entries);
}
143

            
144
// Allocates the thread-local slot backing the logger cache. Every thread gets its own
// ThreadLocalCache bound to that thread's dispatcher.
HttpAccessLoggerCacheImpl::HttpAccessLoggerCacheImpl(
    Server::Configuration::ServerFactoryContext& server_context)
    : tls_slot_(server_context.threadLocal().allocateSlot()), server_context_(server_context) {
  const auto make_thread_cache = [](Event::Dispatcher& dispatcher) {
    return std::make_shared<ThreadLocalCache>(dispatcher);
  };
  tls_slot_->set(make_thread_cache);
}
150

            
151
// Returns the calling thread's logger for (`config`, `http_service`). The logger is created
// and cached in the thread-local cache on first use.
//
// NOTE(review): the cache key is only a combined hash of both protos, with no equality check,
// so a hash collision would hand back a logger built for a different configuration.
HttpAccessLoggerImpl::SharedPtr HttpAccessLoggerCacheImpl::getOrCreateLogger(
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
        config,
    const envoy::config::core::v3::HttpService& http_service,
    std::shared_ptr<const Http::HttpServiceHeadersApplicator> headers_applicator) {
  auto& thread_cache = tls_slot_->getTyped<ThreadLocalCache>();
  const std::size_t cache_key = MessageUtil::hash(config) ^ MessageUtil::hash(http_service);

  if (const auto found = thread_cache.access_loggers_.find(cache_key);
      found != thread_cache.access_loggers_.end()) {
    return found->second;
  }

  auto created = std::make_shared<HttpAccessLoggerImpl>(
      server_context_.clusterManager(), http_service, std::move(headers_applicator), config,
      thread_cache.dispatcher_, server_context_);
  thread_cache.access_loggers_.emplace(cache_key, created);
  return created;
}
170

            
171
std::shared_ptr<const Http::HttpServiceHeadersApplicator>
172
HttpAccessLoggerCacheImpl::getOrCreateApplicator(
173
    const envoy::config::core::v3::HttpService& http_service,
174
10
    Server::Configuration::ServerFactoryContext& server_context) {
175
10
  ASSERT_IS_MAIN_OR_TEST_THREAD();
176
10
  const std::size_t config_hash = MessageUtil::hash(http_service);
177

            
178
10
  absl::MutexLock lock(&applicator_mutex_);
179

            
180
10
  const auto it = applicators_.find(config_hash);
181
10
  if (it != applicators_.end()) {
182
2
    auto existing = it->second.lock();
183
2
    if (existing) {
184
2
      return existing;
185
2
    }
186
2
  }
187

            
188
8
  absl::Status creation_status = absl::OkStatus();
189
  // Capture shared_from_this() in the deleter so the mutex and map remain alive.
190
8
  std::shared_ptr<HttpAccessLoggerCacheImpl> self = shared_from_this();
191
8
  std::shared_ptr<const Http::HttpServiceHeadersApplicator> applicator(
192
8
      new Http::HttpServiceHeadersApplicator(http_service, server_context, creation_status),
193
8
      [self, config_hash](const Http::HttpServiceHeadersApplicator* ptr) {
194
8
        {
195
8
          absl::MutexLock lock(&self->applicator_mutex_);
196
8
          const auto it = self->applicators_.find(config_hash);
197
          // Check for expired in case a new entry was added at nearly the same time because the
198
          // check for an existing entry failed to `lock()`.
199
8
          if (it != self->applicators_.end() && it->second.expired()) {
200
8
            self->applicators_.erase(it);
201
8
          }
202
8
        }
203
8
        delete ptr;
204
8
      });
205
8
  THROW_IF_NOT_OK_REF(creation_status);
206
8
  applicators_.insert_or_assign(config_hash, applicator);
207
8
  return applicator;
208
8
}
209

            
210
HttpAccessLog::ThreadLocalLogger::ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger)
211
17
    : logger_(std::move(logger)) {}
212

            
213
// Builds an OTLP/HTTP access log instance:
//  - resolves the shared headers applicator on the calling (main) thread;
//  - installs a per-thread logger lookup in the TLS slot;
//  - prepares the body and attribute formatters.
HttpAccessLog::HttpAccessLog(
    ::Envoy::AccessLog::FilterPtr&& filter,
    envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config,
    HttpAccessLoggerCacheSharedPtr access_logger_cache,
    Server::Configuration::ServerFactoryContext& server_context,
    const std::vector<Formatter::CommandParserPtr>& commands)
    : Common::ImplBase(std::move(filter)), tls_slot_(server_context.threadLocal().allocateSlot()),
      access_logger_cache_(std::move(access_logger_cache)), http_service_(config.http_service()),
      filter_state_objects_to_log_(getFilterStateObjectsToLog(config)),
      custom_tags_(getCustomTags(config)) {
  // The applicator must be obtained here, on the main thread. DataSourceProvider (used by the
  // FILE_CONTENT formatter) allocates TLS slots, and that is only allowed on the main thread.
  auto headers_applicator =
      access_logger_cache_->getOrCreateApplicator(http_service_, server_context);

  // Each thread looks up (or creates) its logger in the per-thread cache, sharing the
  // applicator created above.
  tls_slot_->set([this, config, headers_applicator](Event::Dispatcher&) {
    auto thread_logger =
        access_logger_cache_->getOrCreateLogger(config, http_service_, headers_applicator);
    return std::make_shared<ThreadLocalLogger>(std::move(thread_logger));
  });

  // A body formatter is only built when a body is configured. Packing an unset "AnyValue" into a
  // "KeyValueList" would produce input the formatter fails to parse.
  const bool body_configured =
      config.body().value_case() != ::opentelemetry::proto::common::v1::AnyValue::VALUE_NOT_SET;
  if (body_configured) {
    body_formatter_ = std::make_unique<OpenTelemetryFormatter>(packBody(config.body()), commands);
  }
  attributes_formatter_ = std::make_unique<OpenTelemetryFormatter>(config.attributes(), commands);
}
242

            
243
// Builds one OTLP LogRecord for an access log event and hands it to this thread's logger.
//
// The record carries:
//  - the stream start time in nanoseconds;
//  - the formatted body, when a body formatter is configured;
//  - the formatted attributes;
//  - the trace/span ids from the active span, if any;
//  - the configured filter-state objects and custom tags.
void HttpAccessLog::emitLog(const Formatter::Context& log_context,
                            const StreamInfo::StreamInfo& stream_info) {
  opentelemetry::proto::logs::v1::LogRecord log_entry;
  log_entry.set_time_unix_nano(std::chrono::duration_cast<std::chrono::nanoseconds>(
                                   stream_info.startTime().time_since_epoch())
                                   .count());

  // Unpacks the body "KeyValueList" to "AnyValue". Assigning the temporary directly lets
  // protobuf move it instead of deep-copying a named const local on every request.
  if (body_formatter_) {
    *log_entry.mutable_body() = unpackBody(body_formatter_->format(log_context, stream_info));
  }

  // Swap the freshly formatted attributes into the (still empty) record rather than copying
  // them element by element.
  auto formatted_attributes = attributes_formatter_->format(log_context, stream_info);
  log_entry.mutable_attributes()->Swap(formatted_attributes.mutable_values());

  // Sets trace context (trace_id, span_id) if available.
  const std::string trace_id_hex =
      log_context.activeSpan().has_value() ? log_context.activeSpan()->getTraceId() : "";
  const std::string span_id_hex =
      log_context.activeSpan().has_value() ? log_context.activeSpan()->getSpanId() : "";
  populateTraceContext(log_entry, trace_id_hex, span_id_hex);

  addFilterStateToAttributes(stream_info, filter_state_objects_to_log_, log_entry);
  addCustomTagsToAttributes(custom_tags_, log_context, stream_info, log_entry);

  tls_slot_->getTyped<ThreadLocalLogger>().logger_->log(std::move(log_entry));
}
270

            
271
} // namespace OpenTelemetry
272
} // namespace AccessLoggers
273
} // namespace Extensions
274
} // namespace Envoy