/proc/self/cwd/source/common/http/codec_helper.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include "envoy/event/dispatcher.h" |
4 | | #include "envoy/event/timer.h" |
5 | | #include "envoy/http/codec.h" |
6 | | |
7 | | #include "source/common/common/assert.h" |
8 | | |
9 | | #include "absl/container/inlined_vector.h" |
10 | | |
11 | | namespace Envoy { |
12 | | namespace Http { |
13 | | |
14 | | class StreamCallbackHelper { |
15 | | public: |
16 | 2.26k | void runLowWatermarkCallbacks() { |
17 | 2.26k | if (reset_callbacks_started_ || local_end_stream_) { |
18 | 480 | return; |
19 | 480 | } |
20 | 1.78k | ASSERT(high_watermark_callbacks_ > 0); |
21 | 1.78k | --high_watermark_callbacks_; |
22 | 1.78k | for (StreamCallbacks* callbacks : callbacks_) { |
23 | 1.78k | if (callbacks) { |
24 | 1.78k | callbacks->onBelowWriteBufferLowWatermark(); |
25 | 1.78k | } |
26 | 1.78k | } |
27 | 1.78k | } |
28 | | |
29 | 2.91k | void runHighWatermarkCallbacks() { |
30 | 2.91k | if (reset_callbacks_started_ || local_end_stream_) { |
31 | 349 | return; |
32 | 349 | } |
33 | 2.57k | ++high_watermark_callbacks_; |
34 | 2.57k | for (StreamCallbacks* callbacks : callbacks_) { |
35 | 2.57k | if (callbacks) { |
36 | 2.57k | callbacks->onAboveWriteBufferHighWatermark(); |
37 | 2.57k | } |
38 | 2.57k | } |
39 | 2.57k | } |
40 | | |
41 | 67.7k | void runResetCallbacks(StreamResetReason reason) { |
42 | | // Reset callbacks are a special case, and the only StreamCallbacks allowed |
43 | | // to run after local_end_stream_. |
44 | 67.7k | if (reset_callbacks_started_) { |
45 | 9.69k | return; |
46 | 9.69k | } |
47 | | |
48 | 58.0k | reset_callbacks_started_ = true; |
49 | 58.0k | for (StreamCallbacks* callbacks : callbacks_) { |
50 | 30.4k | if (callbacks) { |
51 | 29.2k | callbacks->onResetStream(reason, absl::string_view()); |
52 | 29.2k | } |
53 | 30.4k | } |
54 | 58.0k | } |
55 | | |
56 | | bool local_end_stream_{}; |
57 | | |
58 | | protected: |
59 | 156k | void addCallbacksHelper(StreamCallbacks& callbacks) { |
60 | 156k | ASSERT(!reset_callbacks_started_ && !local_end_stream_); |
61 | 156k | callbacks_.push_back(&callbacks); |
62 | 156k | for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) { |
63 | 0 | callbacks.onAboveWriteBufferHighWatermark(); |
64 | 0 | } |
65 | 156k | } |
66 | | |
67 | 12.5k | void removeCallbacksHelper(StreamCallbacks& callbacks) { |
68 | | // For performance reasons we just clear the callback and do not resize the vector. |
69 | | // Reset callbacks scale with the number of filters per request and do not get added and |
70 | | // removed multiple times. |
71 | | // The vector may not be safely resized without making sure the run.*Callbacks() helper |
72 | | // functions above still handle removeCallbacksHelper() calls mid-loop. |
73 | 13.7k | for (auto& callback : callbacks_) { |
74 | 13.7k | if (callback == &callbacks) { |
75 | 11.6k | callback = nullptr; |
76 | 11.6k | return; |
77 | 11.6k | } |
78 | 13.7k | } |
79 | 12.5k | } |
80 | | |
81 | | private: |
82 | | absl::InlinedVector<StreamCallbacks*, 8> callbacks_; |
83 | | bool reset_callbacks_started_{}; |
84 | | uint32_t high_watermark_callbacks_{}; |
85 | | }; |
86 | | |
87 | | // A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream |
88 | | // with buffered data and register the stream adapter. |
89 | | class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper { |
90 | | public: |
91 | 188k | MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} |
92 | 188k | ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); } |
93 | | // TODO(mattklein123): Optimally this would be done in the destructor but there are currently |
94 | | // deferred delete lifetime issues that need sorting out if the destructor of the stream is |
95 | | // going to be able to refer to the parent connection. |
96 | 188k | virtual void destroy() { disarmStreamIdleTimer(); } |
97 | | |
98 | 5.12k | void onLocalEndStream() { |
99 | 5.12k | ASSERT(local_end_stream_); |
100 | 5.12k | if (hasPendingData()) { |
101 | 792 | createPendingFlushTimer(); |
102 | 792 | } |
103 | 5.12k | } |
104 | | |
105 | 214k | void disarmStreamIdleTimer() { |
106 | 214k | if (stream_idle_timer_ != nullptr) { |
107 | | // To ease testing and the destructor assertion. |
108 | 106 | stream_idle_timer_->disableTimer(); |
109 | 106 | stream_idle_timer_.reset(); |
110 | 106 | } |
111 | 214k | } |
112 | | |
113 | 3.63k | CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override { |
114 | 3.63k | std::swap(codec_callbacks, codec_callbacks_); |
115 | 3.63k | return codec_callbacks; |
116 | 3.63k | } |
117 | | |
118 | | protected: |
119 | 1.81k | void setFlushTimeout(std::chrono::milliseconds timeout) override { |
120 | 1.81k | stream_idle_timeout_ = timeout; |
121 | 1.81k | } |
122 | | |
123 | 792 | void createPendingFlushTimer() { |
124 | 792 | ASSERT(stream_idle_timer_ == nullptr); |
125 | 792 | if (stream_idle_timeout_.count() > 0) { |
126 | 106 | stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); }); |
127 | 106 | stream_idle_timer_->enableTimer(stream_idle_timeout_); |
128 | 106 | } |
129 | 792 | } |
130 | | |
131 | 0 | virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); } |
132 | | |
133 | | virtual bool hasPendingData() PURE; |
134 | | |
135 | | CodecEventCallbacks* codec_callbacks_{nullptr}; |
136 | | |
137 | | private: |
138 | | Event::Dispatcher& dispatcher_; |
139 | | // See HttpConnectionManager.stream_idle_timeout. |
140 | | std::chrono::milliseconds stream_idle_timeout_{}; |
141 | | Event::TimerPtr stream_idle_timer_; |
142 | | }; |
143 | | |
144 | | } // namespace Http |
145 | | } // namespace Envoy |