1
#pragma once
2

            
3
#include <map>
4
#include <memory>
5
#include <vector>
6

            
7
#include "envoy/server/admin.h"
8
#include "envoy/server/instance.h"
9
#include "envoy/stats/histogram.h"
10
#include "envoy/stats/sink.h"
11

            
12
#include "source/common/stats/symbol_table.h"
13

            
14
namespace Envoy {
15
namespace Extensions {
16
namespace StatSinks {
17
namespace Hystrix {
18

            
19
// A rolling window of samples, stored as a sequence of uint64_t values.
using RollingWindow = std::vector<uint64_t>;
// Ordered map from a string key to the rolling window stored under that key.
using RollingStatsMap = std::map<const std::string, RollingWindow>;
21

            
22
// Hash map from double to double; presumably quantile -> latency value (TODO confirm at the
// call sites that populate it).
using QuantileLatencyMap = absl::node_hash_map<double, double>;
23
// Quantile points, expressed as fractions in [0, 1] and listed in ascending order.
// Declared `static`, so every translation unit that includes this header gets its own copy.
static const std::vector<double> hystrix_quantiles = {0,    0.25, 0.5,   0.75, 0.90,
                                                      0.95, 0.99, 0.995, 1};
25

            
26
static const struct {
27
  absl::string_view AllowHeadersHystrix{"Accept, Cache-Control, X-Requested-With, Last-Event-ID"};
28
} AccessControlAllowHeadersValue;
29

            
30
struct ClusterStatsCache {
31
  ClusterStatsCache(const std::string& cluster_name);
32

            
33
  void printToStream(std::stringstream& out_str);
34
  void printRollingWindow(absl::string_view name, RollingWindow rolling_window,
35
                          std::stringstream& out_str);
36
  std::string cluster_name_;
37

            
38
  // Rolling windows
39
  RollingWindow errors_;
40
  RollingWindow success_;
41
  RollingWindow total_;
42
  RollingWindow timeouts_;
43
  RollingWindow rejected_;
44
};
45

            
46
// Owning (unique) pointer to a ClusterStatsCache.
using ClusterStatsCachePtr = std::unique_ptr<ClusterStatsCache>;
47

            
48
/**
 * Stats sink that keeps per-cluster rolling windows and formats them as event streams for the
 * Hystrix dashboard.
 */
class HystrixSink : public Stats::Sink, public Logger::Loggable<Logger::Id::hystrix> {
public:
  /**
   * @param server server factory context the sink operates in.
   * @param num_buckets requested number of buckets for the rolling windows (how it maps to
   *        window_size_ is decided in the out-of-line constructor).
   */
  HystrixSink(Server::Configuration::ServerFactoryContext& server, uint64_t num_buckets);

  /**
   * Admin handler for the Hystrix event stream endpoint.
   * @param response_headers headers of the response being produced.
   * @param admin_stream the admin stream the request arrived on.
   * @return the HTTP status code of the response.
   */
  Http::Code handlerHystrixEventStream(Http::ResponseHeaderMap& response_headers, Buffer::Instance&,
                                       Server::AdminStream& admin_stream);

  // Stats::Sink
  void flush(Stats::MetricSnapshot& snapshot) override;
  // Intentionally a no-op: completed histograms are ignored by this sink.
  void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}

  /**
   * Register a new connection.
   */
  void registerConnection(Http::StreamDecoderFilterCallbacks* callbacks_to_register);

  /**
   * Remove a registered connection.
   */
  void unregisterConnection(Http::StreamDecoderFilterCallbacks* callbacks_to_remove);

  /**
   * Add a new value to the top of the rolling window, pushing out the oldest value.
   */
  void pushNewValue(RollingWindow& rolling_window, uint64_t value);

  /**
   * Advance the index of the next value to add to the rolling window, wrapping around at
   * window_size_. window_size_ must be non-zero, since it is used as the modulus.
   */
  void incCounter() { current_index_ = (current_index_ + 1) % window_size_; }

  /**
   * Generate the streams to be sent to the Hystrix dashboard.
   */
  void addClusterStatsToStream(ClusterStatsCache& cluster_stats_cache,
                               absl::string_view cluster_name, uint64_t max_concurrent_requests,
                               uint64_t reporting_hosts,
                               std::chrono::milliseconds rolling_window_ms,
                               const QuantileLatencyMap& histogram, std::stringstream& ss);

