1
#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h"
2

            
3
#include <chrono>
4
#include <string>
5

            
6
#include "envoy/data/accesslog/v3/accesslog.pb.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/macros.h"
10
#include "source/common/http/header_map_impl.h"
11
#include "source/common/protobuf/utility.h"
12
#include "source/common/tracing/custom_tag_impl.h"
13
#include "source/common/tracing/http_tracer_impl.h"
14
#include "source/common/version/version.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace AccessLoggers {
19
namespace OpenTelemetry {
20

            
21
opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key,
22
145
                                                             const std::string& value) {
23
145
  opentelemetry::proto::common::v1::KeyValue keyValue;
24
145
  keyValue.set_key(key);
25
145
  keyValue.mutable_value()->set_string_value(value);
26
145
  return keyValue;
27
145
}
28

            
29
::opentelemetry::proto::common::v1::KeyValueList
30
16
packBody(const ::opentelemetry::proto::common::v1::AnyValue& body) {
31
16
  ::opentelemetry::proto::common::v1::KeyValueList output;
32
16
  auto* kv = output.add_values();
33
16
  kv->set_key(std::string(BodyKey));
34
16
  *kv->mutable_value() = body;
35
16
  return output;
36
16
}
37

            
38
::opentelemetry::proto::common::v1::AnyValue
39
18
unpackBody(const ::opentelemetry::proto::common::v1::KeyValueList& value) {
40
18
  ASSERT(value.values().size() == 1 && value.values(0).key() == BodyKey);
41
18
  return value.values(0).value();
42
18
}
43

            
44
// User-Agent header follows the OTLP specification:
45
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.52.0/specification/protocol/exporter.md#user-agent
46
27
const std::string& getOtlpUserAgentHeader() {
47
27
  CONSTRUCT_ON_FIRST_USE(std::string, "OTel-OTLP-Exporter-Envoy/" + VersionInfo::version());
48
27
}
49

            
50
void populateTraceContext(opentelemetry::proto::logs::v1::LogRecord& log_entry,
51
24
                          const std::string& trace_id_hex, const std::string& span_id_hex) {
52
  // Sets trace_id if available. OpenTelemetry trace_id is a 16-byte array, and backends
53
  // (e.g. OTel-collector) will reject requests if the length is incorrect. Some trace
54
  // providers (e.g. Zipkin) return a 64-bit hex string, which must be padded to 128-bit.
55
24
  if (trace_id_hex.size() == TraceIdHexLength) {
56
2
    *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id_hex);
57
22
  } else if (trace_id_hex.size() == ShortTraceIdHexLength) {
58
2
    const auto trace_id = absl::StrCat(Hex::uint64ToHex(0), trace_id_hex);
59
2
    *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id);
60
2
  }
61
  // Sets span_id if available.
62
24
  if (!span_id_hex.empty()) {
63
4
    *log_entry.mutable_span_id() = absl::HexStringToBytes(span_id_hex);
64
4
  }
