1
#include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
2

            
3
#include <list>
4

            
5
#include "source/common/common/assert.h"
6

            
7
namespace Envoy {
8
namespace Config {
9

            
10
2867
// Appends an ACK to the back of the queue. The argument is taken by value and moved into
// storage, so the caller's copy (if any) is not referenced afterwards.
void PausableAckQueue::push(UpdateAck x) {
  storage_.emplace_back(std::move(x));
}
11

            
12
9360
size_t PausableAckQueue::size() const { return storage_.size(); }
13

            
14
19119
bool PausableAckQueue::empty() {
15
19119
  for (const auto& entry : storage_) {
16
5822
    if (pauses_[entry.type_url_] == 0) {
17
5685
      return false;
18
5685
    }
19
5822
  }
20
13434
  return true;
21
19119
}
22

            
23
// In the event of a reconnection, clear all the cached nonces.
24
1009
void PausableAckQueue::clear() { storage_.clear(); }
25

            
26
2879
// Returns a reference to the first stored ACK (in insertion order) whose type URL is not
// paused. The reference points into storage_, so it is only valid until that element is
// removed. Callers must check empty() first: if no unpaused ACK exists this PANICs.
const UpdateAck& PausableAckQueue::front() {
  for (const auto& entry : storage_) {
    // Use find() instead of operator[] so that peeking does not insert zero counters into
    // pauses_. A type URL with no counter entry is treated as not paused.
    const auto pause_it = pauses_.find(entry.type_url_);
    if (pause_it == pauses_.end() || pause_it->second == 0) {
      return entry;
    }
  }
  PANIC("front() on an empty queue is undefined behavior!");
}
34

            
35
2824
// Removes and returns the first stored ACK (in insertion order) whose type URL is not paused.
// ACKs for paused type URLs are skipped and stay in the queue. Callers must check empty()
// first: if no unpaused ACK exists this PANICs.
UpdateAck PausableAckQueue::popFront() {
  for (auto it = storage_.begin(); it != storage_.end(); ++it) {
    // Use find() instead of operator[] so that popping does not insert zero counters into
    // pauses_. A type URL with no counter entry is treated as not paused.
    const auto pause_it = pauses_.find(it->type_url_);
    if (pause_it == pauses_.end() || pause_it->second == 0) {
      // The element is erased on the next line, so move it out rather than copying it.
      UpdateAck ret = std::move(*it);
      storage_.erase(it);
      return ret;
    }
  }
  PANIC("popFront() on an empty queue is undefined behavior!");
}
45

            
46
3883
void PausableAckQueue::pause(const std::string& type_url) {
47
  // It's ok to pause a subscription that doesn't exist yet.
48
3883
  auto& pause_entry = pauses_[type_url];
49
3883
  ++pause_entry;
50
3883
}
51

            
52
3880
void PausableAckQueue::resume(const std::string& type_url) {
53
3880
  auto& pause_entry = pauses_[type_url];
54
3880
  ASSERT(pause_entry > 0);
55
3880
  --pause_entry;
56
3880
}
57

            
58
10840
bool PausableAckQueue::paused(const std::string& type_url) const {
59
10840
  auto entry = pauses_.find(type_url);
60
10840
  if (entry == pauses_.end()) {
61
3140
    return false;
62
3140
  }
63
7700
  return entry->second > 0;
64
10840
}
65

            
66
} // namespace Config
67
} // namespace Envoy