LCOV - code coverage report
Current view: top level - source/common/http - codec_helper.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 76 79 96.2 %
Date: 2024-01-05 06:35:25 Functions: 13 15 86.7 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include "envoy/event/dispatcher.h"
       4             : #include "envoy/event/timer.h"
       5             : #include "envoy/http/codec.h"
       6             : 
       7             : #include "source/common/common/assert.h"
       8             : 
       9             : #include "absl/container/inlined_vector.h"
      10             : 
      11             : namespace Envoy {
      12             : namespace Http {
      13             : 
      14             : class StreamCallbackHelper {
      15             : public:
      16          70 :   void runLowWatermarkCallbacks() {
      17          70 :     if (reset_callbacks_started_ || local_end_stream_) {
      18           4 :       return;
      19           4 :     }
      20          66 :     ASSERT(high_watermark_callbacks_ > 0);
      21          66 :     --high_watermark_callbacks_;
      22          66 :     for (StreamCallbacks* callbacks : callbacks_) {
      23          66 :       if (callbacks) {
      24          66 :         callbacks->onBelowWriteBufferLowWatermark();
      25          66 :       }
      26          66 :     }
      27          66 :   }
      28             : 
      29          74 :   void runHighWatermarkCallbacks() {
      30          74 :     if (reset_callbacks_started_ || local_end_stream_) {
      31           4 :       return;
      32           4 :     }
      33          70 :     ++high_watermark_callbacks_;
      34          70 :     for (StreamCallbacks* callbacks : callbacks_) {
      35          70 :       if (callbacks) {
      36          70 :         callbacks->onAboveWriteBufferHighWatermark();
      37          70 :       }
      38          70 :     }
      39          70 :   }
      40             : 
      41         693 :   void runResetCallbacks(StreamResetReason reason) {
      42             :     // Reset callbacks are a special case, and the only StreamCallbacks allowed
      43             :     // to run after local_end_stream_.
      44         693 :     if (reset_callbacks_started_) {
      45          68 :       return;
      46          68 :     }
      47             : 
      48         625 :     reset_callbacks_started_ = true;
      49         775 :     for (StreamCallbacks* callbacks : callbacks_) {
      50         538 :       if (callbacks) {
      51         489 :         callbacks->onResetStream(reason, absl::string_view());
      52         489 :       }
      53         538 :     }
      54         625 :   }
      55             : 
      56             :   bool local_end_stream_{};
      57             : 
      58             : protected:
      59        2981 :   void addCallbacksHelper(StreamCallbacks& callbacks) {
      60        2981 :     ASSERT(!reset_callbacks_started_ && !local_end_stream_);
      61        2981 :     callbacks_.push_back(&callbacks);
      62        2981 :     for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
      63           0 :       callbacks.onAboveWriteBufferHighWatermark();
      64           0 :     }
      65        2981 :   }
      66             : 
      67         983 :   void removeCallbacksHelper(StreamCallbacks& callbacks) {
      68             :     // For performance reasons we just clear the callback and do not resize the vector.
      69             :     // Reset callbacks scale with the number of filters per request and do not get added and
      70             :     // removed multiple times.
      71             :     // The vector may not be safely resized without making sure the run.*Callbacks() helper
      72             :     // functions above still handle removeCallbacksHelper() calls mid-loop.
      73        1036 :     for (auto& callback : callbacks_) {
      74        1036 :       if (callback == &callbacks) {
      75         875 :         callback = nullptr;
      76         875 :         return;
      77         875 :       }
      78        1036 :     }
      79         983 :   }
      80             : 
      81             : private:
      82             :   absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
      83             :   bool reset_callbacks_started_{};
      84             :   uint32_t high_watermark_callbacks_{};
      85             : };
      86             : 
      87             : // A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
      88             : // with buffered data and register the stream adapter.
      89             : class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
      90             : public:
      91        2206 :   MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
      92        2199 :   ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); }
      93             :   // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
      94             :   // deferred delete lifetime issues that need sorting out if the destructor of the stream is
      95             :   // going to be able to refer to the parent connection.
      96        2206 :   virtual void destroy() { disarmStreamIdleTimer(); }
      97             : 
      98         267 :   void onLocalEndStream() {
      99         267 :     ASSERT(local_end_stream_);
     100         267 :     if (hasPendingData()) {
     101          89 :       createPendingFlushTimer();
     102          89 :     }
     103         267 :   }
     104             : 
     105        3370 :   void disarmStreamIdleTimer() {
     106        3370 :     if (stream_idle_timer_ != nullptr) {
     107             :       // To ease testing and the destructor assertion.
     108          85 :       stream_idle_timer_->disableTimer();
     109          85 :       stream_idle_timer_.reset();
     110          85 :     }
     111        3370 :   }
     112             : 
     113         424 :   CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
     114         424 :     std::swap(codec_callbacks, codec_callbacks_);
     115         424 :     return codec_callbacks;
     116         424 :   }
     117             : 
     118             : protected:
     119         212 :   void setFlushTimeout(std::chrono::milliseconds timeout) override {
     120         212 :     stream_idle_timeout_ = timeout;
     121         212 :   }
     122             : 
     123          89 :   void createPendingFlushTimer() {
     124          89 :     ASSERT(stream_idle_timer_ == nullptr);
     125          89 :     if (stream_idle_timeout_.count() > 0) {
     126          85 :       stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); });
     127          85 :       stream_idle_timer_->enableTimer(stream_idle_timeout_);
     128          85 :     }
     129          89 :   }
     130             : 
     131           0 :   virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); }
     132             : 
     133             :   virtual bool hasPendingData() PURE;
     134             : 
     135             :   CodecEventCallbacks* codec_callbacks_{nullptr};
     136             : 
     137             : private:
     138             :   Event::Dispatcher& dispatcher_;
     139             :   // See HttpConnectionManager.stream_idle_timeout.
     140             :   std::chrono::milliseconds stream_idle_timeout_{};
     141             :   Event::TimerPtr stream_idle_timer_;
     142             : };
     143             : 
     144             : } // namespace Http
     145             : } // namespace Envoy

Generated by: LCOV version 1.15