1
#include "source/extensions/access_loggers/open_telemetry/http_access_log_impl.h"
2

            
3
#include <chrono>
4
#include <memory>
5
#include <string>
6
#include <vector>
7

            
8
#include "envoy/config/core/v3/base.pb.h"
9
#include "envoy/data/accesslog/v3/accesslog.pb.h"
10
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
11

            
12
#include "source/common/common/enum_to_int.h"
13
#include "source/common/common/logger.h"
14
#include "source/common/config/utility.h"
15
#include "source/common/formatter/substitution_formatter.h"
16
#include "source/common/http/headers.h"
17
#include "source/common/network/utility.h"
18
#include "source/common/protobuf/message_validator_impl.h"
19
#include "source/common/protobuf/protobuf.h"
20
#include "source/common/protobuf/utility.h"
21
#include "source/common/stream_info/utility.h"
22
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
23
#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h"
24

            
25
namespace Envoy {
26
namespace Extensions {
27
namespace AccessLoggers {
28
namespace OpenTelemetry {
29

            
30
// Constructs an OTLP HTTP access logger.
//
// Buffer limits (flush interval and maximum batch size) and the stats prefix are derived from
// `config`; the export destination and extra request headers come from `http_service`. A
// periodic flush timer is created on `dispatcher` and armed before the constructor returns.
HttpAccessLoggerImpl::HttpAccessLoggerImpl(
    Upstream::ClusterManager& cluster_manager,
    const envoy::config::core::v3::HttpService& http_service,
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
        config,
    Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
    : cluster_manager_(cluster_manager), http_service_(http_service),
      buffer_flush_interval_(getBufferFlushInterval(config)),
      max_buffer_size_bytes_(getBufferSizeBytes(config)),
      stats_({ALL_OTLP_ACCESS_LOG_STATS(POOL_COUNTER_PREFIX(
          scope, absl::StrCat(OtlpAccessLogStatsPrefix, config.stat_prefix())))}) {

  // Lower-case and cache the configured custom headers once, so that every export request can
  // simply reference them.
  for (const auto& option : http_service_.request_headers_to_add()) {
    const auto& header = option.header();
    parsed_headers_to_add_.push_back({Http::LowerCaseString(header.key()), header.value()});
  }

  // Initializes the reusable export message and keeps a handle to the node that log records are
  // appended to.
  root_ = initOtlpMessageRoot(message_, config, local_info);

  // Periodic flush: each firing exports whatever is buffered and then re-arms the timer.
  flush_timer_ = dispatcher.createTimer([this]() {
    flush();
    flush_timer_->enableTimer(buffer_flush_interval_);
  });
  flush_timer_->enableTimer(buffer_flush_interval_);
}
57

            
58
14
// Appends `entry` to the current batch and flushes immediately once the approximate batch size
// reaches the configured maximum.
void HttpAccessLoggerImpl::log(opentelemetry::proto::logs::v1::LogRecord&& entry) {
  // The size has to be read before the record is moved into the batch.
  approximate_message_size_bytes_ += entry.ByteSizeLong();
  root_->mutable_log_records()->Add(std::move(entry));
  batched_log_entries_++;

  const bool buffer_full = approximate_message_size_bytes_ >= max_buffer_size_bytes_;
  if (buffer_full) {
    flush();
  }
}
67

            
68
27
void HttpAccessLoggerImpl::flush() {
69
27
  if (root_->log_records().empty()) {
70
13
    return;
71
13
  }
72

            
73
14
  std::string request_body;
74
14
  const auto ok = message_.SerializeToString(&request_body);
75
14
  if (!ok) {
76
    ENVOY_LOG(warn, "Error while serializing the binary proto ExportLogsServiceRequest.");
77
    root_->clear_log_records();
78
    approximate_message_size_bytes_ = 0;
79
    return;
80
  }
81

            
82
14
  const auto thread_local_cluster =
83
14
      cluster_manager_.getThreadLocalCluster(http_service_.http_uri().cluster());
84
14
  if (thread_local_cluster == nullptr) {
85
1
    ENVOY_LOG(error, "OTLP HTTP access log exporter failed: [cluster = {}] is not configured",
86
1
              http_service_.http_uri().cluster());
87
1
    root_->clear_log_records();
88
1
    approximate_message_size_bytes_ = 0;
89
1
    return;
90
1
  }
91

            
92
13
  Http::RequestMessagePtr message = Http::Utility::prepareHeaders(http_service_.http_uri());
93

            
94
  // The request follows the OTLP HTTP specification:
95
  // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.9.0/docs/specification.md#otlphttp.
96
13
  message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
97
13
  message->headers().setReferenceContentType(Http::Headers::get().ContentTypeValues.Protobuf);
98

            
99
  // User-Agent header follows the OTLP specification.
100
13
  message->headers().setReferenceUserAgent(getOtlpUserAgentHeader());
101

            
102
  // Adds all custom headers to the request.
103
13
  for (const auto& header_pair : parsed_headers_to_add_) {
104
2
    message->headers().setReference(header_pair.first, header_pair.second);
105
2
  }
106
13
  message->body().add(request_body);
107

            
108
13
  const auto options =
109
13
      Http::AsyncClient::RequestOptions()
110
13
          .setTimeout(std::chrono::milliseconds(
111
13
              DurationUtil::durationToMilliseconds(http_service_.http_uri().timeout())))
112
13
          .setDiscardResponseBody(true);
113

            
114
13
  Http::AsyncClient::Request* in_flight_request =
115
13
      thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options);
116

            
117
13
  if (in_flight_request != nullptr) {
118
12
    active_requests_.add(*in_flight_request);
119
12
    in_flight_log_entries_ = batched_log_entries_;
120
12
  } else {
121
1
    stats_.logs_dropped_.add(batched_log_entries_);
122
1
  }
123

            
124
13
  root_->clear_log_records();
125
13
  approximate_message_size_bytes_ = 0;
126
13
  batched_log_entries_ = 0;
127
13
}
128

            
129
// Async client callback invoked when a response to an export request has been received.
//
// Only an HTTP 200 counts as a successful export; for any other status the in-flight records are
// counted as dropped. The in-flight record count is cleared in both cases.
void HttpAccessLoggerImpl::onSuccess(const Http::AsyncClient::Request& request,
                                     Http::ResponseMessagePtr&& http_response) {
  active_requests_.remove(request);

  const auto status = Http::Utility::getResponseStatus(http_response->headers());
  if (status != enumToInt(Http::Code::OK)) {
    ENVOY_LOG(error,
              "OTLP HTTP access log exporter received a non-success status code: {} while "
              "exporting the OTLP message",
              status);
    stats_.logs_dropped_.add(in_flight_log_entries_);
  } else {
    stats_.logs_written_.add(in_flight_log_entries_);
  }

  in_flight_log_entries_ = 0;
}
144

            
145
// Async client callback invoked when an export request fails without a response.
//
// Logs the failure reason, counts the in-flight records as dropped and clears the in-flight
// record count.
void HttpAccessLoggerImpl::onFailure(const Http::AsyncClient::Request& request,
                                     Http::AsyncClient::FailureReason reason) {
  active_requests_.remove(request);

  ENVOY_LOG(warn, "OTLP HTTP access log export request failed. Failure reason: {}",
            enumToInt(reason));

  stats_.logs_dropped_.add(in_flight_log_entries_);
  in_flight_log_entries_ = 0;
}
153

            
154
// Constructs the logger cache and allocates a thread-local slot that holds one
// `ThreadLocalCache` per thread, each bound to that thread's dispatcher.
HttpAccessLoggerCacheImpl::HttpAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager,
                                                     Stats::Scope& scope,
                                                     ThreadLocal::SlotAllocator& tls,
                                                     const LocalInfo::LocalInfo& local_info)
    : cluster_manager_(cluster_manager), scope_(scope), tls_slot_(tls.allocateSlot()),
      local_info_(local_info) {
  tls_slot_->set([](Event::Dispatcher& thread_dispatcher) {
    return std::make_shared<ThreadLocalCache>(thread_dispatcher);
  });
}
163

            
164
// Returns the calling thread's logger for the given configuration, creating and caching it on
// first use.
//
// Loggers are keyed by the XOR of the hashes of `config` and `http_service`, so requests with
// the same key on the same thread share a single logger instance.
HttpAccessLoggerImpl::SharedPtr HttpAccessLoggerCacheImpl::getOrCreateLogger(
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
        config,
    const envoy::config::core::v3::HttpService& http_service) {
  auto& thread_cache = tls_slot_->getTyped<ThreadLocalCache>();
  const std::size_t cache_key = MessageUtil::hash(config) ^ MessageUtil::hash(http_service);

  // Fast path: a logger for this configuration already exists on this thread.
  if (const auto existing = thread_cache.access_loggers_.find(cache_key);
      existing != thread_cache.access_loggers_.end()) {
    return existing->second;
  }

  // Slow path: build a logger bound to this thread's dispatcher and remember it.
  auto created = std::make_shared<HttpAccessLoggerImpl>(
      cluster_manager_, http_service, config, thread_cache.dispatcher_, local_info_, scope_);
  thread_cache.access_loggers_.emplace(cache_key, created);
  return created;
}
181

            
182
// Thread-local holder that takes shared ownership of the given logger.
HttpAccessLog::ThreadLocalLogger::ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger)
    : logger_(std::move(logger)) {
}
184

            
185
// Constructs the OTLP HTTP access log.
//
// Each thread obtains its logger from `access_logger_cache` through a thread-local slot. The
// body formatter is only created when the configured body has a value set; the attributes
// formatter is always created.
HttpAccessLog::HttpAccessLog(
    ::Envoy::AccessLog::FilterPtr&& filter,
    envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config,
    ThreadLocal::SlotAllocator& tls, HttpAccessLoggerCacheSharedPtr access_logger_cache,
    const std::vector<Formatter::CommandParserPtr>& commands)
    : Common::ImplBase(std::move(filter)), tls_slot_(tls.allocateSlot()),
      access_logger_cache_(std::move(access_logger_cache)), http_service_(config.http_service()),
      filter_state_objects_to_log_(getFilterStateObjectsToLog(config)),
      custom_tags_(getCustomTags(config)) {

  // `config` is captured by value, so the callback carries its own copy of the configuration.
  tls_slot_->set([this, config](Event::Dispatcher&) {
    auto thread_logger = access_logger_cache_->getOrCreateLogger(config, http_service_);
    return std::make_shared<ThreadLocalLogger>(thread_logger);
  });

  // Packs the body "AnyValue" to a "KeyValueList" only if it's not empty. Otherwise the
  // formatter would fail to parse it.
  const bool body_is_set =
      config.body().value_case() != ::opentelemetry::proto::common::v1::AnyValue::VALUE_NOT_SET;
  if (body_is_set) {
    body_formatter_ = std::make_unique<OpenTelemetryFormatter>(packBody(config.body()), commands);
  }

  attributes_formatter_ = std::make_unique<OpenTelemetryFormatter>(config.attributes(), commands);
}
207

            
208
// Builds one OTLP log record for the given stream and hands it to this thread's logger.
//
// The record carries the stream start time, the formatted body (if a body formatter is
// configured), the formatted attributes, the trace context of the active span (if any), and the
// configured filter state objects and custom tags.
void HttpAccessLog::emitLog(const Formatter::Context& log_context,
                            const StreamInfo::StreamInfo& stream_info) {
  opentelemetry::proto::logs::v1::LogRecord record;

  // Timestamp: stream start time as nanoseconds since the clock's epoch.
  const auto start_since_epoch = stream_info.startTime().time_since_epoch();
  record.set_time_unix_nano(
      std::chrono::duration_cast<std::chrono::nanoseconds>(start_since_epoch).count());

  // Unpacks the body "KeyValueList" to "AnyValue".
  if (body_formatter_ != nullptr) {
    const auto body = unpackBody(body_formatter_->format(log_context, stream_info));
    *record.mutable_body() = body;
  }

  const auto attributes = attributes_formatter_->format(log_context, stream_info);
  *record.mutable_attributes() = attributes.values();

  // Sets trace context (trace_id, span_id) if available; both stay empty without an active span.
  const bool has_active_span = log_context.activeSpan().has_value();
  const std::string trace_id_hex = has_active_span ? log_context.activeSpan()->getTraceId() : "";
  const std::string span_id_hex = has_active_span ? log_context.activeSpan()->getSpanId() : "";
  populateTraceContext(record, trace_id_hex, span_id_hex);

  addFilterStateToAttributes(stream_info, filter_state_objects_to_log_, record);
  addCustomTagsToAttributes(custom_tags_, log_context, stream_info, record);

  auto& thread_local_logger = tls_slot_->getTyped<ThreadLocalLogger>();
  thread_local_logger.logger_->log(std::move(record));
}
235

            
236
} // namespace OpenTelemetry
237
} // namespace AccessLoggers
238
} // namespace Extensions
239
} // namespace Envoy