  /**
   * Calculate the values needed to create the stream and write them into the map.
   */
  void updateRollingWindowMap(const Upstream::ClusterInfo& cluster_info,
                              ClusterStatsCache& cluster_stats_cache);

  /**
   * Clear the map.
   */
  void resetRollingWindow();

  /**
   * Return a string representing the current state of the map. For debugging.
   */
  const std::string printRollingWindows();

  /**
   * Get the statistic's value change over the rolling window time frame.
   * NOTE(review): rolling_window is taken by value, so each call copies the vector. Switching to
   * a const reference requires changing the out-of-line definition at the same time.
   */
  uint64_t getRollingValue(RollingWindow rolling_window);

  /**
   * Format the given key and value to "key"=value, and add it to the stringstream.
   */
  static void addInfoToStream(absl::string_view key, absl::string_view value,
                              std::stringstream& info, bool is_first = false);

  /**
   * Format the given key and double value to "key"=<string of double>, and add it to the
   * stringstream. is_first defaults to false, matching the other add*ToStream helpers.
   */
  static void addDoubleToStream(absl::string_view key, double value, std::stringstream& info,
                                bool is_first = false);

  /**
   * Format the given key and absl::string_view value to "key"="value", and add it to the
   * stringstream.
   */
  static void addStringToStream(absl::string_view key, absl::string_view value,
                                std::stringstream& info, bool is_first = false);

  /**
   * Format the given key and uint64_t value to "key"=<string of uint64_t>, and add it to the
   * stringstream.
   */
  static void addIntToStream(absl::string_view key, uint64_t value, std::stringstream& info,
                             bool is_first = false);

  /**
   * Add the given latency map to the stringstream under the given key.
   */
  static void addHistogramToStream(const QuantileLatencyMap& latency_map, absl::string_view key,
                                   std::stringstream& ss);

private:
  /**
   * Generate HystrixCommand event stream.
   */
  void addHystrixCommand(ClusterStatsCache& cluster_stats_cache, absl::string_view cluster_name,
                         uint64_t max_concurrent_requests, uint64_t reporting_hosts,
                         std::chrono::milliseconds rolling_window_ms,
                         const QuantileLatencyMap& histogram, std::stringstream& ss);

  /**
   * Generate HystrixThreadPool event stream.
   */
  void addHystrixThreadPool(absl::string_view cluster_name, uint64_t queue_size,
                            uint64_t reporting_hosts, std::chrono::milliseconds rolling_window_ms,
                            std::stringstream& ss);

  // Callbacks of the currently registered connections (non-owning pointers).
  std::vector<Http::StreamDecoderFilterCallbacks*> callbacks_list_;
  Server::Configuration::ServerFactoryContext& server_;
  // Position in the rolling windows where the next value is written; see incCounter().
  uint64_t current_index_;
  // Number of slots in every rolling window; modulus used by incCounter().
  const uint64_t window_size_;
  static const uint64_t DEFAULT_NUM_BUCKETS = 10;

  // Map from cluster names to a struct of all of that cluster's stat windows.
  absl::node_hash_map<std::string, ClusterStatsCachePtr> cluster_stats_cache_map_;

  // Saved StatNames for fast comparisons in loop.
  // TODO(mattklein123): Many/all of these stats should just be pulled directly from the cluster
  // stats directly. This needs some cleanup.
  Stats::StatNamePool stat_name_pool_;
  const Stats::StatName cluster_name_;
  const Stats::StatName cluster_upstream_rq_time_;
  const Stats::StatName membership_total_;
  const Stats::StatName retry_upstream_rq_4xx_;
  const Stats::StatName retry_upstream_rq_5xx_;
  const Stats::StatName upstream_rq_2xx_;
  const Stats::StatName upstream_rq_4xx_;
  const Stats::StatName upstream_rq_5xx_;
};
173

            
174
// Owning (unique) pointer to a HystrixSink.
using HystrixSinkPtr = std::unique_ptr<HystrixSink>;
175

            
176
} // namespace Hystrix
177
} // namespace StatSinks
178
} // namespace Extensions
179
} // namespace Envoy