Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/event/dispatcher.h" 4 : #include "envoy/event/timer.h" 5 : #include "envoy/http/codec.h" 6 : 7 : #include "source/common/common/assert.h" 8 : 9 : #include "absl/container/inlined_vector.h" 10 : 11 : namespace Envoy { 12 : namespace Http { 13 : 14 : class StreamCallbackHelper { 15 : public: 16 70 : void runLowWatermarkCallbacks() { 17 70 : if (reset_callbacks_started_ || local_end_stream_) { 18 4 : return; 19 4 : } 20 66 : ASSERT(high_watermark_callbacks_ > 0); 21 66 : --high_watermark_callbacks_; 22 66 : for (StreamCallbacks* callbacks : callbacks_) { 23 66 : if (callbacks) { 24 66 : callbacks->onBelowWriteBufferLowWatermark(); 25 66 : } 26 66 : } 27 66 : } 28 : 29 74 : void runHighWatermarkCallbacks() { 30 74 : if (reset_callbacks_started_ || local_end_stream_) { 31 4 : return; 32 4 : } 33 70 : ++high_watermark_callbacks_; 34 70 : for (StreamCallbacks* callbacks : callbacks_) { 35 70 : if (callbacks) { 36 70 : callbacks->onAboveWriteBufferHighWatermark(); 37 70 : } 38 70 : } 39 70 : } 40 : 41 693 : void runResetCallbacks(StreamResetReason reason) { 42 : // Reset callbacks are a special case, and the only StreamCallbacks allowed 43 : // to run after local_end_stream_. 44 693 : if (reset_callbacks_started_) { 45 68 : return; 46 68 : } 47 : 48 625 : reset_callbacks_started_ = true; 49 775 : for (StreamCallbacks* callbacks : callbacks_) { 50 538 : if (callbacks) { 51 489 : callbacks->onResetStream(reason, absl::string_view()); 52 489 : } 53 538 : } 54 625 : } 55 : 56 : bool local_end_stream_{}; 57 : 58 : protected: 59 2981 : void addCallbacksHelper(StreamCallbacks& callbacks) { 60 2981 : ASSERT(!reset_callbacks_started_ && !local_end_stream_); 61 2981 : callbacks_.push_back(&callbacks); 62 2981 : for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) { 63 0 : callbacks.onAboveWriteBufferHighWatermark(); 64 0 : } 65 2981 : } 66 : 67 983 : void removeCallbacksHelper(StreamCallbacks& callbacks) { 68 : // For performance reasons we just clear the callback and do not resize the vector. 69 : // Reset callbacks scale with the number of filters per request and do not get added and 70 : // removed multiple times. 71 : // The vector may not be safely resized without making sure the run.*Callbacks() helper 72 : // functions above still handle removeCallbacksHelper() calls mid-loop. 73 1036 : for (auto& callback : callbacks_) { 74 1036 : if (callback == &callbacks) { 75 875 : callback = nullptr; 76 875 : return; 77 875 : } 78 1036 : } 79 983 : } 80 : 81 : private: 82 : absl::InlinedVector<StreamCallbacks*, 8> callbacks_; 83 : bool reset_callbacks_started_{}; 84 : uint32_t high_watermark_callbacks_{}; 85 : }; 86 : 87 : // A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream 88 : // with buffered data and register the stream adapter. 89 : class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper { 90 : public: 91 2206 : MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} 92 2199 : ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); } 93 : // TODO(mattklein123): Optimally this would be done in the destructor but there are currently 94 : // deferred delete lifetime issues that need sorting out if the destructor of the stream is 95 : // going to be able to refer to the parent connection. 96 2206 : virtual void destroy() { disarmStreamIdleTimer(); } 97 : 98 267 : void onLocalEndStream() { 99 267 : ASSERT(local_end_stream_); 100 267 : if (hasPendingData()) { 101 89 : createPendingFlushTimer(); 102 89 : } 103 267 : } 104 : 105 3370 : void disarmStreamIdleTimer() { 106 3370 : if (stream_idle_timer_ != nullptr) { 107 : // To ease testing and the destructor assertion. 108 85 : stream_idle_timer_->disableTimer(); 109 85 : stream_idle_timer_.reset(); 110 85 : } 111 3370 : } 112 : 113 424 : CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override { 114 424 : std::swap(codec_callbacks, codec_callbacks_); 115 424 : return codec_callbacks; 116 424 : } 117 : 118 : protected: 119 212 : void setFlushTimeout(std::chrono::milliseconds timeout) override { 120 212 : stream_idle_timeout_ = timeout; 121 212 : } 122 : 123 89 : void createPendingFlushTimer() { 124 89 : ASSERT(stream_idle_timer_ == nullptr); 125 89 : if (stream_idle_timeout_.count() > 0) { 126 85 : stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); }); 127 85 : stream_idle_timer_->enableTimer(stream_idle_timeout_); 128 85 : } 129 89 : } 130 : 131 0 : virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); } 132 : 133 : virtual bool hasPendingData() PURE; 134 : 135 : CodecEventCallbacks* codec_callbacks_{nullptr}; 136 : 137 : private: 138 : Event::Dispatcher& dispatcher_; 139 : // See HttpConnectionManager.stream_idle_timeout. 140 : std::chrono::milliseconds stream_idle_timeout_{}; 141 : Event::TimerPtr stream_idle_timer_; 142 : }; 143 : 144 : } // namespace Http 145 : } // namespace Envoy