1
#include "source/extensions/access_loggers/open_telemetry/access_log_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/config/core/v3/base.pb.h"
6
#include "envoy/data/accesslog/v3/accesslog.pb.h"
7
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
8
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
9

            
10
#include "source/common/config/utility.h"
11
#include "source/common/formatter/substitution_formatter.h"
12
#include "source/common/http/headers.h"
13
#include "source/common/network/utility.h"
14
#include "source/common/protobuf/message_validator_impl.h"
15
#include "source/common/protobuf/utility.h"
16
#include "source/common/stream_info/utility.h"
17
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
18
#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h"
19

            
20
#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
21
#include "opentelemetry/proto/common/v1/common.pb.h"
22
#include "opentelemetry/proto/logs/v1/logs.pb.h"
23
#include "opentelemetry/proto/resource/v1/resource.pb.h"
24

            
25
namespace Envoy {
26
namespace Extensions {
27
namespace AccessLoggers {
28
namespace OpenTelemetry {
29

            
30
// Namespace-scope handle of type Http::RegisterCustomInlineHeader for request headers,
// constructed with the Referer custom header name.
// NOTE(review): presumably the registration happens as a side effect of constructing this object
// during static initialization - confirm against Http::RegisterCustomInlineHeader. Nothing in the
// visible part of this file references referer_handle directly.
Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders>
31
    referer_handle(Http::CustomHeaders::get().Referer);
32

            
33
AccessLog::ThreadLocalLogger::ThreadLocalLogger(GrpcAccessLoggerSharedPtr logger)
34
18
    : logger_(std::move(logger)) {}
35

            
36
// Constructs the OpenTelemetry access logger from its configuration.
//
// filter               access log filter, forwarded to Common::ImplBase.
// config               OpenTelemetry access log configuration. Taken by value; a copy is stored in
//                      the thread-local initialization callback.
// tls                  allocator used to obtain the slot that holds the per-thread logger.
// access_logger_cache  cache from which the gRPC access logger is obtained.
// commands             command parsers handed to the body and attributes formatters.
//
// The transport version check is wrapped in THROW_IF_NOT_OK, so (per the macro's name) a non-OK
// result is expected to surface as an exception from this constructor.
AccessLog::AccessLog(
    ::Envoy::AccessLog::FilterPtr&& filter,
    envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config,
    ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache,
    const std::vector<Formatter::CommandParserPtr>& commands)
    : Common::ImplBase(std::move(filter)), tls_slot_(tls.allocateSlot()),
      access_logger_cache_(std::move(access_logger_cache)),
      filter_state_objects_to_log_(getFilterStateObjectsToLog(config)),
      custom_tags_(getCustomTags(config)) {

  THROW_IF_NOT_OK(Envoy::Config::Utility::checkTransportVersion(config.common_config()));

  // Capture the logger cache by value instead of `this`: the callback only needs the cache, and
  // holding its own shared reference means it never dereferences this object.
  // NOTE(review): the slot presumably stores the callback and runs it per dispatcher/thread, which
  // is why a raw `this` capture is avoided here - confirm against ThreadLocal::Slot::set().
  tls_slot_->set([logger_cache = access_logger_cache_, config](Event::Dispatcher&) {
    return std::make_shared<ThreadLocalLogger>(
        logger_cache->getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));
  });

  // Pack the body "AnyValue" into a "KeyValueList" only when a value is set; otherwise the
  // formatter would fail to parse it. body_formatter_ stays unset when no body is configured.
  if (config.body().value_case() != ::opentelemetry::proto::common::v1::AnyValue::VALUE_NOT_SET) {
    body_formatter_ = std::make_unique<OpenTelemetryFormatter>(packBody(config.body()), commands);
  }
  attributes_formatter_ = std::make_unique<OpenTelemetryFormatter>(config.attributes(), commands);
}
59

            
60
// Builds one OTLP LogRecord for the given stream and passes it to the logger held in this
// thread's slot.
//
// log_context  formatter context; also the source of the active tracing span, if any.
// stream_info  stream whose start time stamps the record and which feeds the formatters.
//
// The record is filled in this order: timestamp, optional body, formatted attributes, trace
// context, filter state attributes, custom tag attributes. The attribute list is assigned before
// addFilterStateToAttributes() and addCustomTagsToAttributes() are called on the same record, so
// that order must be kept.
void AccessLog::emitLog(const Formatter::Context& log_context,
                        const StreamInfo::StreamInfo& stream_info) {
  opentelemetry::proto::logs::v1::LogRecord log_entry;
  log_entry.set_time_unix_nano(std::chrono::duration_cast<std::chrono::nanoseconds>(
                                   stream_info.startTime().time_since_epoch())
                                   .count());

  // Unpacks the body "KeyValueList" to "AnyValue". Assigning straight from the returned value
  // avoids the extra copy a named const local would force.
  if (body_formatter_) {
    *log_entry.mutable_body() = unpackBody(body_formatter_->format(log_context, stream_info));
  }

  // The formatted list is a local that is not used again, so its entries are moved into the
  // record instead of being copied for every log entry.
  // NOTE(review): assumes format() returns a protobuf KeyValueList by value - confirm.
  auto formatted_attributes = attributes_formatter_->format(log_context, stream_info);
  *log_entry.mutable_attributes() = std::move(*formatted_attributes.mutable_values());

  // Sets trace context (trace_id, span_id) if available; empty strings are passed when there is
  // no active span.
  const std::string trace_id_hex =
      log_context.activeSpan().has_value() ? log_context.activeSpan()->getTraceId() : "";
  const std::string span_id_hex =
      log_context.activeSpan().has_value() ? log_context.activeSpan()->getSpanId() : "";
  populateTraceContext(log_entry, trace_id_hex, span_id_hex);

  addFilterStateToAttributes(stream_info, filter_state_objects_to_log_, log_entry);
  addCustomTagsToAttributes(custom_tags_, log_context, stream_info, log_entry);

  tls_slot_->getTyped<ThreadLocalLogger>().logger_->log(std::move(log_entry));
}
87

            
88
} // namespace OpenTelemetry
89
} // namespace AccessLoggers
90
} // namespace Extensions
91
} // namespace Envoy