1
#include "source/extensions/stat_sinks/common/statsd/statsd.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <string>
6

            
7
#include "envoy/buffer/buffer.h"
8
#include "envoy/common/platform.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/stats/scope.h"
11
#include "envoy/upstream/cluster_manager.h"
12

            
13
#include "source/common/api/os_sys_calls_impl.h"
14
#include "source/common/buffer/buffer_impl.h"
15
#include "source/common/common/assert.h"
16
#include "source/common/common/fmt.h"
17
#include "source/common/common/utility.h"
18
#include "source/common/config/utility.h"
19
#include "source/common/network/socket_interface.h"
20
#include "source/common/network/utility.h"
21
#include "source/common/stats/symbol_table.h"
22

            
23
#include "absl/strings/str_join.h"
24

            
25
namespace Envoy {
26
namespace Extensions {
27
namespace StatSinks {
28
namespace Common {
29
namespace Statsd {
30

            
31
// Opens a datagram socket targeting the configured statsd server address.
UdpStatsdSink::WriterImpl::WriterImpl(UdpStatsdSink& parent)
    : parent_(parent), io_handle_(Network::ioHandleForAddr(Network::Socket::Type::Datagram,
                                                           parent.server_address_, {})) {}
34

            
35
12
void UdpStatsdSink::WriterImpl::write(const std::string& message) {
36
  // TODO(mattklein123): We can avoid this const_cast pattern by having a constant variant of
37
  // RawSlice. This can be fixed elsewhere as well.
38
12
  Buffer::RawSlice slice{const_cast<char*>(message.c_str()), message.size()};
39
12
  Network::Utility::writeToSocket(*io_handle_, &slice, 1, nullptr, *parent_.server_address_);
40
12
}
41

            
42
// Sends the accumulated buffer contents to the statsd server in one shot.
void UdpStatsdSink::WriterImpl::writeBuffer(Buffer::Instance& data) {
  Network::Utility::writeToSocket(*io_handle_, data, nullptr, *parent_.server_address_);
}
45

            
46
// A buffer_size of absl::nullopt (stored as 0) disables datagram batching: every
// metric is then sent as its own datagram.
UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls,
                             Network::Address::InstanceConstSharedPtr address, const bool use_tag,
                             const std::string& prefix, absl::optional<uint64_t> buffer_size,
                             const Statsd::TagFormat& tag_format)
    : tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag),
      prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix),
      buffer_size_(buffer_size.value_or(0)), tag_format_(tag_format) {
  // Every worker thread gets its own writer, and therefore its own UDP socket.
  tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<WriterImpl>(*this);
  });
}
57

            
58
15
// Emits every used counter/gauge from the snapshot. Counters are reported as
// deltas ("|c"), gauges as absolute values ("|g"); entries are batched through
// `accum` when buffering is enabled.
void UdpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  Writer& writer = tls_->getTyped<Writer>();
  Buffer::OwnedImpl accum;

  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      writeBuffer(accum, writer, buildMessage(counter.counter_.get(), counter.delta_, "|c"));
    }
  }

  for (const auto& host_counter : snapshot.hostCounters()) {
    writeBuffer(accum, writer, buildMessage(host_counter, host_counter.delta(), "|c"));
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      writeBuffer(accum, writer, buildMessage(gauge.get(), gauge.get().value(), "|g"));
    }
  }

  for (const auto& host_gauge : snapshot.hostGauges()) {
    writeBuffer(accum, writer, buildMessage(host_gauge, host_gauge.value(), "|g"));
  }

  // Ship anything still sitting in the batch buffer.
  flushBuffer(accum, writer);
  // TODO(efimki): Add support of text readouts stats.
}
89

            
90
void UdpStatsdSink::writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer,
91
24
                                const std::string& statsd_metric) const {
92
24
  if (statsd_metric.length() >= buffer_size_) {
93
    // Our statsd_metric is too large to fit into the buffer, skip buffering and write directly
94
12
    writer.write(statsd_metric);
95
12
  } else {
96
12
    if ((buffer.length() + statsd_metric.length() + 1) > buffer_size_) {
97
      // If we add the new statsd_metric, we'll overflow our buffer. Flush the buffer to make
98
      // room for the new statsd_metric.
99
1
      flushBuffer(buffer, writer);
100
11
    } else if (buffer.length() > 0) {
101
      // We have room and have metrics already in the buffer, add a newline to separate
102
      // metric entries.
103
2
      buffer.add("\n");
104
2
    }
105
12
    buffer.add(statsd_metric);
106
12
  }
107
24
}
108

            
109
16
// Writes any buffered metrics out and empties the buffer; no-op when empty.
void UdpStatsdSink::flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const {
  if (buffer.length() == 0) {
    return;
  }
  writer.writeBuffer(buffer);
  buffer.drain(buffer.length());
}
116

            
117
14
void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // For statsd histograms are all timers in milliseconds, Envoy histograms are however
  // not necessarily timers in milliseconds, for Envoy histograms suffixed with their corresponding
  // SI unit symbol this is acceptable, but for histograms without a suffix, especially those which
  // are timers but record in units other than milliseconds, it may make sense to scale the value to
  // milliseconds here and potentially suffix the names accordingly (minus the pre-existing ones for
  // backwards compatibility).
  std::string message;
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    const float scaled = static_cast<float>(value) / static_cast<float>(Stats::Histogram::PercentScale);
    message = buildMessage(histogram, scaled, "|h");
  } else {
    message = buildMessage(histogram, std::chrono::milliseconds(value).count(), "|ms");
  }
  tls_->getTyped<Writer>().write(message);
}
137

            
138
template <class StatType, typename ValueType>
139
const std::string UdpStatsdSink::buildMessage(const StatType& metric, ValueType value,
140
38
                                              const std::string& type) const {
141
38
  switch (tag_format_.tag_position) {
142
35
  case Statsd::TagPosition::TagAfterValue: {
143
35
    const std::string message = absl::StrCat(
144
        // metric name
145
35
        prefix_, ".", getName(metric),
146
        // value and type
147
35
        ":", value, type,
148
        // tags
149
35
        buildTagStr(metric.tags()));
150
35
    return message;
151
  }
152

            
153
3
  case Statsd::TagPosition::TagAfterName: {
154
3
    const std::string message = absl::StrCat(
155
        // metric name
156
3
        prefix_, ".", getName(metric),
157
        // tags
158
3
        buildTagStr(metric.tags()),
159
        // value and type
160
3
        ":", value, type);
161
3
    return message;
162
  }
163
38
  }
164
  PANIC_DUE_TO_CORRUPT_ENUM;
165
}
166

            
167
38
template <class StatType> const std::string UdpStatsdSink::getName(const StatType& metric) const {
168
38
  if (use_tag_) {
169
15
    return metric.tagExtractedName();
170
23
  } else {
171
23
    return metric.name();
172
23
  }
173
38
}
174

            
175
38
// Serializes tags per the configured format, e.g. "|#key:value,key2:value2".
// Empty when tagging is disabled or there are no tags.
const std::string UdpStatsdSink::buildTagStr(const std::vector<Stats::Tag>& tags) const {
  if (!use_tag_ || tags.empty()) {
    return "";
  }

  std::vector<std::string> pieces;
  pieces.reserve(tags.size());
  for (const Stats::Tag& tag : tags) {
    pieces.emplace_back(tag.name_ + tag_format_.assign + tag.value_);
  }
  return tag_format_.start + absl::StrJoin(pieces, tag_format_.separator);
}
187

            
188
// Validates the local info and target cluster up front; any failure is reported
// through `creation_status` (see TcpStatsdSink::create).
TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
                             const std::string& cluster_name, ThreadLocal::SlotAllocator& tls,
                             Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
                             absl::Status& creation_status, const std::string& prefix)
    : prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix), tls_(tls.allocateSlot()),
      cluster_manager_(cluster_manager),
      cx_overflow_stat_(scope.counterFromStatName(
          Stats::StatNameManagedStorage("statsd.cx_overflow", scope.symbolTable()).statName())) {
  SET_AND_RETURN_IF_NOT_OK(Config::Utility::checkLocalInfo("tcp statsd", local_info),
                           creation_status);
  auto cluster_result = Config::Utility::checkCluster("tcp statsd", cluster_name, cluster_manager);
  SET_AND_RETURN_IF_NOT_OK(cluster_result.status(), creation_status);
  const auto cluster_ref = cluster_result.value();
  cluster_info_ = cluster_ref->get().info();
  // One TlsSink (and thus one upstream TCP connection) per worker dispatcher.
  tls_->set([this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<TlsSink>(*this, dispatcher);
  });
}
207

            
208
// Factory wrapper around the private constructor: converts the out-param style
// construction failure into a StatusOr return.
absl::StatusOr<std::unique_ptr<TcpStatsdSink>>
TcpStatsdSink::create(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name,
                      ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager,
                      Stats::Scope& scope, const std::string& prefix) {
  absl::Status creation_status;
  std::unique_ptr<TcpStatsdSink> sink(new TcpStatsdSink(
      local_info, cluster_name, tls, cluster_manager, scope, creation_status, prefix));
  RETURN_IF_NOT_OK_REF(creation_status);
  return sink;
}
218

            
219
9
// Flushes every used counter/gauge in the snapshot to the per-thread TlsSink,
// which batches them into a single buffered write on the upstream connection.
void TcpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  TlsSink& tls_sink = tls_->getTyped<TlsSink>();
  tls_sink.beginFlush(true);
  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      tls_sink.flushCounter(counter.counter_.get().name(), counter.delta_);
    }
  }

  for (const auto& counter : snapshot.hostCounters()) {
    tls_sink.flushCounter(counter.name(), counter.delta());
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      tls_sink.flushGauge(gauge.get().name(), gauge.get().value());
    }
  }

  // Bug fix: this loop was previously nested inside the gauges loop above, which flushed
  // every host gauge once per regular gauge (duplicated data) and skipped host gauges
  // entirely when there were no used regular gauges. It now runs exactly once, mirroring
  // the hostCounters handling above and UdpStatsdSink::flush.
  for (const auto& gauge : snapshot.hostGauges()) {
    tls_sink.flushGauge(gauge.name(), gauge.value());
  }
  // TODO(efimki): Add support of text readouts stats.
  tls_sink.endFlush(true);
}
244

            
245
6
void TcpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // For statsd histograms are all timers except percents.
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    const float scaled = static_cast<float>(value) / static_cast<float>(Stats::Histogram::PercentScale);
    tls_->getTyped<TlsSink>().onPercentHistogramComplete(histogram.name(), scaled);
  } else {
    tls_->getTyped<TlsSink>().onTimespanComplete(histogram.name(),
                                                 std::chrono::milliseconds(value));
  }
}
259

            
260
// One TlsSink exists per worker; it owns that worker's upstream statsd connection.
TcpStatsdSink::TlsSink::TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher)
    : parent_(parent), dispatcher_(dispatcher) {}