65
24
}
66

            
67
const std::string& getLogName(
68
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
69
39
        config) {
70
  // Prefer top-level log_name, fall back to common_config.log_name (deprecated).
71
39
  if (!config.log_name().empty()) {
72
24
    return config.log_name();
73
24
  }
74
15
  return config.common_config().log_name();
75
39
}
76

            
77
const envoy::config::core::v3::GrpcService& getGrpcService(
78
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
79
17
        config) {
80
  // Prefer top-level grpc_service, fall back to common_config.grpc_service (deprecated).
81
17
  if (config.has_grpc_service()) {
82
11
    return config.grpc_service();
83
11
  }
84
6
  return config.common_config().grpc_service();
85
17
}
86

            
87
std::chrono::milliseconds getBufferFlushInterval(
88
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
89
21
        config) {
90
21
  if (config.has_buffer_flush_interval()) {
91
1
    return std::chrono::milliseconds(
92
1
        DurationUtil::durationToMilliseconds(config.buffer_flush_interval()));
93
1
  }
94
20
  return DefaultBufferFlushInterval;
95
21
}
96

            
97
uint64_t getBufferSizeBytes(
98
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
99
21
        config) {
100
21
  if (config.has_buffer_size_bytes()) {
101
4
    return config.buffer_size_bytes().value();
102
4
  }
103
17
  return DefaultMaxBufferSizeBytes;
104
21
}
105

            
106
std::vector<std::string> getFilterStateObjectsToLog(
107
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
108
23
        config) {
109
23
  return std::vector<std::string>(config.filter_state_objects_to_log().begin(),
110
23
                                  config.filter_state_objects_to_log().end());
111
23
}
112

            
113
std::vector<Tracing::CustomTagConstSharedPtr> getCustomTags(
114
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
115
25
        config) {
116
25
  std::vector<Tracing::CustomTagConstSharedPtr> custom_tags;
117
25
  for (const auto& custom_tag : config.custom_tags()) {
118
3
    custom_tags.push_back(Tracing::CustomTagUtility::createCustomTag(custom_tag));
119
3
  }
120
25
  return custom_tags;
121
25
}
122

            
123
void addFilterStateToAttributes(const StreamInfo::StreamInfo& stream_info,
124
                                const std::vector<std::string>& filter_state_objects_to_log,
125
25
                                opentelemetry::proto::logs::v1::LogRecord& log_entry) {
126
25
  for (const auto& key : filter_state_objects_to_log) {
127
4
    const StreamInfo::FilterState* filter_state = &stream_info.filterState();
128
    // Check downstream filter state first, then upstream.
129
4
    if (auto state = filter_state->getDataReadOnlyGeneric(key); state != nullptr) {
130
2
      ProtobufTypes::MessagePtr serialized_proto = state->serializeAsProto();
131
2
      if (serialized_proto != nullptr) {
132
2
        auto json_or_error = MessageUtil::getJsonStringFromMessage(*serialized_proto);
133
2
        if (json_or_error.ok()) {
134
2
          auto* attr = log_entry.add_attributes();
135
2
          attr->set_key(key);
136
2
          attr->mutable_value()->set_string_value(json_or_error.value());
137
2
        }
138
2
      }
139
2
    } else if (stream_info.upstreamInfo().has_value() &&
140
2
               stream_info.upstreamInfo()->upstreamFilterState() != nullptr) {
141
1
      if (auto upstream_state =
142
1
              stream_info.upstreamInfo()->upstreamFilterState()->getDataReadOnlyGeneric(key);
143
1
          upstream_state != nullptr) {
144
1
        ProtobufTypes::MessagePtr serialized_proto = upstream_state->serializeAsProto();
145
1
        if (serialized_proto != nullptr) {
146
1
          auto json_or_error = MessageUtil::getJsonStringFromMessage(*serialized_proto);
147
1
          if (json_or_error.ok()) {
148
1
            auto* attr = log_entry.add_attributes();
149
1
            attr->set_key(key);
150
1
            attr->mutable_value()->set_string_value(json_or_error.value());
151
1
          }
152
1
        }
153
1
      }
154
1
    }
155
4
  }
156
25
}
157

            
158
void addCustomTagsToAttributes(const std::vector<Tracing::CustomTagConstSharedPtr>& custom_tags,
159
                               const Formatter::Context& context,
160
                               const StreamInfo::StreamInfo& stream_info,
161
23
                               opentelemetry::proto::logs::v1::LogRecord& log_entry) {
162
23
  if (custom_tags.empty()) {
163
21
    return;
164
21
  }
165

            
166
  // Create empty header map if request headers not available.
167
2
  const Http::RequestHeaderMap* headers_ptr =
168
2
      context.requestHeaders().has_value()
169
2
          ? &static_cast<const Http::RequestHeaderMap&>(context.requestHeaders().value())
170
2
          : Http::StaticEmptyHeaders::get().request_headers.get();
171
2
  const Http::RequestHeaderMap& headers = *headers_ptr;
172

            
173
2
  Tracing::ReadOnlyHttpTraceContext trace_context(headers);
174
2
  Tracing::CustomTagContext tag_context{trace_context, stream_info, context};
175

            
176
  // Use a temporary AccessLogCommon to extract custom tag values via applyLog.
177
2
  envoy::data::accesslog::v3::AccessLogCommon temp_log;
178
2
  for (const auto& custom_tag : custom_tags) {
179
2
    custom_tag->applyLog(temp_log, tag_context);
180
2
  }
181

            
182
  // Copy custom tags to OTLP attributes.
183
2
  for (const auto& [key, value] : temp_log.custom_tags()) {
184
2
    auto* attr = log_entry.add_attributes();
185
2
    attr->set_key(key);
186
2
    attr->mutable_value()->set_string_value(value);
187
2
  }
188
2
}
189

            
190
opentelemetry::proto::logs::v1::ScopeLogs* initOtlpMessageRoot(
191
    opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest& message,
192
    const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
193
        config,
194
40
    const LocalInfo::LocalInfo& local_info) {
195
40
  auto* resource_logs = message.add_resource_logs();
196
40
  auto* root = resource_logs->add_scope_logs();
197
40
  auto* resource = resource_logs->mutable_resource();
198
40
  if (!config.disable_builtin_labels()) {
199
36
    *resource->add_attributes() = getStringKeyValue("log_name", getLogName(config));
200
36
    *resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName());
201
36
    *resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName());
202
36
    *resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName());
203
36
  }
204
40
  for (const auto& pair : config.resource_attributes().values()) {
205
9
    *resource->add_attributes() = pair;
206
9
  }
207
40
  return root;
208
40
}
209

            
210
} // namespace OpenTelemetry
211
} // namespace AccessLoggers
212
} // namespace Extensions
213
} // namespace Envoy