LCOV - code coverage report
Current view: top level - source/extensions/stat_sinks/hystrix - hystrix.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 288 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 22 0.0 %

          Line data    Source code
       1             : #include "source/extensions/stat_sinks/hystrix/hystrix.h"
       2             : 
       3             : #include <chrono>
       4             : #include <ctime>
       5             : #include <iostream>
       6             : #include <sstream>
       7             : 
       8             : #include "envoy/stats/scope.h"
       9             : 
      10             : #include "source/common/buffer/buffer_impl.h"
      11             : #include "source/common/common/logger.h"
      12             : #include "source/common/config/well_known_names.h"
      13             : #include "source/common/http/headers.h"
      14             : #include "source/common/stats/utility.h"
      15             : 
      16             : #include "absl/strings/str_cat.h"
      17             : #include "absl/strings/str_split.h"
      18             : #include "fmt/printf.h"
      19             : 
      20             : namespace Envoy {
      21             : namespace Extensions {
      22             : namespace StatSinks {
      23             : namespace Hystrix {
      24             : 
      25             : Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::ResponseHeaders>
      26             :     access_control_allow_origin_handle(Http::CustomHeaders::get().AccessControlAllowOrigin);
      27             : Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::ResponseHeaders>
      28             :     access_control_allow_headers_handle(Http::CustomHeaders::get().AccessControlAllowHeaders);
      29             : Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::ResponseHeaders>
      30             :     cache_control_handle(Http::CustomHeaders::get().CacheControl);
      31             : 
      32             : const uint64_t HystrixSink::DEFAULT_NUM_BUCKETS;
      33             : ClusterStatsCache::ClusterStatsCache(const std::string& cluster_name)
      34           0 :     : cluster_name_(cluster_name) {}
      35             : 
      36           0 : void ClusterStatsCache::printToStream(std::stringstream& out_str) {
      37           0 :   const std::string cluster_name_prefix = absl::StrCat(cluster_name_, ".");
      38             : 
      39           0 :   printRollingWindow(absl::StrCat(cluster_name_prefix, "success"), success_, out_str);
      40           0 :   printRollingWindow(absl::StrCat(cluster_name_prefix, "errors"), errors_, out_str);
      41           0 :   printRollingWindow(absl::StrCat(cluster_name_prefix, "timeouts"), timeouts_, out_str);
      42           0 :   printRollingWindow(absl::StrCat(cluster_name_prefix, "rejected"), rejected_, out_str);
      43           0 :   printRollingWindow(absl::StrCat(cluster_name_prefix, "total"), total_, out_str);
      44           0 : }
      45             : 
      46             : void ClusterStatsCache::printRollingWindow(absl::string_view name, RollingWindow rolling_window,
      47           0 :                                            std::stringstream& out_str) {
      48           0 :   out_str << name << " | ";
      49           0 :   for (uint64_t& specific_stat_vec_itr : rolling_window) {
      50           0 :     out_str << specific_stat_vec_itr << " | ";
      51           0 :   }
      52           0 :   out_str << std::endl;
      53           0 : }
      54             : 
      55             : void HystrixSink::addHistogramToStream(const QuantileLatencyMap& latency_map, absl::string_view key,
      56           0 :                                        std::stringstream& ss) {
      57             :   // TODO: Consider if we better use join here
      58           0 :   ss << ", \"" << key << "\": {";
      59           0 :   bool is_first = true;
      60           0 :   for (const auto& element : latency_map) {
      61           0 :     const std::string quantile = fmt::sprintf("%g", element.first * 100);
      62           0 :     HystrixSink::addDoubleToStream(quantile, element.second, ss, is_first);
      63           0 :     is_first = false;
      64           0 :   }
      65           0 :   ss << "}";
      66           0 : }
      67             : 
      68             : // Add new value to rolling window, in place of oldest one.
      69           0 : void HystrixSink::pushNewValue(RollingWindow& rolling_window, uint64_t value) {
      70           0 :   if (rolling_window.empty()) {
      71           0 :     rolling_window.resize(window_size_, value);
      72           0 :   } else {
      73           0 :     rolling_window[current_index_] = value;
      74           0 :   }
      75           0 : }
      76             : 
      77           0 : uint64_t HystrixSink::getRollingValue(RollingWindow rolling_window) {
      78             : 
      79           0 :   if (rolling_window.empty()) {
      80           0 :     return 0;
      81           0 :   }
      82             :   // If the counter was reset, the result is negative
      83             :   // better return 0, will be back to normal once one rolling window passes.
      84           0 :   if (rolling_window[current_index_] < rolling_window[(current_index_ + 1) % window_size_]) {
      85           0 :     return 0;
      86           0 :   } else {
      87           0 :     return rolling_window[current_index_] - rolling_window[(current_index_ + 1) % window_size_];
      88           0 :   }
      89           0 : }
      90             : 
      91             : void HystrixSink::updateRollingWindowMap(const Upstream::ClusterInfo& cluster_info,
      92           0 :                                          ClusterStatsCache& cluster_stats_cache) {
      93           0 :   Upstream::ClusterTrafficStats& cluster_stats = *cluster_info.trafficStats();
      94           0 :   Stats::Scope& cluster_stats_scope = cluster_info.statsScope();
      95             : 
      96             :   // Combining timeouts+retries - retries are counted  as separate requests
      97             :   // (alternative: each request including the retries counted as 1).
      98           0 :   uint64_t timeouts = cluster_stats.upstream_rq_timeout_.value() +
      99           0 :                       cluster_stats.upstream_rq_per_try_timeout_.value();
     100             : 
     101           0 :   pushNewValue(cluster_stats_cache.timeouts_, timeouts);
     102             : 
     103             :   // Combining errors+retry errors - retries are counted as separate requests
     104             :   // (alternative: each request including the retries counted as 1)
     105             :   // since timeouts are 504 (or 408), deduce them from here ("-" sign).
     106             :   // Timeout retries were not counted here anyway.
     107           0 :   uint64_t errors = cluster_stats_scope.counterFromStatName(upstream_rq_5xx_).value() +
     108           0 :                     cluster_stats_scope.counterFromStatName(retry_upstream_rq_5xx_).value() +
     109           0 :                     cluster_stats_scope.counterFromStatName(upstream_rq_4xx_).value() +
     110           0 :                     cluster_stats_scope.counterFromStatName(retry_upstream_rq_4xx_).value() -
     111           0 :                     cluster_stats.upstream_rq_timeout_.value();
     112             : 
     113           0 :   pushNewValue(cluster_stats_cache.errors_, errors);
     114             : 
     115           0 :   uint64_t success = cluster_stats_scope.counterFromStatName(upstream_rq_2xx_).value();
     116           0 :   pushNewValue(cluster_stats_cache.success_, success);
     117             : 
     118           0 :   uint64_t rejected = cluster_stats.upstream_rq_pending_overflow_.value();
     119           0 :   pushNewValue(cluster_stats_cache.rejected_, rejected);
     120             : 
     121             :   // should not take from upstream_rq_total since it is updated before its components,
     122             :   // leading to wrong results such as error percentage higher than 100%
     123           0 :   uint64_t total = errors + timeouts + success + rejected;
     124           0 :   pushNewValue(cluster_stats_cache.total_, total);
     125             : 
     126           0 :   ENVOY_LOG(trace, "{}", printRollingWindows());
     127           0 : }
     128             : 
     129           0 : void HystrixSink::resetRollingWindow() { cluster_stats_cache_map_.clear(); }
     130             : 
     131             : void HystrixSink::addStringToStream(absl::string_view key, absl::string_view value,
     132           0 :                                     std::stringstream& info, bool is_first) {
     133           0 :   std::string quoted_value = absl::StrCat("\"", value, "\"");
     134           0 :   addInfoToStream(key, quoted_value, info, is_first);
     135           0 : }
     136             : 
     137             : void HystrixSink::addIntToStream(absl::string_view key, uint64_t value, std::stringstream& info,
     138           0 :                                  bool is_first) {
     139           0 :   addInfoToStream(key, std::to_string(value), info, is_first);
     140           0 : }
     141             : 
     142             : void HystrixSink::addDoubleToStream(absl::string_view key, double value, std::stringstream& info,
     143           0 :                                     bool is_first) {
     144           0 :   addInfoToStream(key, std::to_string(value), info, is_first);
     145           0 : }
     146             : 
     147             : void HystrixSink::addInfoToStream(absl::string_view key, absl::string_view value,
     148           0 :                                   std::stringstream& info, bool is_first) {
     149           0 :   if (!is_first) {
     150           0 :     info << ", ";
     151           0 :   }
     152           0 :   std::string added_info = absl::StrCat("\"", key, "\": ", value);
     153           0 :   info << added_info;
     154           0 : }
     155             : 
     156             : void HystrixSink::addHystrixCommand(ClusterStatsCache& cluster_stats_cache,
     157             :                                     absl::string_view cluster_name,
     158             :                                     uint64_t max_concurrent_requests, uint64_t reporting_hosts,
     159             :                                     std::chrono::milliseconds rolling_window_ms,
     160           0 :                                     const QuantileLatencyMap& histogram, std::stringstream& ss) {
     161             : 
     162           0 :   std::time_t currentTime = std::chrono::system_clock::to_time_t(server_.timeSource().systemTime());
     163             : 
     164           0 :   ss << "data: {";
     165           0 :   addStringToStream("type", "HystrixCommand", ss, true);
     166           0 :   addStringToStream("name", cluster_name, ss);
     167           0 :   addStringToStream("group", "NA", ss);
     168           0 :   addIntToStream("currentTime", static_cast<uint64_t>(currentTime), ss);
     169           0 :   addInfoToStream("isCircuitBreakerOpen", "false", ss);
     170             : 
     171           0 :   uint64_t errors = getRollingValue(cluster_stats_cache.errors_);
     172           0 :   uint64_t timeouts = getRollingValue(cluster_stats_cache.timeouts_);
     173           0 :   uint64_t rejected = getRollingValue(cluster_stats_cache.rejected_);
     174           0 :   uint64_t total = getRollingValue(cluster_stats_cache.total_);
     175             : 
     176           0 :   uint64_t error_rate = total == 0 ? 0 : (100 * (errors + timeouts + rejected)) / total;
     177             : 
     178           0 :   addIntToStream("errorPercentage", error_rate, ss);
     179           0 :   addIntToStream("errorCount", errors, ss);
     180           0 :   addIntToStream("requestCount", total, ss);
     181           0 :   addIntToStream("rollingCountCollapsedRequests", 0, ss);
     182           0 :   addIntToStream("rollingCountExceptionsThrown", 0, ss);
     183           0 :   addIntToStream("rollingCountFailure", errors, ss);
     184           0 :   addIntToStream("rollingCountFallbackFailure", 0, ss);
     185           0 :   addIntToStream("rollingCountFallbackRejection", 0, ss);
     186           0 :   addIntToStream("rollingCountFallbackSuccess", 0, ss);
     187           0 :   addIntToStream("rollingCountResponsesFromCache", 0, ss);
     188             : 
     189             :   // Envoy's "circuit breaker" has similar meaning to hystrix's isolation
     190             :   // so we count upstream_rq_pending_overflow and present it as rollingCountSemaphoreRejected
     191           0 :   addIntToStream("rollingCountSemaphoreRejected", rejected, ss);
     192             : 
     193             :   // Hystrix's short circuit is not similar to Envoy's since it is triggered by 503 responses
     194             :   // there is no parallel counter in Envoy since as a result of errors (outlier detection)
     195             :   // requests are not rejected, but rather the node is removed from load balancer healthy pool.
     196           0 :   addIntToStream("rollingCountShortCircuited", 0, ss);
     197           0 :   addIntToStream("rollingCountSuccess", getRollingValue(cluster_stats_cache.success_), ss);
     198           0 :   addIntToStream("rollingCountThreadPoolRejected", 0, ss);
     199           0 :   addIntToStream("rollingCountTimeout", timeouts, ss);
     200           0 :   addIntToStream("rollingCountBadRequests", 0, ss);
     201           0 :   addIntToStream("currentConcurrentExecutionCount", 0, ss);
     202           0 :   addStringToStream("latencyExecute_mean", "null", ss);
     203           0 :   addHistogramToStream(histogram, "latencyExecute", ss);
     204           0 :   addIntToStream("propertyValue_circuitBreakerRequestVolumeThreshold", 0, ss);
     205           0 :   addIntToStream("propertyValue_circuitBreakerSleepWindowInMilliseconds", 0, ss);
     206           0 :   addIntToStream("propertyValue_circuitBreakerErrorThresholdPercentage", 0, ss);
     207           0 :   addInfoToStream("propertyValue_circuitBreakerForceOpen", "false", ss);
     208           0 :   addInfoToStream("propertyValue_circuitBreakerForceClosed", "true", ss);
     209           0 :   addStringToStream("propertyValue_executionIsolationStrategy", "SEMAPHORE", ss);
     210           0 :   addIntToStream("propertyValue_executionIsolationThreadTimeoutInMilliseconds", 0, ss);
     211           0 :   addInfoToStream("propertyValue_executionIsolationThreadInterruptOnTimeout", "false", ss);
     212           0 :   addIntToStream("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests",
     213           0 :                  max_concurrent_requests, ss);
     214           0 :   addIntToStream("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", 0, ss);
     215           0 :   addInfoToStream("propertyValue_requestCacheEnabled", "false", ss);
     216           0 :   addInfoToStream("propertyValue_requestLogEnabled", "true", ss);
     217           0 :   addIntToStream("reportingHosts", reporting_hosts, ss);
     218           0 :   addIntToStream("propertyValue_metricsRollingStatisticalWindowInMilliseconds",
     219           0 :                  rolling_window_ms.count(), ss);
     220             : 
     221           0 :   ss << "}" << std::endl << std::endl;
     222           0 : }
     223             : 
     224             : void HystrixSink::addHystrixThreadPool(absl::string_view cluster_name, uint64_t queue_size,
     225             :                                        uint64_t reporting_hosts,
     226             :                                        std::chrono::milliseconds rolling_window_ms,
     227           0 :                                        std::stringstream& ss) {
     228             : 
     229           0 :   ss << "data: {";
     230           0 :   addIntToStream("currentPoolSize", 0, ss, true);
     231           0 :   addIntToStream("rollingMaxActiveThreads", 0, ss);
     232           0 :   addIntToStream("currentActiveCount", 0, ss);
     233           0 :   addIntToStream("currentCompletedTaskCount", 0, ss);
     234           0 :   addIntToStream("propertyValue_queueSizeRejectionThreshold", queue_size, ss);
     235           0 :   addStringToStream("type", "HystrixThreadPool", ss);
     236           0 :   addIntToStream("reportingHosts", reporting_hosts, ss);
     237           0 :   addIntToStream("propertyValue_metricsRollingStatisticalWindowInMilliseconds",
     238           0 :                  rolling_window_ms.count(), ss);
     239           0 :   addStringToStream("name", cluster_name, ss);
     240           0 :   addIntToStream("currentLargestPoolSize", 0, ss);
     241           0 :   addIntToStream("currentCorePoolSize", 0, ss);
     242           0 :   addIntToStream("currentQueueSize", 0, ss);
     243           0 :   addIntToStream("currentTaskCount", 0, ss);
     244           0 :   addIntToStream("rollingCountThreadsExecuted", 0, ss);
     245           0 :   addIntToStream("currentMaximumPoolSize", 0, ss);
     246             : 
     247           0 :   ss << "}" << std::endl << std::endl;
     248           0 : }
     249             : 
     250             : void HystrixSink::addClusterStatsToStream(ClusterStatsCache& cluster_stats_cache,
     251             :                                           absl::string_view cluster_name,
     252             :                                           uint64_t max_concurrent_requests,
     253             :                                           uint64_t reporting_hosts,
     254             :                                           std::chrono::milliseconds rolling_window_ms,
     255             :                                           const QuantileLatencyMap& histogram,
     256           0 :                                           std::stringstream& ss) {
     257             : 
     258           0 :   addHystrixCommand(cluster_stats_cache, cluster_name, max_concurrent_requests, reporting_hosts,
     259           0 :                     rolling_window_ms, histogram, ss);
     260           0 :   addHystrixThreadPool(cluster_name, max_concurrent_requests, reporting_hosts, rolling_window_ms,
     261           0 :                        ss);
     262           0 : }
     263             : 
     264           0 : const std::string HystrixSink::printRollingWindows() {
     265           0 :   std::stringstream out_str;
     266             : 
     267           0 :   for (auto& itr : cluster_stats_cache_map_) {
     268           0 :     ClusterStatsCache& cluster_stats_cache = *(itr.second);
     269           0 :     cluster_stats_cache.printToStream(out_str);
     270           0 :   }
     271           0 :   return out_str.str();
     272           0 : }
     273             : 
     274             : HystrixSink::HystrixSink(Server::Configuration::ServerFactoryContext& server,
     275             :                          const uint64_t num_buckets)
     276             :     : server_(server), current_index_(num_buckets > 0 ? num_buckets : DEFAULT_NUM_BUCKETS),
     277             :       window_size_(current_index_ + 1), stat_name_pool_(server.scope().symbolTable()),
     278             :       cluster_name_(stat_name_pool_.add(Config::TagNames::get().CLUSTER_NAME)),
     279             :       cluster_upstream_rq_time_(stat_name_pool_.add("cluster.upstream_rq_time")),
     280             :       membership_total_(stat_name_pool_.add("membership_total")),
     281             :       retry_upstream_rq_4xx_(stat_name_pool_.add("retry.upstream_rq_4xx")),
     282             :       retry_upstream_rq_5xx_(stat_name_pool_.add("retry.upstream_rq_5xx")),
     283             :       upstream_rq_2xx_(stat_name_pool_.add("upstream_rq_2xx")),
     284             :       upstream_rq_4xx_(stat_name_pool_.add("upstream_rq_4xx")),
     285           0 :       upstream_rq_5xx_(stat_name_pool_.add("upstream_rq_5xx")) {
     286           0 :   if (!server.admin().has_value()) {
     287           0 :     return;
     288           0 :   }
     289           0 :   ENVOY_LOG(debug,
     290           0 :             "adding hystrix_event_stream endpoint to enable connection to hystrix dashboard");
     291           0 :   server.admin()->addHandler("/hystrix_event_stream", "send hystrix event stream",
     292           0 :                              MAKE_ADMIN_HANDLER(handlerHystrixEventStream), false, false);
     293           0 : }
     294             : 
     295             : Http::Code HystrixSink::handlerHystrixEventStream(Http::ResponseHeaderMap& response_headers,
     296             :                                                   Buffer::Instance&,
     297           0 :                                                   Server::AdminStream& admin_stream) {
     298             : 
     299           0 :   response_headers.setReferenceContentType(Http::Headers::get().ContentTypeValues.TextEventStream);
     300           0 :   response_headers.setReferenceInline(cache_control_handle.handle(),
     301           0 :                                       Http::CustomHeaders::get().CacheControlValues.NoCache);
     302           0 :   response_headers.setReferenceConnection(Http::Headers::get().ConnectionValues.Close);
     303           0 :   response_headers.setReferenceInline(access_control_allow_headers_handle.handle(),
     304           0 :                                       AccessControlAllowHeadersValue.AllowHeadersHystrix);
     305           0 :   response_headers.setReferenceInline(access_control_allow_origin_handle.handle(),
     306           0 :                                       Http::CustomHeaders::get().AccessControlAllowOriginValue.All);
     307             : 
     308           0 :   Http::StreamDecoderFilterCallbacks& stream_decoder_filter_callbacks =
     309           0 :       admin_stream.getDecoderFilterCallbacks();
     310             : 
     311             :   // Disable chunk-encoding in HTTP/1.x.
     312           0 :   if (stream_decoder_filter_callbacks.streamInfo().protocol() < Http::Protocol::Http2) {
     313           0 :     admin_stream.http1StreamEncoderOptions().value().get().disableChunkEncoding();
     314           0 :   }
     315             : 
     316           0 :   registerConnection(&stream_decoder_filter_callbacks);
     317             : 
     318           0 :   admin_stream.setEndStreamOnComplete(false); // set streaming
     319             : 
     320             :   // Separated out just so it's easier to understand
     321           0 :   auto on_destroy_callback = [this, &stream_decoder_filter_callbacks]() {
     322           0 :     ENVOY_LOG(debug, "stopped sending data to hystrix dashboard on port {}",
     323           0 :               stream_decoder_filter_callbacks.connection()
     324           0 :                   ->connectionInfoProvider()
     325           0 :                   .remoteAddress()
     326           0 :                   ->asString());
     327             : 
     328             :     // Unregister the callbacks from the sink so data is no longer encoded through them.
     329           0 :     unregisterConnection(&stream_decoder_filter_callbacks);
     330           0 :   };
     331             : 
     332             :   // Add the callback to the admin_filter list of callbacks
     333           0 :   admin_stream.addOnDestroyCallback(std::move(on_destroy_callback));
     334             : 
     335           0 :   ENVOY_LOG(debug, "started sending data to hystrix dashboard on port {}",
     336           0 :             stream_decoder_filter_callbacks.connection()
     337           0 :                 ->connectionInfoProvider()
     338           0 :                 .remoteAddress()
     339           0 :                 ->asString());
     340           0 :   return Http::Code::OK;
     341           0 : }
     342             : 
     343           0 : void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
     344           0 :   if (callbacks_list_.empty()) {
     345           0 :     return;
     346           0 :   }
     347           0 :   incCounter();
     348           0 :   std::stringstream ss;
     349           0 :   Upstream::ClusterManager::ClusterInfoMaps all_clusters = server_.clusterManager().clusters();
     350             : 
     351             :   // Save a map of the relevant histograms per cluster in a convenient format.
     352           0 :   absl::node_hash_map<std::string, QuantileLatencyMap> time_histograms;
     353           0 :   for (const auto& histogram : snapshot.histograms()) {
     354           0 :     if (histogram.get().tagExtractedStatName() == cluster_upstream_rq_time_) {
     355           0 :       absl::optional<Stats::StatName> value =
     356           0 :           Stats::Utility::findTag(histogram.get(), cluster_name_);
     357             :       // Make sure we found the cluster name tag
     358           0 :       ASSERT(value);
     359           0 :       std::string value_str = server_.scope().symbolTable().toString(*value);
     360           0 :       auto it_bool_pair = time_histograms.emplace(std::make_pair(value_str, QuantileLatencyMap()));
     361             :       // Make sure histogram with this name was not already added
     362           0 :       ASSERT(it_bool_pair.second);
     363           0 :       QuantileLatencyMap& hist_map = it_bool_pair.first->second;
     364             : 
     365           0 :       const std::vector<double>& supported_quantiles =
     366           0 :           histogram.get().intervalStatistics().supportedQuantiles();
     367           0 :       for (size_t i = 0; i < supported_quantiles.size(); ++i) {
     368             :         // binary-search here is likely not worth it, as hystrix_quantiles has <10 elements.
     369           0 :         if (std::find(hystrix_quantiles.begin(), hystrix_quantiles.end(), supported_quantiles[i]) !=
     370           0 :             hystrix_quantiles.end()) {
     371           0 :           const double value = histogram.get().intervalStatistics().computedQuantiles()[i];
     372           0 :           if (!std::isnan(value)) {
     373           0 :             hist_map[supported_quantiles[i]] = value;
     374           0 :           }
     375           0 :         }
     376           0 :       }
     377           0 :     }
     378           0 :   }
     379             : 
     380           0 :   for (auto& cluster : all_clusters.active_clusters_) {
     381           0 :     Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second.get().info();
     382             : 
     383           0 :     std::unique_ptr<ClusterStatsCache>& cluster_stats_cache_ptr =
     384           0 :         cluster_stats_cache_map_[cluster_info->name()];
     385           0 :     if (cluster_stats_cache_ptr == nullptr) {
     386           0 :       cluster_stats_cache_ptr = std::make_unique<ClusterStatsCache>(cluster_info->name());
     387           0 :     }
     388             : 
     389             :     // update rolling window with cluster stats
     390           0 :     updateRollingWindowMap(*cluster_info, *cluster_stats_cache_ptr);
     391             : 
     392             :     // append it to stream to be sent
     393           0 :     addClusterStatsToStream(
     394           0 :         *cluster_stats_cache_ptr, cluster_info->name(),
     395           0 :         cluster_info->resourceManager(Upstream::ResourcePriority::Default).pendingRequests().max(),
     396           0 :         cluster_info->statsScope()
     397           0 :             .gaugeFromStatName(membership_total_, Stats::Gauge::ImportMode::NeverImport)
     398           0 :             .value(),
     399           0 :         server_.statsConfig().flushInterval(), time_histograms[cluster_info->name()], ss);
     400           0 :   }
     401             : 
     402           0 :   Buffer::OwnedImpl data;
     403           0 :   for (auto callbacks : callbacks_list_) {
     404           0 :     data.add(ss.str());
     405           0 :     callbacks->encodeData(data, false);
     406           0 :   }
     407             : 
     408             :   // send keep alive ping
     409             :   // TODO (@trabetti) : is it ok to send together with data?
     410           0 :   Buffer::OwnedImpl ping_data;
     411           0 :   for (auto callbacks : callbacks_list_) {
     412           0 :     ping_data.add(":\n\n");
     413           0 :     callbacks->encodeData(ping_data, false);
     414           0 :   }
     415             : 
     416             :   // check if any clusters were removed, and remove from cache
     417           0 :   if (all_clusters.active_clusters_.size() < cluster_stats_cache_map_.size()) {
     418           0 :     for (auto it = cluster_stats_cache_map_.begin(); it != cluster_stats_cache_map_.end();) {
     419           0 :       if (all_clusters.active_clusters_.find(it->first) == all_clusters.active_clusters_.end()) {
     420           0 :         auto next_it = std::next(it);
     421           0 :         cluster_stats_cache_map_.erase(it);
     422           0 :         it = next_it;
     423           0 :       } else {
     424           0 :         ++it;
     425           0 :       }
     426           0 :     }
     427           0 :   }
     428           0 : }
     429             : 
     430           0 : void HystrixSink::registerConnection(Http::StreamDecoderFilterCallbacks* callbacks_to_register) {
     431           0 :   callbacks_list_.emplace_back(callbacks_to_register);
     432           0 : }
     433             : 
     434           0 : void HystrixSink::unregisterConnection(Http::StreamDecoderFilterCallbacks* callbacks_to_remove) {
     435           0 :   for (auto it = callbacks_list_.begin(); it != callbacks_list_.end(); ++it) {
     436           0 :     if ((*it)->streamId() == callbacks_to_remove->streamId()) {
     437           0 :       callbacks_list_.erase(it);
     438           0 :       break;
     439           0 :     }
     440           0 :   }
     441             :   // If there are no callbacks, clear the map to avoid stale values or having to keep updating the
     442             :   // map. When a new callback is assigned, the rollingWindow is initialized with current statistics
     443             :   // and within RollingWindow time, the results showed in the dashboard will be reliable
     444           0 :   if (callbacks_list_.empty()) {
     445           0 :     resetRollingWindow();
     446           0 :   }
     447           0 : }
     448             : 
     449             : } // namespace Hystrix
     450             : } // namespace StatSinks
     451             : } // namespace Extensions
     452             : } // namespace Envoy

Generated by: LCOV version 1.15