262

            
263
17
// Tears down the upstream connection (if one was ever established) without flushing.
TcpStatsdSink::TlsSink::~TlsSink() {
  if (connection_ != nullptr) {
    connection_->close(Network::ConnectionCloseType::NoFlush);
  }
}
268

            
269
11
void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) {
270
11
  ASSERT(!expect_empty_buffer || buffer_.length() == 0);
271
11
  ASSERT(current_slice_mem_ == nullptr);
272
11
  ASSERT(!current_buffer_reservation_.has_value());
273

            
274
11
  current_buffer_reservation_.emplace(buffer_.reserveSingleSlice(FLUSH_SLICE_SIZE_BYTES));
275

            
276
11
  ASSERT(current_buffer_reservation_->slice().len_ >= FLUSH_SLICE_SIZE_BYTES);
277
11
  current_slice_mem_ = reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
278
11
}
279

            
280
2010
void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) {
281
2010
  ASSERT(current_slice_mem_ != nullptr);
282
  // 36 > 1 ("." after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for
283
  // number (bigger than it will ever be)
284
2010
  const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36;
285
2010
  if (current_buffer_reservation_->slice().len_ - usedBuffer() < max_size) {
286
2
    endFlush(false);
287
2
    beginFlush(false);
288
2
  }
289

            
290
  // Produces something like "envoy.{}:{}|c\n"
291
  // This written this way for maximum perf since with a large number of stats and at a high flush
292
  // rate this can become expensive.
293
2010
  const char* snapped_current = current_slice_mem_;
294
2010
  const std::string prefix = parent_.getPrefix();
295
2010
  memcpy(current_slice_mem_, prefix.data(), prefix.size()); // NOLINT(safe-memcpy)
296
2010
  current_slice_mem_ += prefix.size();
297
2010
  *current_slice_mem_++ = '.';
298
2010
  memcpy(current_slice_mem_, name.data(), name.size()); // NOLINT(safe-memcpy)
299
2010
  current_slice_mem_ += name.size();
300
2010
  *current_slice_mem_++ = ':';
301
2010
  current_slice_mem_ += StringUtil::itoa(current_slice_mem_, 30, value);
302
2010
  *current_slice_mem_++ = '|';
303
2010
  *current_slice_mem_++ = stat_type;
304

            
305
2010
  *current_slice_mem_++ = '\n';
306

            
307
2010
  ASSERT(static_cast<uint64_t>(current_slice_mem_ - snapped_current) < max_size);
308
2010
}
309

            
310
2008
void TcpStatsdSink::TlsSink::flushCounter(const std::string& name, uint64_t delta) {
311
2008
  commonFlush(name, delta, 'c');
312
2008
}
313

            
314
2
void TcpStatsdSink::TlsSink::flushGauge(const std::string& name, uint64_t value) {
315
2
  commonFlush(name, value, 'g');
316
2
}
317

            
318
11
void TcpStatsdSink::TlsSink::endFlush(bool do_write) {
319
11
  ASSERT(current_slice_mem_ != nullptr);
320
11
  ASSERT(current_buffer_reservation_.has_value());
321
11
  current_buffer_reservation_->commit(usedBuffer());
322
11
  current_buffer_reservation_.reset();
323
11
  current_slice_mem_ = nullptr;
324
11
  if (do_write) {
325
9
    write(buffer_);
326
9
    ASSERT(buffer_.length() == 0);
327
9
  }
328
11
}
329

            
330
8
// On either close event the connection is deferred-deleted; the next write()
// lazily establishes a fresh connection.
void TcpStatsdSink::TlsSink::onEvent(Network::ConnectionEvent event) {
  const bool closed = event == Network::ConnectionEvent::LocalClose ||
                      event == Network::ConnectionEvent::RemoteClose;
  if (closed) {
    dispatcher_.deferredDelete(std::move(connection_));
  }
}
336

            
337
void TcpStatsdSink::TlsSink::onTimespanComplete(const std::string& name,
338
5
                                                std::chrono::milliseconds ms) {
339
  // Ultimately it would be nice to perf optimize this path also, but it's not very frequent. It's
340
  // also currently not possible that this interleaves with any counter/gauge flushing.
341
  // See the comment at UdpStatsdSink::onHistogramComplete with respect to unit suffixes.
342
5
  ASSERT(current_slice_mem_ == nullptr);
343
5
  Buffer::OwnedImpl buffer(
344
5
      fmt::format("{}.{}:{}|ms\n", parent_.getPrefix().c_str(), name, ms.count()));
345
5
  write(buffer);
346
5
}
347

            
348
1
void TcpStatsdSink::TlsSink::onPercentHistogramComplete(const std::string& name, float value) {
349
1
  ASSERT(current_slice_mem_ == nullptr);
350
1
  Buffer::OwnedImpl buffer(fmt::format("{}.{}:{}|h\n", parent_.getPrefix().c_str(), name, value));
351
1
  write(buffer);
352
1
}
353

            
354
15
void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) {
  // Guard against the stats connection backing up. In this case we probably have no visibility
  // into what is going on externally, but we also increment a stat that should be viewable
  // locally.
  // NOTE: In the current implementation, we write most stats on the main thread, but timers
  //       get emitted on the worker threads. Since this is using global buffered data, it's
  //       possible that we are about to kill the connection that is not actually backed up.
  //       This is essentially a panic state, so it's not worth keeping per thread buffer stats,
  //       since if we stay over, the other threads will eventually kill their connections too.
  // TODO(mattklein123): The use of the stat is somewhat of a hack, and should be replaced with
  // real flow control callbacks once they are available.
  Upstream::ClusterTrafficStats& traffic_stats = *parent_.cluster_info_->trafficStats();
  if (traffic_stats.upstream_cx_tx_bytes_buffered_.value() > MAX_BUFFERED_STATS_BYTES) {
    // Too much unsent data: kill the connection, count the overflow, and drop this batch.
    if (connection_ != nullptr) {
      connection_->close(Network::ConnectionCloseType::NoFlush);
    }
    parent_.cx_overflow_stat_.inc();
    buffer.drain(buffer.length());
    return;
  }

  if (connection_ == nullptr) {
    // Lazily (re)establish the upstream connection.
    Upstream::Host::CreateConnectionData info;
    const auto thread_local_cluster =
        parent_.cluster_manager_.getThreadLocalCluster(parent_.cluster_info_->name());
    if (thread_local_cluster != nullptr) {
      info = thread_local_cluster->tcpConn(nullptr);
    }
    if (!info.connection_) {
      // No healthy upstream host available; drop the batch on the floor.
      buffer.drain(buffer.length());
      return;
    }

    connection_ = std::move(info.connection_);
    connection_->addConnectionCallbacks(*this);
    connection_->setConnectionStats({traffic_stats.upstream_cx_rx_bytes_total_,
                                     traffic_stats.upstream_cx_rx_bytes_buffered_,
                                     traffic_stats.upstream_cx_tx_bytes_total_,
                                     traffic_stats.upstream_cx_tx_bytes_buffered_,
                                     &traffic_stats.bind_errors_, nullptr});
    connection_->connect();
  }

  connection_->write(buffer, false);
}
399

            
400
2021
uint64_t TcpStatsdSink::TlsSink::usedBuffer() const {
401
2021
  ASSERT(current_slice_mem_ != nullptr);
402
2021
  ASSERT(current_buffer_reservation_.has_value());
403
2021
  return current_slice_mem_ - reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
404
2021
}
405

            
406
} // namespace Statsd
407
} // namespace Common
408
} // namespace StatSinks
409
} // namespace Extensions
410
} // namespace Envoy