1 : #include "source/extensions/stat_sinks/common/statsd/statsd.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <string>
6 :
7 : #include "envoy/buffer/buffer.h"
8 : #include "envoy/common/exception.h"
9 : #include "envoy/common/platform.h"
10 : #include "envoy/event/dispatcher.h"
11 : #include "envoy/stats/scope.h"
12 : #include "envoy/upstream/cluster_manager.h"
13 :
14 : #include "source/common/api/os_sys_calls_impl.h"
15 : #include "source/common/buffer/buffer_impl.h"
16 : #include "source/common/common/assert.h"
17 : #include "source/common/common/fmt.h"
18 : #include "source/common/common/utility.h"
19 : #include "source/common/config/utility.h"
20 : #include "source/common/network/socket_interface.h"
21 : #include "source/common/network/utility.h"
22 : #include "source/common/stats/symbol_table.h"
23 :
24 : #include "absl/strings/str_join.h"
25 :
26 : namespace Envoy {
27 : namespace Extensions {
28 : namespace StatSinks {
29 : namespace Common {
30 : namespace Statsd {
31 :
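// Each worker thread gets its own WriterImpl, which holds the datagram socket used to reach the
// configured statsd server address.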
UdpStatsdSink::WriterImpl::WriterImpl(UdpStatsdSink& parent)
    : parent_(parent), io_handle_(Network::ioHandleForAddr(Network::Socket::Type::Datagram,
                                                           parent_.server_address_, {})) {}

void UdpStatsdSink::WriterImpl::write(const std::string& message) {
  // TODO(mattklein123): We can avoid this const_cast pattern by having a constant variant of
  // RawSlice. This can be fixed elsewhere as well.
  Buffer::RawSlice slice{const_cast<char*>(message.c_str()), message.size()};
  Network::Utility::writeToSocket(*io_handle_, &slice, 1, nullptr, *parent_.server_address_);
}

void UdpStatsdSink::WriterImpl::writeBuffer(Buffer::Instance& data) {
  Network::Utility::writeToSocket(*io_handle_, data, nullptr, *parent_.server_address_);
}

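// A WriterImpl is installed in the thread-local slot for every worker, so each thread writes to
// statsd through its own socket.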
UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls,
                             Network::Address::InstanceConstSharedPtr address, const bool use_tag,
                             const std::string& prefix, absl::optional<uint64_t> buffer_size,
                             const Statsd::TagFormat& tag_format)
    : tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag),
      prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix),
      buffer_size_(buffer_size.value_or(0)), tag_format_(tag_format) {
  tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<WriterImpl>(*this);
  });
}

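// Serializes all used counters and gauges from the snapshot into statsd lines, batching them into
// datagrams when a buffer size is configured. Histograms are handled separately in
// onHistogramComplete().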
void UdpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  Writer& writer = tls_->getTyped<Writer>();
  Buffer::OwnedImpl buffer;

  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      const std::string counter_str = buildMessage(counter.counter_.get(), counter.delta_, "|c");
      writeBuffer(buffer, writer, counter_str);
    }
  }

  for (const auto& counter : snapshot.hostCounters()) {
    const std::string counter_str = buildMessage(counter, counter.delta(), "|c");
    writeBuffer(buffer, writer, counter_str);
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      const std::string gauge_str = buildMessage(gauge.get(), gauge.get().value(), "|g");
      writeBuffer(buffer, writer, gauge_str);
    }
  }

  for (const auto& gauge : snapshot.hostGauges()) {
    const std::string gauge_str = buildMessage(gauge, gauge.value(), "|g");
    writeBuffer(buffer, writer, gauge_str);
  }

  flushBuffer(buffer, writer);
  // TODO(efimki): Add support of text readouts stats.
}

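// Appends a single statsd line to the pending datagram buffer. Lines that are at least as large
// as the configured buffer size bypass buffering and are sent immediately; otherwise the buffer is
// flushed first whenever adding the line would overflow it. When no buffer size is configured
// (buffer_size_ is 0), every metric is written as its own datagram.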
void UdpStatsdSink::writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer,
                                const std::string& statsd_metric) const {
  if (statsd_metric.length() >= buffer_size_) {
    // Our statsd_metric is too large to fit into the buffer, skip buffering and write directly
    writer.write(statsd_metric);
  } else {
    if ((buffer.length() + statsd_metric.length() + 1) > buffer_size_) {
      // If we add the new statsd_metric, we'll overflow our buffer. Flush the buffer to make
      // room for the new statsd_metric.
      flushBuffer(buffer, writer);
    } else if (buffer.length() > 0) {
      // We have room and have metrics already in the buffer, add a newline to separate
      // metric entries.
      buffer.add("\n");
    }
    buffer.add(statsd_metric);
  }
}

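// Sends the buffered metrics as a single datagram and drains the buffer; a no-op if nothing is
// buffered.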
void UdpStatsdSink::flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const {
  if (buffer.length() == 0) {
    return;
  }
  writer.writeBuffer(buffer);
  buffer.drain(buffer.length());
}

void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // In statsd, histograms are all timers in milliseconds; Envoy histograms, however, are not
  // necessarily timers in milliseconds. For Envoy histograms suffixed with their corresponding SI
  // unit symbol this is acceptable, but for histograms without a suffix, especially those which
  // are timers recording in units other than milliseconds, it may make sense to scale the value to
  // milliseconds here and potentially suffix the names accordingly (minus the pre-existing ones
  // for backwards compatibility).
  std::string message;
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    constexpr float divisor = Stats::Histogram::PercentScale;
    const float float_value = value;
    const float scaled = float_value / divisor;
    message = buildMessage(histogram, scaled, "|h");
  } else {
    message = buildMessage(histogram, std::chrono::milliseconds(value).count(), "|ms");
  }
  tls_->getTyped<Writer>().write(message);
}

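// Formats one statsd line of the form "<prefix>.<name>:<value><type>". Depending on the configured
// tag format, the serialized tags are appended either after the value or directly after the metric
// name.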
template <class StatType, typename ValueType>
const std::string UdpStatsdSink::buildMessage(const StatType& metric, ValueType value,
                                              const std::string& type) const {
  switch (tag_format_.tag_position) {
  case Statsd::TagPosition::TagAfterValue: {
    const std::string message = absl::StrCat(
        // metric name
        prefix_, ".", getName(metric),
        // value and type
        ":", value, type,
        // tags
        buildTagStr(metric.tags()));
    return message;
  }

  case Statsd::TagPosition::TagAfterName: {
    const std::string message = absl::StrCat(
        // metric name
        prefix_, ".", getName(metric),
        // tags
        buildTagStr(metric.tags()),
        // value and type
        ":", value, type);
    return message;
  }
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}

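// Returns the tag-extracted metric name when tagging is enabled, otherwise the full metric name.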
template <class StatType> const std::string UdpStatsdSink::getName(const StatType& metric) const {
  if (use_tag_) {
    return metric.tagExtractedName();
  } else {
    return metric.name();
  }
}

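// Serializes the metric's tags using the configured start, assign, and separator strings. Returns
// an empty string when tagging is disabled or there are no tags.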
const std::string UdpStatsdSink::buildTagStr(const std::vector<Stats::Tag>& tags) const {
  if (!use_tag_ || tags.empty()) {
    return "";
  }

  std::vector<std::string> tag_strings;
  tag_strings.reserve(tags.size());
  for (const Stats::Tag& tag : tags) {
    tag_strings.emplace_back(tag.name_ + tag_format_.assign + tag.value_);
  }
  return tag_format_.start + absl::StrJoin(tag_strings, tag_format_.separator);
}

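// The TCP sink validates and resolves the configured cluster up front, then installs a per-worker
// TlsSink that owns the upstream connection and the flush buffer for that thread.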
TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
                             const std::string& cluster_name, ThreadLocal::SlotAllocator& tls,
                             Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
                             const std::string& prefix)
    : prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix), tls_(tls.allocateSlot()),
      cluster_manager_(cluster_manager),
      cx_overflow_stat_(scope.counterFromStatName(
          Stats::StatNameManagedStorage("statsd.cx_overflow", scope.symbolTable()).statName())) {
  const auto cluster = Config::Utility::checkClusterAndLocalInfo("tcp statsd", cluster_name,
                                                                 cluster_manager, local_info);
  cluster_info_ = cluster->get().info();
  tls_->set([this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<TlsSink>(*this, dispatcher);
  });
}

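// Streams all used counters and gauges from the snapshot to the upstream statsd cluster using a
// single begin/end flush cycle on this worker's TlsSink.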
void TcpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  TlsSink& tls_sink = tls_->getTyped<TlsSink>();
  tls_sink.beginFlush(true);
  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      tls_sink.flushCounter(counter.counter_.get().name(), counter.delta_);
    }
  }

  for (const auto& counter : snapshot.hostCounters()) {
    tls_sink.flushCounter(counter.name(), counter.delta());
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      tls_sink.flushGauge(gauge.get().name(), gauge.get().value());
    }
  }

  for (const auto& gauge : snapshot.hostGauges()) {
    tls_sink.flushGauge(gauge.name(), gauge.value());
  }
  // TODO(efimki): Add support of text readouts stats.
  tls_sink.endFlush(true);
}

void TcpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // In statsd, histograms are all timers, except those recording percents.
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    constexpr float divisor = Stats::Histogram::PercentScale;
    const float float_value = value;
    const float scaled = float_value / divisor;
    tls_->getTyped<TlsSink>().onPercentHistogramComplete(histogram.name(), scaled);
  } else {
    tls_->getTyped<TlsSink>().onTimespanComplete(histogram.name(),
                                                 std::chrono::milliseconds(value));
  }
}

TcpStatsdSink::TlsSink::TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher)
    : parent_(parent), dispatcher_(dispatcher) {}

TcpStatsdSink::TlsSink::~TlsSink() {
  if (connection_) {
    connection_->close(Network::ConnectionCloseType::NoFlush);
  }
}

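// Reserves a contiguous slice in the output buffer that subsequent commonFlush() calls write into
// directly with raw pointer arithmetic, avoiding per-stat buffer operations.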
void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) {
  ASSERT(!expect_empty_buffer || buffer_.length() == 0);
  ASSERT(current_slice_mem_ == nullptr);
  ASSERT(!current_buffer_reservation_.has_value());

  current_buffer_reservation_.emplace(buffer_.reserveSingleSlice(FLUSH_SLICE_SIZE_BYTES));

  ASSERT(current_buffer_reservation_->slice().len_ >= FLUSH_SLICE_SIZE_BYTES);
  current_slice_mem_ = reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
}

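// Appends "<prefix>.<name>:<value>|<stat_type>\n" to the current slice. If the slice cannot hold
// the worst-case size of the new entry, the data written so far is committed and a fresh slice is
// reserved first.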
void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) {
  ASSERT(current_slice_mem_ != nullptr);
  // 36 > 1 ("." after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for
  // number (bigger than it will ever be)
  const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36;
  if (current_buffer_reservation_->slice().len_ - usedBuffer() < max_size) {
    endFlush(false);
    beginFlush(false);
  }

  // Produces something like "envoy.{}:{}|c\n".
  // This is written this way for maximum perf, since with a large number of stats and a high
  // flush rate this path can become expensive.
  const char* snapped_current = current_slice_mem_;
  const std::string prefix = parent_.getPrefix();
  memcpy(current_slice_mem_, prefix.data(), prefix.size()); // NOLINT(safe-memcpy)
  current_slice_mem_ += prefix.size();
  *current_slice_mem_++ = '.';
  memcpy(current_slice_mem_, name.data(), name.size()); // NOLINT(safe-memcpy)
  current_slice_mem_ += name.size();
  *current_slice_mem_++ = ':';
  current_slice_mem_ += StringUtil::itoa(current_slice_mem_, 30, value);
  *current_slice_mem_++ = '|';
  *current_slice_mem_++ = stat_type;

  *current_slice_mem_++ = '\n';

  ASSERT(static_cast<uint64_t>(current_slice_mem_ - snapped_current) < max_size);
}

void TcpStatsdSink::TlsSink::flushCounter(const std::string& name, uint64_t delta) {
  commonFlush(name, delta, 'c');
}

void TcpStatsdSink::TlsSink::flushGauge(const std::string& name, uint64_t value) {
  commonFlush(name, value, 'g');
}

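// Commits the bytes written into the reserved slice and, when requested, writes the whole buffer
// to the upstream connection.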
void TcpStatsdSink::TlsSink::endFlush(bool do_write) {
  ASSERT(current_slice_mem_ != nullptr);
  ASSERT(current_buffer_reservation_.has_value());
  current_buffer_reservation_->commit(usedBuffer());
  current_buffer_reservation_.reset();
  current_slice_mem_ = nullptr;
  if (do_write) {
    write(buffer_);
    ASSERT(buffer_.length() == 0);
  }
}

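// Drops the connection object once the upstream connection closes; a new connection will be
// established on the next write().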
void TcpStatsdSink::TlsSink::onEvent(Network::ConnectionEvent event) {
  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    dispatcher_.deferredDelete(std::move(connection_));
  }
}

void TcpStatsdSink::TlsSink::onTimespanComplete(const std::string& name,
                                                std::chrono::milliseconds ms) {
  // Ultimately it would be nice to perf optimize this path also, but it's not very frequent. It's
  // also currently not possible that this interleaves with any counter/gauge flushing.
  // See the comment at UdpStatsdSink::onHistogramComplete with respect to unit suffixes.
  ASSERT(current_slice_mem_ == nullptr);
  Buffer::OwnedImpl buffer(
      fmt::format("{}.{}:{}|ms\n", parent_.getPrefix().c_str(), name, ms.count()));
  write(buffer);
}

void TcpStatsdSink::TlsSink::onPercentHistogramComplete(const std::string& name, float value) {
  ASSERT(current_slice_mem_ == nullptr);
  Buffer::OwnedImpl buffer(fmt::format("{}.{}:{}|h\n", parent_.getPrefix().c_str(), name, value));
  write(buffer);
}

void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) {
  // Guard against the stats connection backing up. In this case we probably have no visibility
  // into what is going on externally, but we also increment a stat that should be viewable
  // locally.
  // NOTE: In the current implementation, we write most stats on the main thread, but timers
  //       get emitted on the worker threads. Since this is using global buffered data, it's
  //       possible that we are about to kill a connection that is not actually backed up.
  //       This is essentially a panic state, so it's not worth keeping per thread buffer stats,
  //       since if we stay over, the other threads will eventually kill their connections too.
  // TODO(mattklein123): The use of the stat is somewhat of a hack, and should be replaced with
  // real flow control callbacks once they are available.
  Upstream::ClusterTrafficStats& cluster_traffic_stats = *parent_.cluster_info_->trafficStats();
  if (cluster_traffic_stats.upstream_cx_tx_bytes_buffered_.value() > MAX_BUFFERED_STATS_BYTES) {
    if (connection_) {
      connection_->close(Network::ConnectionCloseType::NoFlush);
    }
    parent_.cx_overflow_stat_.inc();
    buffer.drain(buffer.length());
    return;
  }

  if (!connection_) {
    const auto thread_local_cluster =
        parent_.cluster_manager_.getThreadLocalCluster(parent_.cluster_info_->name());
    Upstream::Host::CreateConnectionData info;
    if (thread_local_cluster != nullptr) {
      info = thread_local_cluster->tcpConn(nullptr);
    }
    if (!info.connection_) {
      buffer.drain(buffer.length());
      return;
    }

    connection_ = std::move(info.connection_);
    connection_->addConnectionCallbacks(*this);
    connection_->setConnectionStats({cluster_traffic_stats.upstream_cx_rx_bytes_total_,
                                     cluster_traffic_stats.upstream_cx_rx_bytes_buffered_,
                                     cluster_traffic_stats.upstream_cx_tx_bytes_total_,
                                     cluster_traffic_stats.upstream_cx_tx_bytes_buffered_,
                                     &cluster_traffic_stats.bind_errors_, nullptr});
    connection_->connect();
  }

  connection_->write(buffer, false);
}

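// Number of bytes written into the currently reserved slice so far.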
uint64_t TcpStatsdSink::TlsSink::usedBuffer() const {
  ASSERT(current_slice_mem_ != nullptr);
  ASSERT(current_buffer_reservation_.has_value());
  return current_slice_mem_ - reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
}

} // namespace Statsd
} // namespace Common
} // namespace StatSinks
} // namespace Extensions
} // namespace Envoy