1
#pragma once
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/service/load_stats/v3/lrs.pb.h"
5
#include "envoy/stats/scope.h"
6
#include "envoy/stats/stats_macros.h"
7
#include "envoy/upstream/cluster_manager.h"
8
#include "envoy/upstream/load_stats_reporter.h"
9

            
10
#include "source/common/common/logger.h"
11
#include "source/common/grpc/async_client_impl.h"
12
#include "source/common/grpc/typed_async_client.h"
13

            
14
namespace Envoy {
15
namespace Upstream {
16

            
17
class LoadStatsReporterImpl
18
    : public LoadStatsReporter,
19
      public Grpc::AsyncStreamCallbacks<envoy::service::load_stats::v3::LoadStatsResponse>,
20
      public Logger::Loggable<Logger::Id::upstream> {
21
public:
22
  LoadStatsReporterImpl(const LocalInfo::LocalInfo& local_info, ClusterManager& cluster_manager,
23
                        Stats::Scope& scope, Grpc::RawAsyncClientSharedPtr async_client,
24
                        Event::Dispatcher& dispatcher);
25
  ~LoadStatsReporterImpl() override;
26

            
27
  // Grpc::AsyncStreamCallbacks
28
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
29
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
30
  void onReceiveMessage(
31
      std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) override;
32
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
33
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
34

            
35
  // Upstream::LoadStatsReporter
36
4
  const LoadReporterStats& getStats() const override { return stats_; };
37

            
38
  // TODO(htuch): Make this configurable or some static.
39
  const uint32_t RETRY_DELAY_MS = 5000;
40

            
41
private:
42
  void setRetryTimer();
43
  void establishNewStream();
44
  void sendLoadStatsRequest();
45
  void handleFailure();
46
  void startLoadReportPeriod();
47

            
48
  ClusterManager& cm_;
49
  LoadReporterStats stats_;
50
  Grpc::AsyncClient<envoy::service::load_stats::v3::LoadStatsRequest,
51
                    envoy::service::load_stats::v3::LoadStatsResponse>
52
      async_client_;
53
  Grpc::AsyncStream<envoy::service::load_stats::v3::LoadStatsRequest> stream_{};
54
  const Protobuf::MethodDescriptor& service_method_;
55
  Event::TimerPtr retry_timer_;
56
  Event::TimerPtr response_timer_;
57
  const envoy::service::load_stats::v3::LoadStatsRequest request_template_;
58
  std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse> message_;
59
  // Map from cluster name to start of measurement interval.
60
  absl::node_hash_map<std::string, std::chrono::steady_clock::duration> clusters_;
61
  TimeSource& time_source_;
62
};
63

            
64
} // namespace Upstream
65
} // namespace Envoy