Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/buffer/buffer.h" 4 : #include "envoy/common/platform.h" 5 : #include "envoy/local_info/local_info.h" 6 : #include "envoy/network/connection.h" 7 : #include "envoy/stats/histogram.h" 8 : #include "envoy/stats/scope.h" 9 : #include "envoy/stats/sink.h" 10 : #include "envoy/stats/stats.h" 11 : #include "envoy/stats/tag.h" 12 : #include "envoy/thread_local/thread_local.h" 13 : #include "envoy/upstream/cluster_manager.h" 14 : 15 : #include "source/common/buffer/buffer_impl.h" 16 : #include "source/common/common/macros.h" 17 : #include "source/common/network/io_socket_handle_impl.h" 18 : #include "source/extensions/stat_sinks/common/statsd/tag_formats.h" 19 : 20 : #include "absl/types/optional.h" 21 : 22 : namespace Envoy { 23 : namespace Extensions { 24 : namespace StatSinks { 25 : namespace Common { 26 : namespace Statsd { 27 : 28 0 : static const std::string& getDefaultPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "envoy"); } 29 : 30 : /** 31 : * Implementation of Sink that writes to a UDP statsd address. 32 : */ 33 : class UdpStatsdSink : public Stats::Sink { 34 : public: 35 : /** 36 : * Base interface for writing UDP datagrams. 37 : */ 38 : class Writer : public ThreadLocal::ThreadLocalObject { 39 : public: 40 : virtual void write(const std::string& message) PURE; 41 : virtual void writeBuffer(Buffer::Instance& data) PURE; 42 : }; 43 : 44 : UdpStatsdSink(ThreadLocal::SlotAllocator& tls, Network::Address::InstanceConstSharedPtr address, 45 : const bool use_tag, const std::string& prefix = getDefaultPrefix(), 46 : absl::optional<uint64_t> buffer_size = absl::nullopt, 47 : const Statsd::TagFormat& tag_format = Statsd::getDefaultTagFormat()); 48 : // For testing. 49 : UdpStatsdSink(ThreadLocal::SlotAllocator& tls, const std::shared_ptr<Writer>& writer, 50 : const bool use_tag, const std::string& prefix = getDefaultPrefix(), 51 : absl::optional<uint64_t> buffer_size = absl::nullopt, 52 : const Statsd::TagFormat& tag_format = Statsd::getDefaultTagFormat()) 53 : : tls_(tls.allocateSlot()), use_tag_(use_tag), 54 : prefix_(prefix.empty() ? getDefaultPrefix() : prefix), 55 0 : buffer_size_(buffer_size.value_or(0)), tag_format_(tag_format) { 56 0 : tls_->set( 57 0 : [writer](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return writer; }); 58 0 : } 59 : 60 : // Stats::Sink 61 : void flush(Stats::MetricSnapshot& snapshot) override; 62 : void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override; 63 : 64 0 : bool getUseTagForTest() { return use_tag_; } 65 0 : uint64_t getBufferSizeForTest() { return buffer_size_; } 66 0 : const std::string& getPrefix() { return prefix_; } 67 : 68 : private: 69 : /** 70 : * This is a simple UDP localhost writer for statsd messages. 71 : */ 72 : class WriterImpl : public Writer { 73 : public: 74 : WriterImpl(UdpStatsdSink& parent); 75 : 76 : // Writer 77 : void write(const std::string& message) override; 78 : void writeBuffer(Buffer::Instance& data) override; 79 : 80 : private: 81 : UdpStatsdSink& parent_; 82 : const Network::IoHandlePtr io_handle_; 83 : }; 84 : 85 : void flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const; 86 : void writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer, const std::string& data) const; 87 : 88 : template <class StatType, typename ValueType> 89 : const std::string buildMessage(const StatType& metric, ValueType value, 90 : const std::string& type) const; 91 : template <class StatType> const std::string getName(const StatType& metric) const; 92 : const std::string buildTagStr(const std::vector<Stats::Tag>& tags) const; 93 : 94 : const ThreadLocal::SlotPtr tls_; 95 : const Network::Address::InstanceConstSharedPtr server_address_; 96 : const bool use_tag_; 97 : // Prefix for all flushed stats. 98 : const std::string prefix_; 99 : const uint64_t buffer_size_; 100 : const Statsd::TagFormat tag_format_; 101 : }; 102 : 103 : /** 104 : * Per thread implementation of a TCP stats flusher for statsd. 105 : */ 106 : class TcpStatsdSink : public Stats::Sink { 107 : public: 108 : TcpStatsdSink(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name, 109 : ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager, 110 : Stats::Scope& scope, const std::string& prefix = getDefaultPrefix()); 111 : 112 : // Stats::Sink 113 : void flush(Stats::MetricSnapshot& snapshot) override; 114 : void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override; 115 : 116 0 : const std::string& getPrefix() { return prefix_; } 117 : 118 : private: 119 : struct TlsSink : public ThreadLocal::ThreadLocalObject, public Network::ConnectionCallbacks { 120 : TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher); 121 : ~TlsSink() override; 122 : 123 : void beginFlush(bool expect_empty_buffer); 124 : void commonFlush(const std::string& name, uint64_t value, char stat_type); 125 : void flushCounter(const std::string& name, uint64_t delta); 126 : void flushGauge(const std::string& name, uint64_t value); 127 : void endFlush(bool do_write); 128 : void onTimespanComplete(const std::string& name, std::chrono::milliseconds ms); 129 : void onPercentHistogramComplete(const std::string& name, float value); 130 : uint64_t usedBuffer() const; 131 : void write(Buffer::Instance& buffer); 132 : 133 : // Network::ConnectionCallbacks 134 : void onEvent(Network::ConnectionEvent event) override; 135 0 : void onAboveWriteBufferHighWatermark() override {} 136 0 : void onBelowWriteBufferLowWatermark() override {} 137 : 138 : TcpStatsdSink& parent_; 139 : Event::Dispatcher& dispatcher_; 140 : Network::ClientConnectionPtr connection_; 141 : Buffer::OwnedImpl buffer_; 142 : absl::optional<Buffer::ReservationSingleSlice> current_buffer_reservation_; 143 : char* current_slice_mem_{}; 144 : }; 145 : 146 : // Somewhat arbitrary 16MiB limit for buffered stats. 147 : static constexpr uint32_t MAX_BUFFERED_STATS_BYTES = (1024 * 1024 * 16); 148 : 149 : // 16KiB intermediate buffer for flushing. 150 : static constexpr uint32_t FLUSH_SLICE_SIZE_BYTES = (1024 * 16); 151 : 152 : // Prefix for all flushed stats. 153 : const std::string prefix_; 154 : 155 : Upstream::ClusterInfoConstSharedPtr cluster_info_; 156 : ThreadLocal::SlotPtr tls_; 157 : Upstream::ClusterManager& cluster_manager_; 158 : Stats::Counter& cx_overflow_stat_; 159 : }; 160 : 161 : } // namespace Statsd 162 : } // namespace Common 163 : } // namespace StatSinks 164 : } // namespace Extensions 165 : } // namespace Envoy