#include "source/extensions/stat_sinks/common/statsd/statsd.h"

#include <chrono>
#include <cstdint>
#include <string>

#include "envoy/buffer/buffer.h"
#include "envoy/common/exception.h"
#include "envoy/common/platform.h"
#include "envoy/event/dispatcher.h"
#include "envoy/stats/scope.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/api/os_sys_calls_impl.h"
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/fmt.h"
#include "source/common/common/utility.h"
#include "source/common/config/utility.h"
#include "source/common/network/socket_interface.h"
#include "source/common/network/utility.h"
#include "source/common/stats/symbol_table.h"

#include "absl/strings/str_join.h"

namespace Envoy {
namespace Extensions {
namespace StatSinks {
namespace Common {
namespace Statsd {

UdpStatsdSink::WriterImpl::WriterImpl(UdpStatsdSink& parent)
    : parent_(parent), io_handle_(Network::ioHandleForAddr(Network::Socket::Type::Datagram,
                                                           parent_.server_address_, {})) {}

void UdpStatsdSink::WriterImpl::write(const std::string& message) {
  // TODO(mattklein123): We can avoid this const_cast pattern by having a constant variant of
  // RawSlice. This can be fixed elsewhere as well.
  Buffer::RawSlice slice{const_cast<char*>(message.c_str()), message.size()};
  Network::Utility::writeToSocket(*io_handle_, &slice, 1, nullptr, *parent_.server_address_);
}

void UdpStatsdSink::WriterImpl::writeBuffer(Buffer::Instance& data) {
  Network::Utility::writeToSocket(*io_handle_, data, nullptr, *parent_.server_address_);
}

UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls,
                             Network::Address::InstanceConstSharedPtr address, const bool use_tag,
                             const std::string& prefix, absl::optional<uint64_t> buffer_size,
                             const Statsd::TagFormat& tag_format)
    : tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag),
      prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix),
      buffer_size_(buffer_size.value_or(0)), tag_format_(tag_format) {
  tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<WriterImpl>(*this);
  });
}

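// Renders every metric in the snapshot as a single statsd line (counters as "|c", gauges as
// "|g") and, when buffering is enabled, batches the lines into one newline-separated payload
// that is flushed at the end. With the default "envoy" prefix and no tags, a counter line looks
// roughly like "envoy.cluster.upstream_rq_total:5|c" (illustrative example only).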
void UdpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  Writer& writer = tls_->getTyped<Writer>();
  Buffer::OwnedImpl buffer;

  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      const std::string counter_str = buildMessage(counter.counter_.get(), counter.delta_, "|c");
      writeBuffer(buffer, writer, counter_str);
    }
  }

  for (const auto& counter : snapshot.hostCounters()) {
    const std::string counter_str = buildMessage(counter, counter.delta(), "|c");
    writeBuffer(buffer, writer, counter_str);
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      const std::string gauge_str = buildMessage(gauge.get(), gauge.get().value(), "|g");
      writeBuffer(buffer, writer, gauge_str);
    }
  }

  for (const auto& gauge : snapshot.hostGauges()) {
    const std::string gauge_str = buildMessage(gauge, gauge.value(), "|g");
    writeBuffer(buffer, writer, gauge_str);
  }

  flushBuffer(buffer, writer);
  // TODO(efimki): Add support of text readouts stats.
}

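// Appends statsd_metric to the batch buffer. Metrics at least buffer_size_ bytes long bypass
// the buffer and are written immediately; if appending would overflow buffer_size_, the buffer
// is flushed first. Note that with buffer_size_ == 0 (buffering disabled) every metric takes
// the direct-write path.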
void UdpStatsdSink::writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer,
                                const std::string& statsd_metric) const {
  if (statsd_metric.length() >= buffer_size_) {
    // Our statsd_metric is too large to fit into the buffer, skip buffering and write directly
    writer.write(statsd_metric);
  } else {
    if ((buffer.length() + statsd_metric.length() + 1) > buffer_size_) {
      // If we add the new statsd_metric, we'll overflow our buffer. Flush the buffer to make
      // room for the new statsd_metric.
      flushBuffer(buffer, writer);
    } else if (buffer.length() > 0) {
      // We have room and have metrics already in the buffer, add a newline to separate
      // metric entries.
      buffer.add("\n");
    }
    buffer.add(statsd_metric);
  }
}

void UdpStatsdSink::flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const {
  if (buffer.length() == 0) {
    return;
  }
  writer.writeBuffer(buffer);
  buffer.drain(buffer.length());
}

void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // In statsd, histograms are all timers in milliseconds; Envoy histograms, however, are not
  // necessarily timers in milliseconds. For Envoy histograms suffixed with their corresponding
  // SI unit symbol this is acceptable, but for histograms without a suffix, especially those
  // which are timers but record in units other than milliseconds, it may make sense to scale
  // the value to milliseconds here and potentially suffix the names accordingly (minus the
  // pre-existing ones for backwards compatibility).
  std::string message;
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    constexpr float divisor = Stats::Histogram::PercentScale;
    const float float_value = value;
    const float scaled = float_value / divisor;
    message = buildMessage(histogram, scaled, "|h");
  } else {
    message = buildMessage(histogram, std::chrono::milliseconds(value).count(), "|ms");
  }
  tls_->getTyped<Writer>().write(message);
}

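// Builds one statsd line from a metric, a value, and a type suffix ("|c", "|g", "|h" or "|ms").
// With a DogStatsD-style tag format (TagAfterValue) the result looks roughly like
// "envoy.cluster.upstream_rq_total:5|c|#cluster_name:foo", while a Graphite-style format
// (TagAfterName) places the tag block between the metric name and the value. The concrete
// separators come from tag_format_; the examples above are illustrative only.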
template <class StatType, typename ValueType>
const std::string UdpStatsdSink::buildMessage(const StatType& metric, ValueType value,
                                              const std::string& type) const {
  switch (tag_format_.tag_position) {
  case Statsd::TagPosition::TagAfterValue: {
    const std::string message = absl::StrCat(
        // metric name
        prefix_, ".", getName(metric),
        // value and type
        ":", value, type,
        // tags
        buildTagStr(metric.tags()));
    return message;
  }

  case Statsd::TagPosition::TagAfterName: {
    const std::string message = absl::StrCat(
        // metric name
        prefix_, ".", getName(metric),
        // tags
        buildTagStr(metric.tags()),
        // value and type
        ":", value, type);
    return message;
  }
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}

template <class StatType> const std::string UdpStatsdSink::getName(const StatType& metric) const {
  if (use_tag_) {
    return metric.tagExtractedName();
  } else {
    return metric.name();
  }
}

const std::string UdpStatsdSink::buildTagStr(const std::vector<Stats::Tag>& tags) const {
  if (!use_tag_ || tags.empty()) {
    return "";
  }

  std::vector<std::string> tag_strings;
  tag_strings.reserve(tags.size());
  for (const Stats::Tag& tag : tags) {
    tag_strings.emplace_back(tag.name_ + tag_format_.assign + tag.value_);
  }
  return tag_format_.start + absl::StrJoin(tag_strings, tag_format_.separator);
}

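// The TCP sink flushes statsd lines over a persistent TCP connection to the configured Envoy
// cluster. One TlsSink is created per worker dispatcher via the thread-local slot, so each
// thread writes on its own connection.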
TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
                             const std::string& cluster_name, ThreadLocal::SlotAllocator& tls,
                             Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
                             const std::string& prefix)
    : prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix), tls_(tls.allocateSlot()),
      cluster_manager_(cluster_manager),
      cx_overflow_stat_(scope.counterFromStatName(
          Stats::StatNameManagedStorage("statsd.cx_overflow", scope.symbolTable()).statName())) {
  const auto cluster = Config::Utility::checkClusterAndLocalInfo("tcp statsd", cluster_name,
                                                                 cluster_manager, local_info);
  cluster_info_ = cluster->get().info();
  tls_->set([this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    return std::make_shared<TlsSink>(*this, dispatcher);
  });
}

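// A flush is framed as beginFlush() -> a series of commonFlush() appends -> endFlush(), which
// writes the accumulated buffer to the upstream connection in one shot.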
void TcpStatsdSink::flush(Stats::MetricSnapshot& snapshot) {
  TlsSink& tls_sink = tls_->getTyped<TlsSink>();
  tls_sink.beginFlush(true);
  for (const auto& counter : snapshot.counters()) {
    if (counter.counter_.get().used()) {
      tls_sink.flushCounter(counter.counter_.get().name(), counter.delta_);
    }
  }

  for (const auto& counter : snapshot.hostCounters()) {
    tls_sink.flushCounter(counter.name(), counter.delta());
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (gauge.get().used()) {
      tls_sink.flushGauge(gauge.get().name(), gauge.get().value());
    }
  }

  for (const auto& gauge : snapshot.hostGauges()) {
    tls_sink.flushGauge(gauge.name(), gauge.value());
  }
  // TODO(efimki): Add support of text readouts stats.
  tls_sink.endFlush(true);
}

void TcpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
  // In statsd, histograms are all timers except percents.
  if (histogram.unit() == Stats::Histogram::Unit::Percent) {
    // 32-bit floating point values should have plenty of range for these values, and are faster to
    // operate on than 64-bit doubles.
    constexpr float divisor = Stats::Histogram::PercentScale;
    const float float_value = value;
    const float scaled = float_value / divisor;
    tls_->getTyped<TlsSink>().onPercentHistogramComplete(histogram.name(), scaled);
  } else {
    tls_->getTyped<TlsSink>().onTimespanComplete(histogram.name(),
                                                 std::chrono::milliseconds(value));
  }
}

TcpStatsdSink::TlsSink::TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher)
    : parent_(parent), dispatcher_(dispatcher) {}

TcpStatsdSink::TlsSink::~TlsSink() {
  if (connection_) {
    connection_->close(Network::ConnectionCloseType::NoFlush);
  }
}

void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) {
  ASSERT(!expect_empty_buffer || buffer_.length() == 0);
  ASSERT(current_slice_mem_ == nullptr);
  ASSERT(!current_buffer_reservation_.has_value());

  current_buffer_reservation_.emplace(buffer_.reserveSingleSlice(FLUSH_SLICE_SIZE_BYTES));

  ASSERT(current_buffer_reservation_->slice().len_ >= FLUSH_SLICE_SIZE_BYTES);
  current_slice_mem_ = reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
}

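// Appends "<prefix>.<name>:<value>|<stat_type>\n" directly into the reserved slice using raw
// pointer writes. If the remaining slice space cannot be guaranteed to hold the worst-case
// encoding, the current slice is committed and written out, and a fresh one is reserved before
// appending.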
void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) {
  ASSERT(current_slice_mem_ != nullptr);
  // 36 > 1 ("." after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for
  // number (bigger than it will ever be)
  const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36;
  if (current_buffer_reservation_->slice().len_ - usedBuffer() < max_size) {
    endFlush(false);
    beginFlush(false);
  }

  // Produces something like "envoy.{}:{}|c\n".
  // This is written this way for maximum perf, since with a large number of stats and a high
  // flush rate this can become expensive.
  const char* snapped_current = current_slice_mem_;
  const std::string prefix = parent_.getPrefix();
  memcpy(current_slice_mem_, prefix.data(), prefix.size()); // NOLINT(safe-memcpy)
  current_slice_mem_ += prefix.size();
  *current_slice_mem_++ = '.';
  memcpy(current_slice_mem_, name.data(), name.size()); // NOLINT(safe-memcpy)
  current_slice_mem_ += name.size();
  *current_slice_mem_++ = ':';
  current_slice_mem_ += StringUtil::itoa(current_slice_mem_, 30, value);
  *current_slice_mem_++ = '|';
  *current_slice_mem_++ = stat_type;

  *current_slice_mem_++ = '\n';

  ASSERT(static_cast<uint64_t>(current_slice_mem_ - snapped_current) < max_size);
}

void TcpStatsdSink::TlsSink::flushCounter(const std::string& name, uint64_t delta) {
  commonFlush(name, delta, 'c');
}

void TcpStatsdSink::TlsSink::flushGauge(const std::string& name, uint64_t value) {
  commonFlush(name, value, 'g');
}

void TcpStatsdSink::TlsSink::endFlush(bool do_write) {
  ASSERT(current_slice_mem_ != nullptr);
  ASSERT(current_buffer_reservation_.has_value());
  current_buffer_reservation_->commit(usedBuffer());
  current_buffer_reservation_.reset();
  current_slice_mem_ = nullptr;
  if (do_write) {
    write(buffer_);
    ASSERT(buffer_.length() == 0);
  }
}

void TcpStatsdSink::TlsSink::onEvent(Network::ConnectionEvent event) {
  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    dispatcher_.deferredDelete(std::move(connection_));
  }
}

void TcpStatsdSink::TlsSink::onTimespanComplete(const std::string& name,
                                                std::chrono::milliseconds ms) {
  // Ultimately it would be nice to perf optimize this path also, but it's not very frequent. It's
  // also currently not possible that this interleaves with any counter/gauge flushing.
  // See the comment at UdpStatsdSink::onHistogramComplete with respect to unit suffixes.
  ASSERT(current_slice_mem_ == nullptr);
  Buffer::OwnedImpl buffer(
      fmt::format("{}.{}:{}|ms\n", parent_.getPrefix().c_str(), name, ms.count()));
  write(buffer);
}

void TcpStatsdSink::TlsSink::onPercentHistogramComplete(const std::string& name, float value) {
  ASSERT(current_slice_mem_ == nullptr);
  Buffer::OwnedImpl buffer(fmt::format("{}.{}:{}|h\n", parent_.getPrefix().c_str(), name, value));
  write(buffer);
}

void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) {
  // Guard against the stats connection backing up. In this case we probably have no visibility
  // into what is going on externally, but we also increment a stat that should be viewable
  // locally.
  // NOTE: In the current implementation, we write most stats on the main thread, but timers
  //       get emitted on the worker threads. Since this is using global buffered data, it's
  //       possible that we are about to kill a connection that is not actually backed up.
  //       This is essentially a panic state, so it's not worth keeping per thread buffer stats,
  //       since if we stay over, the other threads will eventually kill their connections too.
  // TODO(mattklein123): The use of the stat is somewhat of a hack, and should be replaced with
  // real flow control callbacks once they are available.
  Upstream::ClusterTrafficStats& cluster_traffic_stats = *parent_.cluster_info_->trafficStats();
  if (cluster_traffic_stats.upstream_cx_tx_bytes_buffered_.value() > MAX_BUFFERED_STATS_BYTES) {
    if (connection_) {
      connection_->close(Network::ConnectionCloseType::NoFlush);
    }
    parent_.cx_overflow_stat_.inc();
    buffer.drain(buffer.length());
    return;
  }

  if (!connection_) {
    const auto thread_local_cluster =
        parent_.cluster_manager_.getThreadLocalCluster(parent_.cluster_info_->name());
    Upstream::Host::CreateConnectionData info;
    if (thread_local_cluster != nullptr) {
      info = thread_local_cluster->tcpConn(nullptr);
    }
    if (!info.connection_) {
      buffer.drain(buffer.length());
      return;
    }

    connection_ = std::move(info.connection_);
    connection_->addConnectionCallbacks(*this);
    connection_->setConnectionStats({cluster_traffic_stats.upstream_cx_rx_bytes_total_,
                                     cluster_traffic_stats.upstream_cx_rx_bytes_buffered_,
                                     cluster_traffic_stats.upstream_cx_tx_bytes_total_,
                                     cluster_traffic_stats.upstream_cx_tx_bytes_buffered_,
                                     &cluster_traffic_stats.bind_errors_, nullptr});
    connection_->connect();
  }

  connection_->write(buffer, false);
}

uint64_t TcpStatsdSink::TlsSink::usedBuffer() const {
  ASSERT(current_slice_mem_ != nullptr);
  ASSERT(current_buffer_reservation_.has_value());
  return current_slice_mem_ - reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_);
}

} // namespace Statsd
} // namespace Common
} // namespace StatSinks
} // namespace Extensions
} // namespace Envoy