1
#pragma once
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/event/scaled_timer.h"
5
#include "envoy/event/timer.h"
6
#include "envoy/http/codec.h"
7

            
8
#include "source/common/common/assert.h"
9

            
10
#include "absl/container/inlined_vector.h"
11

            
12
namespace Envoy {
13
namespace Http {
14

            
15
class StreamCallbackHelper {
16
public:
17
254896
  void runLowWatermarkCallbacks() {
18
254896
    if (reset_callbacks_started_ || local_end_stream_) {
19
1514
      return;
20
1514
    }
21
253382
    ASSERT(high_watermark_callbacks_ > 0);
22
253382
    --high_watermark_callbacks_;
23
559714
    for (StreamCallbacks* callbacks : callbacks_) {
24
559714
      if (callbacks) {
25
559705
        callbacks->onBelowWriteBufferLowWatermark();
26
559705
      }
27
559714
    }
28
253382
  }
29

            
30
255087
  void runHighWatermarkCallbacks() {
31
255087
    if (reset_callbacks_started_ || local_end_stream_) {
32
1528
      return;
33
1528
    }
34
253559
    ++high_watermark_callbacks_;
35
559816
    for (StreamCallbacks* callbacks : callbacks_) {
36
559805
      if (callbacks) {
37
559794
        callbacks->onAboveWriteBufferHighWatermark();
38
559794
      }
39
559805
    }
40
253559
  }
41

            
42
32381
  void runResetCallbacks(StreamResetReason reason, absl::string_view details) {
43
    // Reset callbacks are a special case, and the only StreamCallbacks allowed
44
    // to run after local_end_stream_.
45
32381
    if (reset_callbacks_started_) {
46
8022
      return;
47
8022
    }
48

            
49
24359
    reset_callbacks_started_ = true;
50
41881
    for (StreamCallbacks* callbacks : callbacks_) {
51
41788
      if (callbacks) {
52
34992
        callbacks->onResetStream(reason, details);
53
34992
      }
54
41788
    }
55
24359
  }
56

            
57
  bool local_end_stream_{};
58

            
59
protected:
60
363391
  void addCallbacksHelper(StreamCallbacks& callbacks) {
61
363391
    ASSERT(!reset_callbacks_started_ && !local_end_stream_);
62
363391
    callbacks_.push_back(&callbacks);
63
363504
    for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
64
113
      callbacks.onAboveWriteBufferHighWatermark();
65
113
    }
66
363391
  }
67

            
68
226419
  void removeCallbacksHelper(StreamCallbacks& callbacks) {
69
    // For performance reasons we just clear the callback and do not resize the vector.
70
    // Reset callbacks scale with the number of filters per request and do not get added and
71
    // removed multiple times.
72
    // The vector may not be safely resized without making sure the run.*Callbacks() helper
73
    // functions above still handle removeCallbacksHelper() calls mid-loop.
74
234838
    for (auto& callback : callbacks_) {
75
234838
      if (callback == &callbacks) {
76
181805
        callback = nullptr;
77
181805
        return;
78
181805
      }
79
234838
    }
80
226419
  }
81

            
82
private:
83
  absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
84
  bool reset_callbacks_started_{};
85
  uint32_t high_watermark_callbacks_{};
86
};
87

            
88
// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
89
// with buffered data and register the stream adapter.
90
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
91
public:
92
157089
  MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
93
157070
  ~MultiplexedStreamImplBase() override { ASSERT(stream_flush_timer_ == nullptr); }
94
  // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
95
  // deferred delete lifetime issues that need sorting out if the destructor of the stream is
96
  // going to be able to refer to the parent connection.
97
157081
  virtual void destroy() { disarmStreamFlushTimer(); }
98

            
99
25034
  void onLocalEndStream() {
100
25034
    ASSERT(local_end_stream_);
101
25034
    if (hasPendingData()) {
102
1665
      createPendingFlushTimer();
103
1665
    }
104
25034
  }
105

            
106
157139
  void disarmStreamFlushTimer() {
107
157139
    if (stream_flush_timer_ != nullptr) {
108
      // To ease testing and the destructor assertion.
109
396
      stream_flush_timer_->disableTimer();
110
396
      stream_flush_timer_.reset();
111
396
    }
112
157139
  }
113

            
114
155087
  CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
115
155087
    std::swap(codec_callbacks, codec_callbacks_);
116
155087
    return codec_callbacks;
117
155087
  }
118

            
119
protected:
120
78021
  void setFlushTimeout(std::chrono::milliseconds timeout) override {
121
78021
    stream_flush_timeout_ = timeout;
122
78021
  }
123

            
124
1665
  void createPendingFlushTimer() {
125
1665
    ASSERT(stream_flush_timer_ == nullptr);
126
1665
    if (stream_flush_timeout_.count() > 0) {
127
411
      stream_flush_timer_ = dispatcher_.createScaledTimer(
128
411
          Event::ScaledTimerType::HttpDownstreamStreamFlush, [this] { onPendingFlushTimer(); });
129
411
      stream_flush_timer_->enableTimer(stream_flush_timeout_);
130
411
    }
131
1665
  }
132

            
133
15
  virtual void onPendingFlushTimer() { stream_flush_timer_.reset(); }
134

            
135
  virtual bool hasPendingData() PURE;
136

            
137
  CodecEventCallbacks* codec_callbacks_{nullptr};
138
  bool codec_low_level_reset_is_called_{false};
139

            
140
private:
141
  Event::Dispatcher& dispatcher_;
142
  // See HttpConnectionManager.stream_flush_timeout.
143
  std::chrono::milliseconds stream_flush_timeout_{};
144
  Event::TimerPtr stream_flush_timer_;
145
};
146

            
147
} // namespace Http
148
} // namespace Envoy