1
#pragma once
2

            
3
#include "envoy/buffer/buffer.h"
4
#include "envoy/common/platform.h"
5
#include "envoy/local_info/local_info.h"
6
#include "envoy/network/connection.h"
7
#include "envoy/stats/histogram.h"
8
#include "envoy/stats/scope.h"
9
#include "envoy/stats/sink.h"
10
#include "envoy/stats/stats.h"
11
#include "envoy/stats/tag.h"
12
#include "envoy/thread_local/thread_local.h"
13
#include "envoy/upstream/cluster_manager.h"
14

            
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/macros.h"
17
#include "source/common/network/io_socket_handle_impl.h"
18
#include "source/extensions/stat_sinks/common/statsd/tag_formats.h"
19

            
20
#include "absl/types/optional.h"
21

            
22
namespace Envoy {
23
namespace Extensions {
24
namespace StatSinks {
25
namespace Common {
26
namespace Statsd {
27

            
28
39
/**
 * @return the prefix used for flushed stats when a caller does not supply one ("envoy").
 *
 * Declared `inline` rather than `static`: this definition lives in a header, and `static` would
 * give every including translation unit its own internal-linkage copy of the function (and of
 * the string object that CONSTRUCT_ON_FIRST_USE creates). `inline` lets all translation units
 * share one definition.
 */
inline const std::string& getDefaultPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "envoy"); }
29

            
30
/**
31
 * Implementation of Sink that writes to a UDP statsd address.
32
 */
33
class UdpStatsdSink : public Stats::Sink {
34
public:
35
  /**
36
   * Base interface for writing UDP datagrams.
37
   */
38
  class Writer : public ThreadLocal::ThreadLocalObject {
39
  public:
40
    virtual void write(const std::string& message) PURE;
41
    virtual void writeBuffer(Buffer::Instance& data) PURE;
42
  };
43

            
44
  UdpStatsdSink(ThreadLocal::SlotAllocator& tls, Network::Address::InstanceConstSharedPtr address,
45
                const bool use_tag, const std::string& prefix = getDefaultPrefix(),
46
                absl::optional<uint64_t> buffer_size = absl::nullopt,
47
                const Statsd::TagFormat& tag_format = Statsd::getDefaultTagFormat());
48
  // For testing.
49
  UdpStatsdSink(ThreadLocal::SlotAllocator& tls, const std::shared_ptr<Writer>& writer,
50
                const bool use_tag, const std::string& prefix = getDefaultPrefix(),
51
                absl::optional<uint64_t> buffer_size = absl::nullopt,
52
                const Statsd::TagFormat& tag_format = Statsd::getDefaultTagFormat())
53
10
      : tls_(tls.allocateSlot()), use_tag_(use_tag),
54
10
        prefix_(prefix.empty() ? getDefaultPrefix() : prefix),
55
10
        buffer_size_(buffer_size.value_or(0)), tag_format_(tag_format) {
56
10
    tls_->set(
57
10
        [writer](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return writer; });
58
10
  }
59

            
60
  // Stats::Sink
61
  void flush(Stats::MetricSnapshot& snapshot) override;
62
  void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override;
63

            
64
3
  bool getUseTagForTest() { return use_tag_; }
65
4
  uint64_t getBufferSizeForTest() { return buffer_size_; }
66
6
  const std::string& getPrefix() { return prefix_; }
67

            
68
private:
69
  /**
70
   * This is a simple UDP localhost writer for statsd messages.
71
   */
72
  class WriterImpl : public Writer {
73
  public:
74
    WriterImpl(UdpStatsdSink& parent);
75

            
76
    // Writer
77
    void write(const std::string& message) override;
78
    void writeBuffer(Buffer::Instance& data) override;
79

            
80
  private:
81
    UdpStatsdSink& parent_;
82
    const Network::IoHandlePtr io_handle_;
83
  };
84

            
85
  void flushBuffer(Buffer::OwnedImpl& buffer, Writer& writer) const;
86
  void writeBuffer(Buffer::OwnedImpl& buffer, Writer& writer, const std::string& data) const;
87

            
88
  template <class StatType, typename ValueType>
89
  const std::string buildMessage(const StatType& metric, ValueType value,
90
                                 const std::string& type) const;
91
  template <class StatType> const std::string getName(const StatType& metric) const;
92
  const std::string buildTagStr(const std::vector<Stats::Tag>& tags) const;
93

            
94
  const ThreadLocal::SlotPtr tls_;
95
  const Network::Address::InstanceConstSharedPtr server_address_;
96
  const bool use_tag_;
97
  // Prefix for all flushed stats.
98
  const std::string prefix_;
99
  const uint64_t buffer_size_;
100
  const Statsd::TagFormat tag_format_;
101
};
102

            
103
/**
104
 * Per thread implementation of a TCP stats flusher for statsd.
105
 */
106
class TcpStatsdSink : public Stats::Sink {
107
public:
108
  /**
109
   * This is a wrapper around the constructor to return StatusOr.
110
   */
111
  static absl::StatusOr<std::unique_ptr<TcpStatsdSink>>
112
  create(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name,
113
         ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager,
114
         Stats::Scope& scope, const std::string& prefix = getDefaultPrefix());
115

            
116
  // Stats::Sink
117
  void flush(Stats::MetricSnapshot& snapshot) override;
118
  void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override;
119

            
120
4028
  const std::string& getPrefix() { return prefix_; }
121

            
122
protected:
123
  TcpStatsdSink(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name,
124
                ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager,
125
                Stats::Scope& scope, absl::Status& creation_status,
126
                const std::string& prefix = getDefaultPrefix());
127

            
128
private:
129
  struct TlsSink : public ThreadLocal::ThreadLocalObject, public Network::ConnectionCallbacks {
130
    TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher);
131
    ~TlsSink() override;
132

            
133
    void beginFlush(bool expect_empty_buffer);
134
    void commonFlush(const std::string& name, uint64_t value, char stat_type);
135
    void flushCounter(const std::string& name, uint64_t delta);
136
    void flushGauge(const std::string& name, uint64_t value);
137
    void endFlush(bool do_write);
138
    void onTimespanComplete(const std::string& name, std::chrono::milliseconds ms);
139
    void onPercentHistogramComplete(const std::string& name, float value);
140
    uint64_t usedBuffer() const;
141
    void write(Buffer::Instance& buffer);
142

            
143
    // Network::ConnectionCallbacks
144
    void onEvent(Network::ConnectionEvent event) override;
145
1
    void onAboveWriteBufferHighWatermark() override {}
146
1
    void onBelowWriteBufferLowWatermark() override {}
147

            
148
    TcpStatsdSink& parent_;
149
    Event::Dispatcher& dispatcher_;
150
    Network::ClientConnectionPtr connection_;
151
    Buffer::OwnedImpl buffer_;
152
    absl::optional<Buffer::ReservationSingleSlice> current_buffer_reservation_;
153
    char* current_slice_mem_{};
154
  };
155

            
156
  // Somewhat arbitrary 16MiB limit for buffered stats.
157
  static constexpr uint32_t MAX_BUFFERED_STATS_BYTES = (1024 * 1024 * 16);
158

            
159
  // 16KiB intermediate buffer for flushing.
160
  static constexpr uint32_t FLUSH_SLICE_SIZE_BYTES = (1024 * 16);
161

            
162
  // Prefix for all flushed stats.
163
  const std::string prefix_;
164

            
165
  Upstream::ClusterInfoConstSharedPtr cluster_info_;
166
  ThreadLocal::SlotPtr tls_;
167
  Upstream::ClusterManager& cluster_manager_;
168
  Stats::Counter& cx_overflow_stat_;
169
};
170

            
171
} // namespace Statsd
172
} // namespace Common
173
} // namespace StatSinks
174
} // namespace Extensions
175
} // namespace Envoy