LCOV - code coverage report
Current view: top level - source/extensions/tracers/zipkin - span_buffer.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 202 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 15 0.0 %

          Line data    Source code
       1             : #include "source/extensions/tracers/zipkin/span_buffer.h"
       2             : 
       3             : #include "envoy/config/trace/v3/zipkin.pb.h"
       4             : 
       5             : #include "source/common/protobuf/utility.h"
       6             : #include "source/extensions/tracers/zipkin/util.h"
       7             : #include "source/extensions/tracers/zipkin/zipkin_core_constants.h"
       8             : #include "source/extensions/tracers/zipkin/zipkin_json_field_names.h"
       9             : 
      10             : #include "absl/strings/str_join.h"
      11             : #include "absl/strings/str_replace.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace Extensions {
      15             : namespace Tracers {
      16             : namespace Zipkin {
      17             : 
      // Constructs a SpanBuffer whose serializer_ is the object returned by
      // makeSerializer(version, shared_span_context). This overload does not call
      // allocateBuffer(); only the three-argument overload does.
      18             : SpanBuffer::SpanBuffer(
      19             :     const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
      20             :     const bool shared_span_context)
      21           0 :     : serializer_{makeSerializer(version, shared_span_context)} {}
      22             : 
      // Constructs a SpanBuffer with the serializer returned by
      // makeSerializer(version, shared_span_context) and then calls allocateBuffer(size).
      // allocateBuffer() is not part of this listing -- presumably it reserves room for
      // `size` spans in span_buffer_; confirm against span_buffer.h.
      23             : SpanBuffer::SpanBuffer(
      24             :     const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
      25             :     const bool shared_span_context, uint64_t size)
      26           0 :     : serializer_{makeSerializer(version, shared_span_context)} {
      27           0 :   allocateBuffer(size);
      28           0 : }
      29             : 
      // Moves `span` into span_buffer_ and returns true.
      // Returns false, without touching the buffer or moving from `span`, when any of the
      // following holds:
      //   - span_buffer_.size() equals span_buffer_.capacity() (treated as "buffer full");
      //   - the span has no annotations;
      //   - none of its annotations has the value CLIENT_SEND or SERVER_RECV.
      // NOTE(review): if span_buffer_ is a std::vector for which nothing was ever reserved,
      // size() == capacity() == 0 and every span is rejected -- confirm the two-argument
      // constructor is always followed by a capacity reservation.
      30           0 : bool SpanBuffer::addSpan(Span&& span) {
      31           0 :   const auto& annotations = span.annotations();
      32           0 :   if (span_buffer_.size() == span_buffer_.capacity() || annotations.empty() ||
      33           0 :       annotations.end() ==
      34           0 :           std::find_if(annotations.begin(), annotations.end(), [](const auto& annotation) {
      35           0 :             return annotation.value() == CLIENT_SEND || annotation.value() == SERVER_RECV;
      36           0 :           })) {
      37             : 
      38             :     // Buffer full or invalid span.
      39           0 :     return false;
      40           0 :   }
      41             : 
      // Capacity is used as the hard limit above, so this push_back is only reached while
      // size() < capacity().
      42           0 :   span_buffer_.push_back(std::move(span));
      43             : 
      44           0 :   return true;
      45           0 : }
      46             : 
      // Selects the serializer implementation for the configured collector endpoint version:
      //   - HTTP_JSON  -> JsonV2Serializer(shared_span_context)
      //   - HTTP_PROTO -> ProtobufSerializer(shared_span_context)
      //   - DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE -> throws EnvoyException
      //   - GRPC -> PANIC("not handled")
      // Any value that leaves the switch without returning reaches PANIC_DUE_TO_CORRUPT_ENUM.
      // PANIC_ON_PROTO_ENUM_SENTINEL_VALUES is a macro defined outside this listing;
      // presumably it supplies the case labels for protobuf's sentinel enum values -- confirm.
      47             : SerializerPtr SpanBuffer::makeSerializer(
      48             :     const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
      49           0 :     const bool shared_span_context) {
      50           0 :   switch (version) {
      51           0 :     PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
      52           0 :   case envoy::config::trace::v3::ZipkinConfig::DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE:
      53           0 :     throw EnvoyException(
      54           0 :         "hidden_envoy_deprecated_HTTP_JSON_V1 has been deprecated. Please use a non-default "
      55           0 :         "envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion value.");
      56           0 :   case envoy::config::trace::v3::ZipkinConfig::HTTP_JSON:
      57           0 :     return std::make_unique<JsonV2Serializer>(shared_span_context);
      58           0 :   case envoy::config::trace::v3::ZipkinConfig::HTTP_PROTO:
      59           0 :     return std::make_unique<ProtobufSerializer>(shared_span_context);
      60           0 :   case envoy::config::trace::v3::ZipkinConfig::GRPC:
      61           0 :     PANIC("not handled");
      62           0 :   }
      63           0 :   PANIC_DUE_TO_CORRUPT_ENUM;
      64           0 : }
      65             : 
      // Stores the shared-span-context flag. toListOfSpans() reads it to decide whether a
      // SERVER_RECV span gets the SPAN_SHARED field.
      66             : JsonV2Serializer::JsonV2Serializer(const bool shared_span_context)
      67           0 :     : shared_span_context_{shared_span_context} {}
      68             : 
      // Serializes `zipkin_spans` into one JSON array string: "[" + elements joined by "," + "]".
      // Each Span is expanded by toListOfSpans() into ProtobufWkt::Struct objects; each struct
      // is rendered with MessageUtil::getJsonStringFromMessage and the result is passed
      // through absl::StrReplaceAll using the accumulated `replacements`.
      //
      // `replacements` is declared once, outside both lambdas, so entries recorded while
      // converting earlier spans are still present when later spans are rendered.
      // NOTE(review): when getJsonStringFromMessage fails, nothing is appended for that
      // struct, but StrJoin still places the "," separator next to the empty element, which
      // would leave a stray comma in the array -- confirm whether that path can occur.
      69           0 : std::string JsonV2Serializer::serialize(const std::vector<Span>& zipkin_spans) {
      70           0 :   Util::Replacements replacements;
      71           0 :   const std::string serialized_elements = absl::StrJoin(
      72           0 :       zipkin_spans, ",", [this, &replacements](std::string* out, const Span& zipkin_span) {
      // replacement_values is a const alias of the same `replacements` object. The
      // toListOfSpans() call below is an argument of the inner StrJoin, so it fills the list
      // before the inner formatter runs; the inner lambda only reads it.
      73           0 :         const auto& replacement_values = replacements;
      74           0 :         absl::StrAppend(
      75           0 :             out, absl::StrJoin(
      76           0 :                      toListOfSpans(zipkin_span, replacements), ",",
      77           0 :                      [&replacement_values](std::string* element, const ProtobufWkt::Struct& span) {
      78           0 :                        absl::StatusOr<std::string> json_or_error =
      79           0 :                            MessageUtil::getJsonStringFromMessage(span, false, true);
      80           0 :                        ENVOY_BUG(json_or_error.ok(), "Failed to parse json");
      81           0 :                        if (json_or_error.ok()) {
      82           0 :                          absl::StrAppend(element, absl::StrReplaceAll(json_or_error.value(),
      83           0 :                                                                       replacement_values));
      84           0 :                        }
      85             : 
      86             :                        // The Zipkin API V2 specification mandates storing the timestamp value as int64
      87             :                        // https://github.com/openzipkin/zipkin-api/blob/228fabe660f1b5d1e28eac9df41f7d1deed4a1c2/zipkin2-api.yaml#L447-L463
      88             :                        // (often translated as uint64 in some of the official implementations:
      89             :                        // https://github.com/openzipkin/zipkin-go/blob/62dc8b26c05e0e8b88eb7536eff92498e65bbfc3/model/span.go#L114,
      90             :                        // and see the discussion here:
      91             :                        // https://github.com/openzipkin/zipkin-go/pull/161#issuecomment-598558072).
      92             :                        // However, when the timestamp is stored as a number value in a protobuf
      93             :                        // struct, it is stored as a double. Because of how protobuf serializes
      94             :                        // doubles, there is a possibility that the value will be rendered as a
      95             :                        // number with scientific notation as reported in:
      96             :                        // https://github.com/envoyproxy/envoy/issues/9341#issuecomment-566912973. To
      97             :                        // deal with that issue, here we do a workaround by storing the timestamp as
      98             :                        // a string and keeping track of that with the corresponding integer
      99             :                        // replacements, and do the replacement here so we can meet the Zipkin API V2
     100             :                        // requirements.
     101             :                        //
     102             :                        // TODO(dio): The right fix for this is to introduce an additional knob when
     103             :                        // serializing double in protobuf DoubleToBuffer function, and make it
     104             :                        // available to be controlled at caller site. See:
     105             :                        // https://github.com/envoyproxy/envoy/issues/10411.
     106           0 :                      }));
     107           0 :       });
     108           0 :   return absl::StrCat("[", serialized_elements, "]");
     109           0 : }
     110             : 
     // Converts one Span into the list of Zipkin V2 JSON span objects (as ProtobufWkt::Struct).
     // One struct is produced for every annotation whose value is CLIENT_SEND or SERVER_RECV.
     // Every other annotation becomes a {ANNOTATION_VALUE, ANNOTATION_TIMESTAMP} entry, and
     // the collected entries are attached under ANNOTATIONS to each produced struct.
     //
     // `replacements` is an in/out parameter: it is handed to every Util::uint64Value call
     // (timestamps and duration) so that serialize() can later turn the string-encoded
     // integers back into bare JSON numbers.
     // The result is returned by value as a const vector.
     111             : const std::vector<ProtobufWkt::Struct>
     112           0 : JsonV2Serializer::toListOfSpans(const Span& zipkin_span, Util::Replacements& replacements) const {
     113           0 :   std::vector<ProtobufWkt::Struct> spans;
     // Upper bound: at most one output struct per annotation.
     114           0 :   spans.reserve(zipkin_span.annotations().size());
     115             : 
     116             :   // This holds the annotation entries from logs.
     117           0 :   std::vector<ProtobufWkt::Value> annotation_entries;
     118             : 
     119           0 :   for (const auto& annotation : zipkin_span.annotations()) {
     120           0 :     ProtobufWkt::Struct span;
     121           0 :     auto* fields = span.mutable_fields();
     122           0 :     if (annotation.value() == CLIENT_SEND) {
     123           0 :       (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_CLIENT);
     124           0 :     } else if (annotation.value() == SERVER_RECV) {
     // SPAN_SHARED is only written (as true) when the serializer was built with
     // shared_span_context and the span carries more than one annotation; otherwise the
     // field is left out entirely.
     125           0 :       if (shared_span_context_ && zipkin_span.annotations().size() > 1) {
     126           0 :         (*fields)[SPAN_SHARED] = ValueUtil::boolValue(true);
     127           0 :       }
     128           0 :       (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_SERVER);
     129           0 :     } else {
     // Any other annotation is a log entry: record it and skip span creation for it.
     130           0 :       ProtobufWkt::Struct annotation_entry;
     131           0 :       auto* annotation_entry_fields = annotation_entry.mutable_fields();
     132           0 :       (*annotation_entry_fields)[ANNOTATION_VALUE] = ValueUtil::stringValue(annotation.value());
     133           0 :       (*annotation_entry_fields)[ANNOTATION_TIMESTAMP] =
     134           0 :           Util::uint64Value(annotation.timestamp(), ANNOTATION_TIMESTAMP, replacements);
     135           0 :       annotation_entries.push_back(ValueUtil::structValue(annotation_entry));
     136           0 :       continue;
     137           0 :     }
     138             : 
     // Timestamp and local endpoint are emitted only when the annotation has an endpoint set.
     139           0 :     if (annotation.isSetEndpoint()) {
     140             :       // Usually we store a number in a ProtobufWkt::Struct object via ValueUtil::numberValue.
     141             :       // However, due to the possibility of rendering that to a number with scientific notation, we
     142             :       // chose to store it as a string and keep track of the corresponding replacement. For example,
     143             :       // we have 1584324295476870 if we stored it as a double value, MessageToJsonString gives
     144             :       // us 1.58432429547687e+15. Instead we store it as the string of 1584324295476870 (when it is
     145             :       // serialized: "1584324295476870"), and replace it post MessageToJsonString serialization with
     146             :       // integer (1584324295476870 without `"`), see: JsonV2Serializer::serialize.
     147           0 :       (*fields)[SPAN_TIMESTAMP] =
     148           0 :           Util::uint64Value(annotation.timestamp(), SPAN_TIMESTAMP, replacements);
     149           0 :       (*fields)[SPAN_LOCAL_ENDPOINT] =
     150           0 :           ValueUtil::structValue(toProtoEndpoint(annotation.endpoint()));
     151           0 :     }
     152             : 
     153           0 :     (*fields)[SPAN_TRACE_ID] = ValueUtil::stringValue(zipkin_span.traceIdAsHexString());
     154           0 :     if (zipkin_span.isSetParentId()) {
     155           0 :       (*fields)[SPAN_PARENT_ID] = ValueUtil::stringValue(zipkin_span.parentIdAsHexString());
     156           0 :     }
     157             : 
     158           0 :     (*fields)[SPAN_ID] = ValueUtil::stringValue(zipkin_span.idAsHexString());
     159             : 
     // An empty name is omitted from the JSON object rather than written as "".
     160           0 :     const auto& span_name = zipkin_span.name();
     161           0 :     if (!span_name.empty()) {
     162           0 :       (*fields)[SPAN_NAME] = ValueUtil::stringValue(span_name);
     163           0 :     }
     164             : 
     165           0 :     if (zipkin_span.isSetDuration()) {
     166             :       // Since SPAN_DURATION has the same data type with SPAN_TIMESTAMP, we use Util::uint64Value to
     167             :       // store it.
     168           0 :       (*fields)[SPAN_DURATION] =
     169           0 :           Util::uint64Value(zipkin_span.duration(), SPAN_DURATION, replacements);
     170           0 :     }
     171             : 
     // Binary annotations become the string-valued SPAN_TAGS map; a later entry with the
     // same key overwrites an earlier one.
     172           0 :     const auto& binary_annotations = zipkin_span.binaryAnnotations();
     173           0 :     if (!binary_annotations.empty()) {
     174           0 :       ProtobufWkt::Struct tags;
     175           0 :       auto* tag_fields = tags.mutable_fields();
     176           0 :       for (const auto& binary_annotation : binary_annotations) {
     177           0 :         (*tag_fields)[binary_annotation.key()] = ValueUtil::stringValue(binary_annotation.value());
     178           0 :       }
     179           0 :       (*fields)[SPAN_TAGS] = ValueUtil::structValue(tags);
     180           0 :     }
     181             : 
     182           0 :     spans.push_back(std::move(span));
     183           0 :   }
     184             : 
     185             :   // Fill up annotation entries from logs.
     // This runs after the first loop so that every produced span receives the full set of
     // log entries, regardless of annotation order. The emptiness check does not depend on
     // the loop variable.
     186           0 :   for (auto& span : spans) {
     187           0 :     auto* fields = span.mutable_fields();
     188           0 :     if (!annotation_entries.empty()) {
     189           0 :       (*fields)[ANNOTATIONS] = ValueUtil::listValue(annotation_entries);
     190           0 :     }
     191           0 :   }
     192             : 
     193           0 :   return spans;
     194           0 : }
     195             : 
     // Builds the JSON (ProtobufWkt::Struct) form of a Zipkin endpoint.
     // When the endpoint has an address, its textual IP is written under ENDPOINT_IPV4 or
     // ENDPOINT_IPV6 (chosen by the IP version) together with ENDPOINT_PORT. The service
     // name is written under ENDPOINT_SERVICE_NAME only when it is non-empty.
     // NOTE(review): address->ip() is dereferenced without a null check. If an Endpoint can
     // ever hold a non-IP address for which ip() returns nullptr, this would crash --
     // confirm that only IP addresses are stored in Endpoint.
     196           0 : const ProtobufWkt::Struct JsonV2Serializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
     197           0 :   ProtobufWkt::Struct endpoint;
     198           0 :   auto* fields = endpoint.mutable_fields();
     199             : 
     200           0 :   Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
     201           0 :   if (address) {
     202           0 :     if (address->ip()->version() == Network::Address::IpVersion::v4) {
     203           0 :       (*fields)[ENDPOINT_IPV4] = ValueUtil::stringValue(address->ip()->addressAsString());
     204           0 :     } else {
     205           0 :       (*fields)[ENDPOINT_IPV6] = ValueUtil::stringValue(address->ip()->addressAsString());
     206           0 :     }
     // The port is stored with numberValue, not the uint64Value/replacement scheme that
     // toListOfSpans() uses for timestamps and duration.
     207           0 :     (*fields)[ENDPOINT_PORT] = ValueUtil::numberValue(address->ip()->port());
     208           0 :   }
     209             : 
     210           0 :   const std::string& service_name = zipkin_endpoint.serviceName();
     211           0 :   if (!service_name.empty()) {
     212           0 :     (*fields)[ENDPOINT_SERVICE_NAME] = ValueUtil::stringValue(service_name);
     213           0 :   }
     214             : 
     215           0 :   return endpoint;
     216           0 : }
     217             : 
     // Stores the shared-span-context flag. toListOfSpans() reads it when setting the
     // `shared` field of SERVER_RECV spans.
     218             : ProtobufSerializer::ProtobufSerializer(const bool shared_span_context)
     219           0 :     : shared_span_context_{shared_span_context} {}
     220             : 
     // Serializes `zipkin_spans` into the binary encoding of one zipkin::proto3::ListOfSpans.
     // Each Span is converted by toListOfSpans() and merged into a single accumulated
     // message, which is then written to a std::string.
     // NOTE(review): the result of SerializeToString() is not checked; on failure the
     // returned string holds whatever the call left in it -- confirm that is acceptable.
     221           0 : std::string ProtobufSerializer::serialize(const std::vector<Span>& zipkin_spans) {
     222           0 :   zipkin::proto3::ListOfSpans spans;
     223           0 :   for (const Span& zipkin_span : zipkin_spans) {
     224           0 :     spans.MergeFrom(toListOfSpans(zipkin_span));
     225           0 :   }
     226           0 :   std::string serialized;
     227           0 :   spans.SerializeToString(&serialized);
     228           0 :   return serialized;
     229           0 : }
     230             : 
     // Converts one Span into a zipkin::proto3::ListOfSpans.
     // One proto span is produced for every annotation whose value is CLIENT_SEND or
     // SERVER_RECV. Every other annotation is collected and afterwards appended (value and
     // timestamp) to the `annotations` of each produced proto span.
     //
     // Differences from JsonV2Serializer::toListOfSpans visible in this file:
     //   - `shared` is always set on SERVER_RECV spans (to true or false), whereas the JSON
     //     variant only writes the field when it is true;
     //   - `name` is always set, even when empty;
     //   - ids are written with the *AsByteString accessors instead of the hex-string ones.
     231           0 : const zipkin::proto3::ListOfSpans ProtobufSerializer::toListOfSpans(const Span& zipkin_span) const {
     232           0 :   zipkin::proto3::ListOfSpans spans;
     233             : 
     234             :   // This holds the annotation entries from logs.
     235           0 :   std::vector<Annotation> annotation_entries;
     236             : 
     237           0 :   for (const auto& annotation : zipkin_span.annotations()) {
     238           0 :     zipkin::proto3::Span span;
     239           0 :     if (annotation.value() == CLIENT_SEND) {
     240           0 :       span.set_kind(zipkin::proto3::Span::CLIENT);
     241           0 :     } else if (annotation.value() == SERVER_RECV) {
     242           0 :       span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1);
     243           0 :       span.set_kind(zipkin::proto3::Span::SERVER);
     244           0 :     } else {
     // Any other annotation is a log entry: keep a copy and skip span creation for it.
     245           0 :       annotation_entries.push_back(annotation);
     246           0 :       continue;
     247           0 :     }
     248             : 
     // Timestamp and local endpoint are set only when the annotation has an endpoint.
     249           0 :     if (annotation.isSetEndpoint()) {
     250           0 :       span.set_timestamp(annotation.timestamp());
     251           0 :       span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint()));
     252           0 :     }
     253             : 
     254           0 :     span.set_trace_id(zipkin_span.traceIdAsByteString());
     255           0 :     if (zipkin_span.isSetParentId()) {
     256           0 :       span.set_parent_id(zipkin_span.parentIdAsByteString());
     257           0 :     }
     258             : 
     259           0 :     span.set_id(zipkin_span.idAsByteString());
     260           0 :     span.set_name(zipkin_span.name());
     261             : 
     262           0 :     if (zipkin_span.isSetDuration()) {
     263           0 :       span.set_duration(zipkin_span.duration());
     264           0 :     }
     265             : 
     // Binary annotations become the tags map; a later entry with the same key overwrites
     // an earlier one.
     266           0 :     auto& tags = *span.mutable_tags();
     267           0 :     for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) {
     268           0 :       tags[binary_annotation.key()] = binary_annotation.value();
     269           0 :     }
     270             : 
     // The locally built span is copied into the list via MergeFrom on a freshly added
     // element.
     271           0 :     auto* mutable_span = spans.add_spans();
     272           0 :     mutable_span->MergeFrom(span);
     273           0 :   }
     274             : 
     275             :   // Fill up annotation entries from logs.
     // Done after the first loop so every produced span receives all log entries,
     // regardless of annotation order.
     276           0 :   for (auto& span : *spans.mutable_spans()) {
     277           0 :     for (const auto& annotation_entry : annotation_entries) {
     278           0 :       const auto entry = span.mutable_annotations()->Add();
     279           0 :       entry->set_value(annotation_entry.value());
     280           0 :       entry->set_timestamp(annotation_entry.timestamp());
     281           0 :     }
     282           0 :   }
     283             : 
     284           0 :   return spans;
     285           0 : }
     286             : 
     // Builds the zipkin::proto3::Endpoint form of a Zipkin endpoint.
     // When the endpoint has an address, the IP is written to `ipv4` or `ipv6` (chosen by
     // the IP version) via Util::toByteString, together with the port. The service name is
     // set only when it is non-empty.
     // NOTE(review): address->ip(), ->ipv4() and ->ipv6() are dereferenced without null
     // checks. If an Endpoint can ever hold a non-IP address for which ip() returns
     // nullptr, this would crash -- confirm that only IP addresses are stored in Endpoint.
     287             : const zipkin::proto3::Endpoint
     288           0 : ProtobufSerializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
     289           0 :   zipkin::proto3::Endpoint endpoint;
     290           0 :   Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
     291           0 :   if (address) {
     292           0 :     if (address->ip()->version() == Network::Address::IpVersion::v4) {
     293           0 :       endpoint.set_ipv4(Util::toByteString(address->ip()->ipv4()->address()));
     294           0 :     } else {
     295           0 :       endpoint.set_ipv6(Util::toByteString(address->ip()->ipv6()->address()));
     296           0 :     }
     297           0 :     endpoint.set_port(address->ip()->port());
     298           0 :   }
     299             : 
     300           0 :   const std::string& service_name = zipkin_endpoint.serviceName();
     301           0 :   if (!service_name.empty()) {
     302           0 :     endpoint.set_service_name(service_name);
     303           0 :   }
     304             : 
     305           0 :   return endpoint;
     306           0 : }
     307             : 
     308             : } // namespace Zipkin
     309             : } // namespace Tracers
     310             : } // namespace Extensions
     311             : } // namespace Envoy

Generated by: LCOV version 1.15