1
#include "source/extensions/tracers/zipkin/span_buffer.h"
2

            
3
#include "envoy/config/trace/v3/zipkin.pb.h"
4

            
5
#include "source/common/protobuf/utility.h"
6
#include "source/extensions/tracers/zipkin/util.h"
7
#include "source/extensions/tracers/zipkin/zipkin_core_constants.h"
8
#include "source/extensions/tracers/zipkin/zipkin_json_field_names.h"
9

            
10
#include "absl/strings/str_join.h"
11
#include "absl/strings/str_replace.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace Tracers {
16
namespace Zipkin {
17

            
18
// Constructs a span buffer whose serializer is chosen by makeSerializer() from the collector
// endpoint version and the shared-span-context flag. This overload does not call
// allocateBuffer().
SpanBuffer::SpanBuffer(
    const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
    const bool shared_span_context)
    : serializer_(makeSerializer(version, shared_span_context)) {}
22

            
23
// Constructs a span buffer whose serializer is chosen by makeSerializer() from the collector
// endpoint version and the shared-span-context flag, then passes `size` to allocateBuffer().
SpanBuffer::SpanBuffer(
    const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
    const bool shared_span_context, uint64_t size)
    : serializer_(makeSerializer(version, shared_span_context)) {
  allocateBuffer(size);
}
29

            
30
43
// Appends `span` to the buffer by moving it in, and returns true on success.
//
// Returns false, without moving from `span`, when either:
//  - the buffer already holds as many spans as its capacity, or
//  - the span carries no CLIENT_SEND / SERVER_RECV annotation (which includes the case of a span
//    without any annotation at all).
bool SpanBuffer::addSpan(Span&& span) {
  const auto& span_annotations = span.annotations();

  if (span_buffer_.size() == span_buffer_.capacity()) {
    // Buffer full.
    return false;
  }

  const auto is_send_or_recv = [](const auto& candidate) {
    return candidate.value() == CLIENT_SEND || candidate.value() == SERVER_RECV;
  };
  // For an empty annotation list the search trivially yields end(), so no separate emptiness
  // check is needed.
  const auto first_match =
      std::find_if(span_annotations.begin(), span_annotations.end(), is_send_or_recv);
  if (first_match == span_annotations.end()) {
    // Invalid span.
    return false;
  }

  span_buffer_.push_back(std::move(span));
  return true;
}
46

            
47
// Creates the serializer that corresponds to the configured collector endpoint version:
//  - HTTP_JSON  -> JsonV2Serializer
//  - HTTP_PROTO -> ProtobufSerializer
// Throws EnvoyException for DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE and panics for GRPC, proto enum
// sentinel values, and any value outside the enum.
SerializerPtr SpanBuffer::makeSerializer(
    const envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion& version,
    const bool shared_span_context) {
  using ZipkinProto = envoy::config::trace::v3::ZipkinConfig;

  switch (version) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case ZipkinProto::HTTP_JSON:
    return std::make_unique<JsonV2Serializer>(shared_span_context);
  case ZipkinProto::HTTP_PROTO:
    return std::make_unique<ProtobufSerializer>(shared_span_context);
  case ZipkinProto::DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE:
    throw EnvoyException(
        "hidden_envoy_deprecated_HTTP_JSON_V1 has been deprecated. Please use a non-default "
        "envoy::config::trace::v3::ZipkinConfig::CollectorEndpointVersion value.");
  case ZipkinProto::GRPC:
    PANIC("not handled");
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
65

            
66
// Stores the shared-span-context flag; toListOfSpans() consults it when deciding whether a
// SERVER_RECV span gets the "shared" field.
JsonV2Serializer::JsonV2Serializer(const bool shared_span_context)
    : shared_span_context_(shared_span_context) {}
68

            
69
32
std::string JsonV2Serializer::serialize(const std::vector<Span>& zipkin_spans) {
70
32
  Util::Replacements replacements;
71
32
  const std::string serialized_elements = absl::StrJoin(
72
32
      zipkin_spans, ",", [this, &replacements](std::string* out, const Span& zipkin_span) {
73
28
        const auto& replacement_values = replacements;
74
28
        absl::StrAppend(
75
28
            out, absl::StrJoin(
76
28
                     toListOfSpans(zipkin_span, replacements), ",",
77
33
                     [&replacement_values](std::string* element, const Protobuf::Struct& span) {
78
33
                       absl::StatusOr<std::string> json_or_error =
79
33
                           MessageUtil::getJsonStringFromMessage(span, false, true);
80
33
                       ENVOY_BUG(json_or_error.ok(), "Failed to parse json");
81
33
                       if (json_or_error.ok()) {
82
33
                         absl::StrAppend(element, absl::StrReplaceAll(json_or_error.value(),
83
33
                                                                      replacement_values));
84
33
                       }
85

            
86
                       // The Zipkin API V2 specification mandates to store timestamp value as int64
87
                       // https://github.com/openzipkin/zipkin-api/blob/228fabe660f1b5d1e28eac9df41f7d1deed4a1c2/zipkin2-api.yaml#L447-L463
88
                       // (often translated as uint64 in some of the official implementations:
89
                       // https://github.com/openzipkin/zipkin-go/blob/62dc8b26c05e0e8b88eb7536eff92498e65bbfc3/model/span.go#L114,
90
                       // and see the discussion here:
91
                       // https://github.com/openzipkin/zipkin-go/pull/161#issuecomment-598558072).
92
                       // However, when the timestamp is stored as number value in a protobuf
93
                       // struct, it is stored as a double. Because of how protobuf serializes
94
                       // doubles, there is a possibility that the value will be rendered as a
95
                       // number with scientific notation as reported in:
96
                       // https://github.com/envoyproxy/envoy/issues/9341#issuecomment-566912973. To
97
                       // deal with that issue, here we do a workaround by storing the timestamp as
98
                       // string and keeping track of that with the corresponding integer
99
                       // replacements, and do the replacement here so we can meet the Zipkin API V2
100
                       // requirements.
101
                       //
102
                       // TODO(dio): The right fix for this is to introduce additional knob when
103
                       // serializing double in protobuf DoubleToBuffer function, and make it
104
                       // available to be controlled at caller site.
105
                       // https://github.com/envoyproxy/envoy/issues/10411).
106
33
                     }));
107
28
      });
108
32
  return absl::StrCat("[", serialized_elements, "]");
109
32
}
110

            
111
const std::vector<Protobuf::Struct>
112
28
JsonV2Serializer::toListOfSpans(const Span& zipkin_span, Util::Replacements& replacements) const {
113
28
  std::vector<Protobuf::Struct> spans;
114
28
  spans.reserve(zipkin_span.annotations().size());
115

            
116
  // This holds the annotation entries from logs.
117
28
  std::vector<Protobuf::Value> annotation_entries;
118

            
119
49
  for (const auto& annotation : zipkin_span.annotations()) {
120
49
    Protobuf::Struct span;
121
49
    auto* fields = span.mutable_fields();
122
49
    if (annotation.value() == CLIENT_SEND) {
123
13
      (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_CLIENT);
124
36
    } else if (annotation.value() == SERVER_RECV) {
125
20
      if (shared_span_context_ && zipkin_span.annotations().size() > 1) {
126
18
        (*fields)[SPAN_SHARED] = ValueUtil::boolValue(true);
127
18
      }
128
20
      (*fields)[SPAN_KIND] = ValueUtil::stringValue(KIND_SERVER);
129
20
    } else {
130
16
      Protobuf::Struct annotation_entry;
131
16
      auto* annotation_entry_fields = annotation_entry.mutable_fields();
132
16
      (*annotation_entry_fields)[ANNOTATION_VALUE] = ValueUtil::stringValue(annotation.value());
133
16
      (*annotation_entry_fields)[ANNOTATION_TIMESTAMP] =
134
16
          Util::uint64Value(annotation.timestamp(), ANNOTATION_TIMESTAMP, replacements);
135
16
      annotation_entries.push_back(ValueUtil::structValue(annotation_entry));
136
16
      continue;
137
16
    }
138

            
139
33
    if (annotation.isSetEndpoint()) {
140
      // Usually we store number to a Protobuf::Struct object via ValueUtil::numberValue.
141
      // However, due to the possibility of rendering that to a number with scientific notation, we
142
      // chose to store it as a string and keeping track the corresponding replacement. For example,
143
      // we have 1584324295476870 if we stored it as a double value, MessageToJsonString gives
144
      // us 1.58432429547687e+15. Instead we store it as the string of 1584324295476870 (when it is
145
      // serialized: "1584324295476870"), and replace it post MessageToJsonString serialization with
146
      // integer (1584324295476870 without `"`), see: JsonV2Serializer::serialize.
147
33
      (*fields)[SPAN_TIMESTAMP] =
148
33
          Util::uint64Value(annotation.timestamp(), SPAN_TIMESTAMP, replacements);
149
33
      (*fields)[SPAN_LOCAL_ENDPOINT] =
150
33
          ValueUtil::structValue(toProtoEndpoint(annotation.endpoint()));
151
33
    }
152

            
153
33
    (*fields)[SPAN_TRACE_ID] = ValueUtil::stringValue(zipkin_span.traceIdAsHexString());
154
33
    if (zipkin_span.isSetParentId()) {
155
      (*fields)[SPAN_PARENT_ID] = ValueUtil::stringValue(zipkin_span.parentIdAsHexString());
156
    }
157

            
158
33
    (*fields)[SPAN_ID] = ValueUtil::stringValue(zipkin_span.idAsHexString());
159

            
160
33
    const auto& span_name = zipkin_span.name();
161
33
    if (!span_name.empty()) {
162
15
      (*fields)[SPAN_NAME] = ValueUtil::stringValue(span_name);
163
15
    }
164

            
165
33
    if (zipkin_span.isSetDuration()) {
166
      // Since SPAN_DURATION has the same data type with SPAN_TIMESTAMP, we use Util::uint64Value to
167
      // store it.
168
18
      (*fields)[SPAN_DURATION] =
169
18
          Util::uint64Value(zipkin_span.duration(), SPAN_DURATION, replacements);
170
18
    }
171

            
172
33
    const auto& binary_annotations = zipkin_span.binaryAnnotations();
173
33
    if (!binary_annotations.empty()) {
174
18
      Protobuf::Struct tags;
175
18
      auto* tag_fields = tags.mutable_fields();
176
18
      for (const auto& binary_annotation : binary_annotations) {
177
18
        (*tag_fields)[binary_annotation.key()] = ValueUtil::stringValue(binary_annotation.value());
178
18
      }
179
18
      (*fields)[SPAN_TAGS] = ValueUtil::structValue(tags);
180
18
    }
181

            
182
33
    spans.push_back(std::move(span));
183
33
  }
184

            
185
  // Fill up annotation entries from logs.
186
33
  for (auto& span : spans) {
187
33
    auto* fields = span.mutable_fields();
188
33
    if (!annotation_entries.empty()) {
189
16
      (*fields)[ANNOTATIONS] = ValueUtil::listValue(annotation_entries);
190
16
    }
191
33
  }
192

            
193
28
  return spans;
194
28
}
195

            
196
33
// Converts a Zipkin endpoint into the struct used for the JSON "local endpoint" field.
//
// The IP address is written under ENDPOINT_IPV4 or ENDPOINT_IPV6 depending on its version, along
// with ENDPOINT_PORT; ENDPOINT_SERVICE_NAME is written when the service name is non-empty.
// Address fields are omitted when the endpoint has no address, or when the address carries no IP
// information (ip() returns nullptr), instead of dereferencing a null pointer.
const Protobuf::Struct JsonV2Serializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
  Protobuf::Struct endpoint;
  auto* fields = endpoint.mutable_fields();

  const Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
  // Guard both the address itself and its IP view: a non-IP address has no ip().
  const auto* ip = address != nullptr ? address->ip() : nullptr;
  if (ip != nullptr) {
    if (ip->version() == Network::Address::IpVersion::v4) {
      (*fields)[ENDPOINT_IPV4] = ValueUtil::stringValue(ip->addressAsString());
    } else {
      (*fields)[ENDPOINT_IPV6] = ValueUtil::stringValue(ip->addressAsString());
    }
    (*fields)[ENDPOINT_PORT] = ValueUtil::numberValue(ip->port());
  }

  const std::string& service_name = zipkin_endpoint.serviceName();
  if (!service_name.empty()) {
    (*fields)[ENDPOINT_SERVICE_NAME] = ValueUtil::stringValue(service_name);
  }

  return endpoint;
}
217

            
218
// Stores the shared-span-context flag; toListOfSpans() uses it to compute the "shared" value of
// SERVER_RECV spans.
ProtobufSerializer::ProtobufSerializer(const bool shared_span_context)
    : shared_span_context_(shared_span_context) {}
220

            
221
5
std::string ProtobufSerializer::serialize(const std::vector<Span>& zipkin_spans) {
222
5
  zipkin::proto3::ListOfSpans spans;
223
6
  for (const Span& zipkin_span : zipkin_spans) {
224
6
    spans.MergeFrom(toListOfSpans(zipkin_span));
225
6
  }
226
5
  std::string serialized;
227
5
  spans.SerializeToString(&serialized);
228
5
  return serialized;
229
5
}
230

            
231
6
// Expands a single Zipkin span into a zipkin::proto3::ListOfSpans.
//
// One proto span is emitted per CLIENT_SEND or SERVER_RECV annotation. All remaining annotations
// are collected and afterwards copied (value and timestamp) into the annotations of every emitted
// proto span.
const zipkin::proto3::ListOfSpans ProtobufSerializer::toListOfSpans(const Span& zipkin_span) const {
  zipkin::proto3::ListOfSpans result;
  const auto& all_annotations = zipkin_span.annotations();

  // This holds the annotation entries from logs.
  std::vector<Annotation> log_annotations;

  for (const auto& annotation : all_annotations) {
    const bool is_client_send = annotation.value() == CLIENT_SEND;
    const bool is_server_recv = !is_client_send && annotation.value() == SERVER_RECV;

    if (!is_client_send && !is_server_recv) {
      // Not a span-defining annotation: keep it for the second pass below.
      log_annotations.push_back(annotation);
      continue;
    }

    zipkin::proto3::Span proto_span;
    if (is_client_send) {
      proto_span.set_kind(zipkin::proto3::Span::CLIENT);
    } else {
      proto_span.set_shared(shared_span_context_ && all_annotations.size() > 1);
      proto_span.set_kind(zipkin::proto3::Span::SERVER);
    }

    if (annotation.isSetEndpoint()) {
      proto_span.set_timestamp(annotation.timestamp());
      proto_span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint()));
    }

