1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/config/metrics/v3/metrics_service.pb.h"
6
#include "envoy/grpc/async_client.h"
7
#include "envoy/local_info/local_info.h"
8
#include "envoy/network/connection.h"
9
#include "envoy/service/metrics/v3/metrics_service.pb.h"
10
#include "envoy/singleton/instance.h"
11
#include "envoy/stats/histogram.h"
12
#include "envoy/stats/sink.h"
13
#include "envoy/stats/stats.h"
14
#include "envoy/upstream/cluster_manager.h"
15

            
16
#include "source/common/buffer/buffer_impl.h"
17
#include "source/common/grpc/status.h"
18
#include "source/common/grpc/typed_async_client.h"
19

            
20
namespace Envoy {
21
namespace Extensions {
22
namespace StatSinks {
23
namespace MetricsService {
24

            
25
using MetricsPtr =
26
    std::unique_ptr<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>;
27
using HistogramEmitMode = envoy::config::metrics::v3::HistogramEmitMode;
28

            
29
/**
30
 * Interface for metrics streamer.
31
 */
32
template <class RequestProto, class ResponseProto>
33
class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks<ResponseProto> {
34
public:
35
  explicit GrpcMetricsStreamer(const Grpc::RawAsyncClientSharedPtr& raw_async_client)
36
24
      : client_(raw_async_client) {}
37
24
  ~GrpcMetricsStreamer() override = default;
38

            
39
  /**
40
   * Send Metrics Message.
41
   * @param message supplies the metrics to send.
42
   */
43
  virtual void send(MetricsPtr&& metrics) PURE;
44

            
45
  // Grpc::AsyncStreamCallbacks
46
4
  void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
47
3
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
48
3
  void onReceiveMessage(std::unique_ptr<ResponseProto>&&) override {}
49
3
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
50
  void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {};
51

            
52
protected:
53
  Grpc::AsyncStream<RequestProto> stream_{};
54
  Grpc::AsyncClient<RequestProto, ResponseProto> client_;
55
};
56

            
57
template <class RequestProto, class ResponseProto>
58
using GrpcMetricsStreamerSharedPtr =
59
    std::shared_ptr<GrpcMetricsStreamer<RequestProto, ResponseProto>>;
60

            
61
/**
62
 * Production implementation of GrpcMetricsStreamer
63
 */
64
class GrpcMetricsStreamerImpl
65
    : public Singleton::Instance,
66
      public GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
67
                                 envoy::service::metrics::v3::StreamMetricsResponse>,
68
      public Logger::Loggable<Logger::Id::stats_sinks> {
69
public:
70
  GrpcMetricsStreamerImpl(Grpc::RawAsyncClientSharedPtr raw_async_client,
71
                          const LocalInfo::LocalInfo& local_info, uint32_t batch_size);
72

            
73
  // GrpcMetricsStreamer
74
  void send(MetricsPtr&& metrics) override;
75

            
76
  // Grpc::AsyncStreamCallbacks
77
4
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
78
4
    ENVOY_LOG(debug, "metric service stream closed with status: {} message: {}",
79
4
              Grpc::Utility::grpcStatusToString(status), message);
80
4
    stream_ = nullptr;
81
4
  }
82

            
83
private:
84
  void
85
  sendBatch(const Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>& metrics,
86
            int start_idx, int end_idx, bool send_identifier);
87

            
88
  const LocalInfo::LocalInfo& local_info_;
89
  const Protobuf::MethodDescriptor& service_method_;
90
  const uint32_t batch_size_;
91
};
92

            
93
using GrpcMetricsStreamerImplPtr = std::unique_ptr<GrpcMetricsStreamerImpl>;
94

            
95
class MetricsFlusher {
96
public:
97
  MetricsFlusher(
98
      bool report_counters_as_deltas, bool emit_labels, HistogramEmitMode histogram_emit_mode,
99
      std::function<bool(const Stats::Metric&)> predicate =
100
2054
          [](const auto& metric) { return metric.used(); })
101
15
      : report_counters_as_deltas_(report_counters_as_deltas), emit_labels_(emit_labels),
102
15
        emit_summary_(histogram_emit_mode == HistogramEmitMode::SUMMARY_AND_HISTOGRAM ||
103
15
                      histogram_emit_mode == HistogramEmitMode::SUMMARY),
104
15
        emit_histogram_(histogram_emit_mode == HistogramEmitMode::SUMMARY_AND_HISTOGRAM ||
105
15
                        histogram_emit_mode == HistogramEmitMode::HISTOGRAM),
106
15
        predicate_(predicate) {}
107

            
108
  MetricsPtr flush(Stats::MetricSnapshot& snapshot) const;
109

            
110
private:
111
  void flushCounter(io::prometheus::client::MetricFamily& metrics_family,
112
                    const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
113
                    int64_t snapshot_time_ms) const;
114
  void flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge,
115
                  int64_t snapshot_time_ms) const;
116
  void flushHistogram(io::prometheus::client::MetricFamily& metrics_family,
117
                      const Stats::ParentHistogram& envoy_histogram,
118
                      int64_t snapshot_time_ms) const;
119
  void flushSummary(io::prometheus::client::MetricFamily& metrics_family,
120
                    const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) const;
121

            
122
  io::prometheus::client::Metric*
123
  populateMetricsFamily(io::prometheus::client::MetricFamily& metrics_family,
124
                        io::prometheus::client::MetricType type, int64_t snapshot_time_ms,
125
                        const Stats::Metric& metric) const;
126

            
127
  const bool report_counters_as_deltas_;
128
  const bool emit_labels_;
129
  const bool emit_summary_;
130
  const bool emit_histogram_;
131
  const std::function<bool(const Stats::Metric&)> predicate_;
132
};
133

            
134
/**
135
 * Stat Sink that flushes metrics via a gRPC service.
136
 */
137
template <class RequestProto, class ResponseProto> class MetricsServiceSink : public Stats::Sink {
138
public:
139
  MetricsServiceSink(
140
      const GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto>& grpc_metrics_streamer,
141
      bool report_counters_as_deltas, bool emit_labels, HistogramEmitMode histogram_emit_mode)
142
12
      : MetricsServiceSink(
143
12
            grpc_metrics_streamer,
144
12
            MetricsFlusher(report_counters_as_deltas, emit_labels, histogram_emit_mode)) {}
145

            
146
  MetricsServiceSink(
147
      const GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto>& grpc_metrics_streamer,
148
      MetricsFlusher&& flusher)
149
12
      : flusher_(std::move(flusher)), grpc_metrics_streamer_(std::move(grpc_metrics_streamer)) {}
150

            
151
  // MetricsService::Sink
152
15
  void flush(Stats::MetricSnapshot& snapshot) override {
153
15
    grpc_metrics_streamer_->send(flusher_.flush(snapshot));
154
15
  }
155
25
  void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}
156

            
157
private:
158
  const MetricsFlusher flusher_;
159
  GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto> grpc_metrics_streamer_;
160
};
161

            
162
} // namespace MetricsService
163
} // namespace StatSinks
164
} // namespace Extensions
165
} // namespace Envoy