1
#pragma once
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/event/scaled_timer.h"
5
#include "envoy/event/timer.h"
6
#include "envoy/http/codec.h"
7

            
8
#include "source/common/common/assert.h"
9

            
10
#include "absl/container/inlined_vector.h"
11

            
12
namespace Envoy {
13
namespace Http {
14

            
15
class StreamCallbackHelper {
16
public:
17
258860
  void runLowWatermarkCallbacks() {
18
258860
    if (reset_callbacks_started_ || local_end_stream_) {
19
1528
      return;
20
1528
    }
21
257332
    ASSERT(high_watermark_callbacks_ > 0);
22
257332
    --high_watermark_callbacks_;
23
565543
    for (StreamCallbacks* callbacks : callbacks_) {
24
565543
      if (callbacks) {
25
565535
        callbacks->onBelowWriteBufferLowWatermark();
26
565535
      }
27
565543
    }
28
257332
  }
29

            
30
259057
  void runHighWatermarkCallbacks() {
31
259057
    if (reset_callbacks_started_ || local_end_stream_) {
32
1535
      return;
33
1535
    }
34
257522
    ++high_watermark_callbacks_;
35
565621
    for (StreamCallbacks* callbacks : callbacks_) {
36
565610
      if (callbacks) {
37
565600
        callbacks->onAboveWriteBufferHighWatermark();
38
565600
      }
39
565610
    }
40
257522
  }
41

            
42
33162
  void runResetCallbacks(StreamResetReason reason, absl::string_view details) {
43
    // Reset callbacks are a special case, and the only StreamCallbacks allowed
44
    // to run after local_end_stream_.
45
33162
    if (reset_callbacks_started_) {
46
8024
      return;
47
8024
    }
48

            
49
25138
    reset_callbacks_started_ = true;
50
42597
    for (StreamCallbacks* callbacks : callbacks_) {
51
42504
      if (callbacks) {
52
35701
        callbacks->onResetStream(reason, details);
53
35701
      }
54
42504
    }
55
25138
  }
56

            
57
  bool local_end_stream_{};
58

            
59
protected:
60
362643
  void addCallbacksHelper(StreamCallbacks& callbacks) {
61
362643
    ASSERT(!reset_callbacks_started_ && !local_end_stream_);
62
362643
    callbacks_.push_back(&callbacks);
63
362796
    for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
64
153
      callbacks.onAboveWriteBufferHighWatermark();
65
153
    }
66
362643
  }
67

            
68
226228
  void removeCallbacksHelper(StreamCallbacks& callbacks) {
69
    // For performance reasons we just clear the callback and do not resize the vector.
70
    // Reset callbacks scale with the number of filters per request and do not get added and
71
    // removed multiple times.
72
    // The vector may not be safely resized without making sure the run.*Callbacks() helper
73
    // functions above still handle removeCallbacksHelper() calls mid-loop.
74
234639
    for (auto& callback : callbacks_) {
75
234639
      if (callback == &callbacks) {
76
181614
        callback = nullptr;
77
181614
        return;
78
181614
      }
79
234639
    }
80
226228
  }
81

            
82
private:
83
  absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
84
  bool reset_callbacks_started_{};
85
  uint32_t high_watermark_callbacks_{};
86
};
87

            
88
// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
89
// with buffered data and register the stream adapter.
90
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
91
public:
92
156903
  MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
93
156880
  ~MultiplexedStreamImplBase() override { ASSERT(stream_flush_timer_ == nullptr); }
94
  // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
95
  // deferred delete lifetime issues that need sorting out if the destructor of the stream is
96
  // going to be able to refer to the parent connection.
97
156903
  virtual void destroy() { disarmStreamFlushTimer(); }
98

            
99
25097
  void onLocalEndStream() {
100
25097
    ASSERT(local_end_stream_);
101
25097
    if (hasPendingData()) {
102
1817
      createPendingFlushTimer();
103
1817
    }
104
25097
  }
105

            
106
156961
  void disarmStreamFlushTimer() {
107
156961
    if (stream_flush_timer_ != nullptr) {
108
      // To ease testing and the destructor assertion.
109
413
      stream_flush_timer_->disableTimer();
110
413
      stream_flush_timer_.reset();
111
413
    }
112
156961
  }
113

            
114
155094
  CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
115
155094
    std::swap(codec_callbacks, codec_callbacks_);
116
155094
    return codec_callbacks;
117
155094
  }
118

            
119
protected:
120
78017
  void setFlushTimeout(std::chrono::milliseconds timeout) override {
121
78017
    stream_flush_timeout_ = timeout;
122
78017
  }
123

            
124
1817
  void createPendingFlushTimer() {
125
1817
    ASSERT(stream_flush_timer_ == nullptr);
126
1817
    if (stream_flush_timeout_.count() > 0) {
127
428
      stream_flush_timer_ = dispatcher_.createScaledTimer(
128
428
          Event::ScaledTimerType::HttpDownstreamStreamFlush, [this] { onPendingFlushTimer(); });
129
428
      stream_flush_timer_->enableTimer(stream_flush_timeout_);
130
428
    }
131
1817
  }
132

            
133
15
  virtual void onPendingFlushTimer() { stream_flush_timer_.reset(); }
134

            
135
  virtual bool hasPendingData() PURE;
136

            
137
  CodecEventCallbacks* codec_callbacks_{nullptr};
138
  bool codec_low_level_reset_is_called_{false};
139

            
140
private:
141
  Event::Dispatcher& dispatcher_;
142
  // See HttpConnectionManager.stream_flush_timeout.
143
  std::chrono::milliseconds stream_flush_timeout_{};
144
  Event::TimerPtr stream_flush_timer_;
145
};
146

            
147
} // namespace Http
148
} // namespace Envoy