1
#pragma once
2

            
3
#include <queue>
4

            
5
#include "envoy/event/dispatcher.h"
6

            
7
namespace Envoy {
8
namespace Grpc {
9

            
10
// Callback type accepted by BufferedMessageTtlManager. The manager invokes it once for every ID
// whose deadline has passed, handing the expired ID over as the sole argument.
using BufferedMessageExpirationCallback = std::function<void(uint64_t)>;
11

            
12
// This class is used to manage the TTL for pending uploads within a BufferedAsyncClient. Multiple
// IDs can be inserted into the TTL manager at once, all with the same TTL (specified in the
// constructor). Upon expiry, the TTL manager will invoke the provided expiry callback for each ID.
// Note that there is no way to disable the expiration, and so it's up to the recipient of the
// callback to handle this. BufferedAsyncClient will do the right thing here: if the expired ID is
// still in flight it will be returned to the buffer, otherwise it does nothing. The TTL manager is
// designed to handle multiple sets of IDs inserted at various times, backing this with a single
// Timer. This allows us to track a large number of IDs inserted at different times without using a
// lot of different timers, which could put undue pressure on the event loop.
21
class BufferedMessageTtlManager {
22
public:
23
  BufferedMessageTtlManager(Event::Dispatcher& dispatcher,
24
                            BufferedMessageExpirationCallback&& expiry_callback,
25
                            std::chrono::milliseconds message_ack_timeout)
26
4
      : dispatcher_(dispatcher), message_ack_timeout_(message_ack_timeout),
27
4
        expiry_callback_(expiry_callback),
28
6
        timer_(dispatcher_.createTimer([this] { checkExpiredMessages(); })) {}
29

            
30
4
  ~BufferedMessageTtlManager() { timer_->disableTimer(); }
31

            
32
8
  void addDeadlineEntry(const absl::flat_hash_set<uint64_t>& ids) {
33
8
    const auto expires_at = dispatcher_.timeSource().monotonicTime() + message_ack_timeout_;
34
8
    deadline_.emplace(expires_at, std::move(ids));
35

            
36
8
    if (!timer_->enabled()) {
37
7
      timer_->enableTimer(message_ack_timeout_);
38
7
    }
39
8
  }
40

            
41
6
  const std::queue<std::pair<MonotonicTime, absl::flat_hash_set<uint64_t>>>& deadlineForTest() {
42
6
    return deadline_;
43
6
  }
44

            
45
private:
46
5
  void checkExpiredMessages() {
47
5
    const auto now = dispatcher_.timeSource().monotonicTime();
48

            
49
11
    while (!deadline_.empty()) {
50
7
      auto& it = deadline_.front();
51
7
      if (it.first > now) {
52
1
        break;
53
1
      }
54
7
      for (auto&& id : it.second) {
55
7
        expiry_callback_(id);
56
7
      }
57
6
      deadline_.pop();
58
6
    }
59

            
60
5
    if (!deadline_.empty()) {
61
1
      const auto earliest_timepoint = deadline_.front().first;
62
1
      timer_->enableTimer(
63
1
          std::chrono::duration_cast<std::chrono::milliseconds>(earliest_timepoint - now));
64
1
    }
65
5
  }
66

            
67
  Event::Dispatcher& dispatcher_;
68
  std::chrono::milliseconds message_ack_timeout_;
69
  BufferedMessageExpirationCallback expiry_callback_;
70
  Event::TimerPtr timer_;
71
  std::queue<std::pair<MonotonicTime, absl::flat_hash_set<uint64_t>>> deadline_;
72
};
73
} // namespace Grpc
74
} // namespace Envoy