    proto_span.set_trace_id(zipkin_span.traceIdAsByteString());
    if (zipkin_span.isSetParentId()) {
      proto_span.set_parent_id(zipkin_span.parentIdAsByteString());
    }
    proto_span.set_id(zipkin_span.idAsByteString());
    proto_span.set_name(zipkin_span.name());

    if (zipkin_span.isSetDuration()) {
      proto_span.set_duration(zipkin_span.duration());
    }

    auto& proto_tags = *proto_span.mutable_tags();
    for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) {
      proto_tags[binary_annotation.key()] = binary_annotation.value();
    }

    result.add_spans()->MergeFrom(proto_span);
  }

  // Fill up annotation entries from logs.
  for (auto& proto_span : *result.mutable_spans()) {
    for (const auto& log_annotation : log_annotations) {
      auto* proto_annotation = proto_span.mutable_annotations()->Add();
      proto_annotation->set_value(log_annotation.value());
      proto_annotation->set_timestamp(log_annotation.timestamp());
    }
  }

  return result;
}
286

            
287
// Converts a Zipkin endpoint into its zipkin::proto3::Endpoint representation.
//
// The IP address is stored as a byte string in the ipv4 or ipv6 field depending on its version,
// together with the port; the service name is set when it is non-empty. Address fields are left
// unset when the endpoint has no address, or when the address carries no IP information (ip()
// returns nullptr), instead of dereferencing a null pointer.
const zipkin::proto3::Endpoint
ProtobufSerializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
  zipkin::proto3::Endpoint endpoint;

  const Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
  // Guard both the address itself and its IP view: a non-IP address has no ip().
  const auto* ip = address != nullptr ? address->ip() : nullptr;
  if (ip != nullptr) {
    if (ip->version() == Network::Address::IpVersion::v4) {
      endpoint.set_ipv4(Util::toByteString(ip->ipv4()->address()));
    } else {
      endpoint.set_ipv6(Util::toByteString(ip->ipv6()->address()));
    }
    endpoint.set_port(ip->port());
  }

  const std::string& service_name = zipkin_endpoint.serviceName();
  if (!service_name.empty()) {
    endpoint.set_service_name(service_name);
  }

  return endpoint;
}
307

            
308
} // namespace Zipkin
309
} // namespace Tracers
310
} // namespace Extensions
311
} // namespace Envoy