Line data Source code
1 : #include "source/extensions/tracers/zipkin/span_buffer.h" 2 : 3 : #include "envoy/config/trace/v3/zipkin.pb.h" 4 : 5 : #include "source/common/protobuf/utility.h" 6 : #include "source/extensions/tracers/zipkin/util.h" 7 : #include "source/extensions/tracers/zipkin/zipkin_core_constants.h" 8 : #include "source/extensions/tracers/zipkin/zipkin_json_field_names.h" 9 : 10 : #include "absl/strings/str_join.h" 11 : #include "absl/strings/str_replace.h" 12 : 13 : namespace Envoy { 14 : namespace Extensions { 15 : namespace Tracers { 16 : namespace Zipkin { 17 : 18 : SpanBuffer::SpanBuffer( 19 : const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version, 20 : const bool shared_span_context) 21 0 : : serializer_{makeSerializer(version, shared_span_context)} {} 22 : 23 : SpanBuffer::SpanBuffer( 24 : const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version, 25 : const bool shared_span_context, uint64_t size) 26 0 : : serializer_{makeSerializer(version, shared_span_context)} { 27 0 : allocateBuffer(size); 28 0 : } 29 : 30 0 : bool SpanBuffer::addSpan(Span&& span) { 31 0 : const auto& annotations = span.annotations(); 32 0 : if (span_buffer_.size() == span_buffer_.capacity() || annotations.empty() || 33 0 : annotations.end() == 34 0 : std::find_if(annotations.begin(), annotations.end(), [](const auto& annotation) { 35 0 : return annotation.value() == CLIENT_SEND || annotation.value() == SERVER_RECV; 36 0 : })) { 37 : 38 : // Buffer full or invalid span. 39 0 : return false; 40 0 : } 41 : 42 0 : span_buffer_.push_back(std::move(span)); 43 : 44 0 : return true; 45 0 : } 46 : 47 : SerializerPtr SpanBuffer::makeSerializer( 48 : const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version, 49 0 : const bool shared_span_context) { 50 0 : switch (version) { 51 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; 52 0 : case envoy::config::trace::v3::ZipkinConfig::DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE: 53 0 : throw EnvoyException( 54 0 : "hidden_envoy_deprecated_HTTP_JSON_V1 has been deprecated. Please use a non-default " 55 0 : "envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion value."); 56 0 : case envoy::config::trace::v3::ZipkinConfig::HTTP_JSON: 57 0 : return std::make_unique<JsonV2Serializer>(shared_span_context); 58 0 : case envoy::config::trace::v3::ZipkinConfig::HTTP_PROTO: 59 0 : return std::make_unique<ProtobufSerializer>(shared_span_context); 60 0 : case envoy::config::trace::v3::ZipkinConfig::GRPC: 61 0 : PANIC("not handled"); 62 0 : } 63 0 : PANIC_DUE_TO_CORRUPT_ENUM; 64 0 : } 65 : 66 : JsonV2Serializer::JsonV2Serializer(const bool shared_span_context) 67 0 : : shared_span_context_{shared_span_context} {} 68 : 69 0 : std::string JsonV2Serializer::serialize(const std::vector<Span>& zipkin_spans) { 70 0 : Util::Replacements replacements; 71 0 : const std::string serialized_elements = absl::StrJoin( 72 0 : zipkin_spans, ",", [this, &replacements](std::string* out, const Span& zipkin_span) { 73 0 : const auto& replacement_values = replacements; 74 0 : absl::StrAppend( 75 0 : out, absl::StrJoin( 76 0 : toListOfSpans(zipkin_span, replacements), ",", 77 0 : [&replacement_values](std::string* element, const ProtobufWkt::Struct& span) { 78 0 : absl::StatusOr<std::string> json_or_error = 79 0 : MessageUtil::getJsonStringFromMessage(span, false, true); 80 0 : ENVOY_BUG(json_or_error.ok(), "Failed to parse json"); 81 0 : if (json_or_error.ok()) { 82 0 : absl::StrAppend(element, absl::StrReplaceAll(json_or_error.value(), 83 0 : replacement_values)); 84 0 : } 85 : 86 : // The Zipkin API V2 specification mandates to store timestamp value as int64 87 : // https://github.com/openzipkin/zipkin-api/blob/228fabe660f1b5d1e28eac9df41f7d1deed4a1c2/zipkin2-api.yaml#L447-L463 88 : // (often translated as uint64 in some of the official implementations: 89 : // https://github.com/openzipkin/zipkin-go/blob/62dc8b26c05e0e8b88eb7536eff92498e65bbfc3/model/span.go#L114, 90 : // and see the discussion here: 91 : // https://github.com/openzipkin/zipkin-go/pull/161#issuecomment-598558072). 92 : // However, when the timestamp is stored as number value in a protobuf 93 : // struct, it is stored as a double. Because of how protobuf serializes 94 : // doubles, there is a possibility that the value will be rendered as a 95 : // number with scientific notation as reported in: 96 : // https://github.com/envoyproxy/envoy/issues/9341#issuecomment-566912973. To 97 : // deal with that issue, here we do a workaround by storing the timestamp as 98 : // string and keeping track of that with the corresponding integer 99 : // replacements, and do the replacement here so we can meet the Zipkin API V2 100 : // requirements. 101 : // 102 : // TODO(dio): The right fix for this is to introduce additional knob when 103 : // serializing double in protobuf DoubleToBuffer function, and make it 104 : // available to be controlled at caller site. 105 : // https://github.com/envoyproxy/envoy/issues/10411). 106 0 : })); 107 0 : }); 108 0 : return absl::StrCat("[", serialized_elements, "]"); 109 0 : } 110 : 111 : const std::vector<ProtobufWkt::Struct> 112 0 : JsonV2Serializer::toListOfSpans(const Span& zipkin_span, Util::Replacements& replacements) const { 113 0 : std::vector<ProtobufWkt::Struct> spans; 114 0 : spans.reserve(zipkin_span.annotations().size()); 115 : 116 : // This holds the annotation entries from logs. 117 0 : std::vector<ProtobufWkt::Value> annotation_entries; 118 : 119 0 : for (const auto& annotation : zipkin_span.annotations()) { 120 0 : ProtobufWkt::Struct span; 121 0 : auto* fields = span.mutable_fields(); 122 0 : if (annotation.value() == CLIENT_SEND) { 123 0 : (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_CLIENT); 124 0 : } else if (annotation.value() == SERVER_RECV) { 125 0 : if (shared_span_context_ && zipkin_span.annotations().size() > 1) { 126 0 : (*fields)[SPAN_SHARED] = ValueUtil::boolValue(true); 127 0 : } 128 0 : (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_SERVER); 129 0 : } else { 130 0 : ProtobufWkt::Struct annotation_entry; 131 0 : auto* annotation_entry_fields = annotation_entry.mutable_fields(); 132 0 : (*annotation_entry_fields)[ANNOTATION_VALUE] = ValueUtil::stringValue(annotation.value()); 133 0 : (*annotation_entry_fields)[ANNOTATION_TIMESTAMP] = 134 0 : Util::uint64Value(annotation.timestamp(), ANNOTATION_TIMESTAMP, replacements); 135 0 : annotation_entries.push_back(ValueUtil::structValue(annotation_entry)); 136 0 : continue; 137 0 : } 138 : 139 0 : if (annotation.isSetEndpoint()) { 140 : // Usually we store number to a ProtobufWkt::Struct object via ValueUtil::numberValue. 141 : // However, due to the possibility of rendering that to a number with scientific notation, we 142 : // chose to store it as a string and keeping track the corresponding replacement. For example, 143 : // we have 1584324295476870 if we stored it as a double value, MessageToJsonString gives 144 : // us 1.58432429547687e+15. Instead we store it as the string of 1584324295476870 (when it is 145 : // serialized: "1584324295476870"), and replace it post MessageToJsonString serialization with 146 : // integer (1584324295476870 without `"`), see: JsonV2Serializer::serialize. 147 0 : (*fields)[SPAN_TIMESTAMP] = 148 0 : Util::uint64Value(annotation.timestamp(), SPAN_TIMESTAMP, replacements); 149 0 : (*fields)[SPAN_LOCAL_ENDPOINT] = 150 0 : ValueUtil::structValue(toProtoEndpoint(annotation.endpoint())); 151 0 : } 152 : 153 0 : (*fields)[SPAN_TRACE_ID] = ValueUtil::stringValue(zipkin_span.traceIdAsHexString()); 154 0 : if (zipkin_span.isSetParentId()) { 155 0 : (*fields)[SPAN_PARENT_ID] = ValueUtil::stringValue(zipkin_span.parentIdAsHexString()); 156 0 : } 157 : 158 0 : (*fields)[SPAN_ID] = ValueUtil::stringValue(zipkin_span.idAsHexString()); 159 : 160 0 : const auto& span_name = zipkin_span.name(); 161 0 : if (!span_name.empty()) { 162 0 : (*fields)[SPAN_NAME] = ValueUtil::stringValue(span_name); 163 0 : } 164 : 165 0 : if (zipkin_span.isSetDuration()) { 166 : // Since SPAN_DURATION has the same data type with SPAN_TIMESTAMP, we use Util::uint64Value to 167 : // store it. 168 0 : (*fields)[SPAN_DURATION] = 169 0 : Util::uint64Value(zipkin_span.duration(), SPAN_DURATION, replacements); 170 0 : } 171 : 172 0 : const auto& binary_annotations = zipkin_span.binaryAnnotations(); 173 0 : if (!binary_annotations.empty()) { 174 0 : ProtobufWkt::Struct tags; 175 0 : auto* tag_fields = tags.mutable_fields(); 176 0 : for (const auto& binary_annotation : binary_annotations) { 177 0 : (*tag_fields)[binary_annotation.key()] = ValueUtil::stringValue(binary_annotation.value()); 178 0 : } 179 0 : (*fields)[SPAN_TAGS] = ValueUtil::structValue(tags); 180 0 : } 181 : 182 0 : spans.push_back(std::move(span)); 183 0 : } 184 : 185 : // Fill up annotation entries from logs. 186 0 : for (auto& span : spans) { 187 0 : auto* fields = span.mutable_fields(); 188 0 : if (!annotation_entries.empty()) { 189 0 : (*fields)[ANNOTATIONS] = ValueUtil::listValue(annotation_entries); 190 0 : } 191 0 : } 192 : 193 0 : return spans; 194 0 : } 195 : 196 0 : const ProtobufWkt::Struct JsonV2Serializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const { 197 0 : ProtobufWkt::Struct endpoint; 198 0 : auto* fields = endpoint.mutable_fields(); 199 : 200 0 : Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address(); 201 0 : if (address) { 202 0 : if (address->ip()->version() == Network::Address::IpVersion::v4) { 203 0 : (*fields)[ENDPOINT_IPV4] = ValueUtil::stringValue(address->ip()->addressAsString()); 204 0 : } else { 205 0 : (*fields)[ENDPOINT_IPV6] = ValueUtil::stringValue(address->ip()->addressAsString()); 206 0 : } 207 0 : (*fields)[ENDPOINT_PORT] = ValueUtil::numberValue(address->ip()->port()); 208 0 : } 209 : 210 0 : const std::string& service_name = zipkin_endpoint.serviceName(); 211 0 : if (!service_name.empty()) { 212 0 : (*fields)[ENDPOINT_SERVICE_NAME] = ValueUtil::stringValue(service_name); 213 0 : } 214 : 215 0 : return endpoint; 216 0 : } 217 : 218 : ProtobufSerializer::ProtobufSerializer(const bool shared_span_context) 219 0 : : shared_span_context_{shared_span_context} {} 220 : 221 0 : std::string ProtobufSerializer::serialize(const std::vector<Span>& zipkin_spans) { 222 0 : zipkin::proto3::ListOfSpans spans; 223 0 : for (const Span& zipkin_span : zipkin_spans) { 224 0 : spans.MergeFrom(toListOfSpans(zipkin_span)); 225 0 : } 226 0 : std::string serialized; 227 0 : spans.SerializeToString(&serialized); 228 0 : return serialized; 229 0 : } 230 : 231 0 : const zipkin::proto3::ListOfSpans ProtobufSerializer::toListOfSpans(const Span& zipkin_span) const { 232 0 : zipkin::proto3::ListOfSpans spans; 233 : 234 : // This holds the annotation entries from logs. 235 0 : std::vector<Annotation> annotation_entries; 236 : 237 0 : for (const auto& annotation : zipkin_span.annotations()) { 238 0 : zipkin::proto3::Span span; 239 0 : if (annotation.value() == CLIENT_SEND) { 240 0 : span.set_kind(zipkin::proto3::Span::CLIENT); 241 0 : } else if (annotation.value() == SERVER_RECV) { 242 0 : span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1); 243 0 : span.set_kind(zipkin::proto3::Span::SERVER); 244 0 : } else { 245 0 : annotation_entries.push_back(annotation); 246 0 : continue; 247 0 : } 248 : 249 0 : if (annotation.isSetEndpoint()) { 250 0 : span.set_timestamp(annotation.timestamp()); 251 0 : span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint())); 252 0 : } 253 : 254 0 : span.set_trace_id(zipkin_span.traceIdAsByteString()); 255 0 : if (zipkin_span.isSetParentId()) { 256 0 : span.set_parent_id(zipkin_span.parentIdAsByteString()); 257 0 : } 258 : 259 0 : span.set_id(zipkin_span.idAsByteString()); 260 0 : span.set_name(zipkin_span.name()); 261 : 262 0 : if (zipkin_span.isSetDuration()) { 263 0 : span.set_duration(zipkin_span.duration()); 264 0 : } 265 : 266 0 : auto& tags = *span.mutable_tags(); 267 0 : for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) { 268 0 : tags[binary_annotation.key()] = binary_annotation.value(); 269 0 : } 270 : 271 0 : auto* mutable_span = spans.add_spans(); 272 0 : mutable_span->MergeFrom(span); 273 0 : } 274 : 275 : // Fill up annotation entries from logs. 276 0 : for (auto& span : *spans.mutable_spans()) { 277 0 : for (const auto& annotation_entry : annotation_entries) { 278 0 : const auto entry = span.mutable_annotations()->Add(); 279 0 : entry->set_value(annotation_entry.value()); 280 0 : entry->set_timestamp(annotation_entry.timestamp()); 281 0 : } 282 0 : } 283 : 284 0 : return spans; 285 0 : } 286 : 287 : const zipkin::proto3::Endpoint 288 0 : ProtobufSerializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const { 289 0 : zipkin::proto3::Endpoint endpoint; 290 0 : Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address(); 291 0 : if (address) { 292 0 : if (address->ip()->version() == Network::Address::IpVersion::v4) { 293 0 : endpoint.set_ipv4(Util::toByteString(address->ip()->ipv4()->address())); 294 0 : } else { 295 0 : endpoint.set_ipv6(Util::toByteString(address->ip()->ipv6()->address())); 296 0 : } 297 0 : endpoint.set_port(address->ip()->port()); 298 0 : } 299 : 300 0 : const std::string& service_name = zipkin_endpoint.serviceName(); 301 0 : if (!service_name.empty()) { 302 0 : endpoint.set_service_name(service_name); 303 0 : } 304 : 305 0 : return endpoint; 306 0 : } 307 : 308 : } // namespace Zipkin 309 : } // namespace Tracers 310 : } // namespace Extensions 311 : } // namespace Envoy