Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/http/codec_helper.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/event/timer.h"
5
#include "envoy/http/codec.h"
6
7
#include "source/common/common/assert.h"
8
9
#include "absl/container/inlined_vector.h"
10
11
namespace Envoy {
12
namespace Http {
13
14
class StreamCallbackHelper {
15
public:
16
2.26k
  void runLowWatermarkCallbacks() {
17
2.26k
    if (reset_callbacks_started_ || local_end_stream_) {
18
480
      return;
19
480
    }
20
1.78k
    ASSERT(high_watermark_callbacks_ > 0);
21
1.78k
    --high_watermark_callbacks_;
22
1.78k
    for (StreamCallbacks* callbacks : callbacks_) {
23
1.78k
      if (callbacks) {
24
1.78k
        callbacks->onBelowWriteBufferLowWatermark();
25
1.78k
      }
26
1.78k
    }
27
1.78k
  }
28
29
2.91k
  void runHighWatermarkCallbacks() {
30
2.91k
    if (reset_callbacks_started_ || local_end_stream_) {
31
349
      return;
32
349
    }
33
2.57k
    ++high_watermark_callbacks_;
34
2.57k
    for (StreamCallbacks* callbacks : callbacks_) {
35
2.57k
      if (callbacks) {
36
2.57k
        callbacks->onAboveWriteBufferHighWatermark();
37
2.57k
      }
38
2.57k
    }
39
2.57k
  }
40
41
67.7k
  void runResetCallbacks(StreamResetReason reason) {
42
    // Reset callbacks are a special case, and the only StreamCallbacks allowed
43
    // to run after local_end_stream_.
44
67.7k
    if (reset_callbacks_started_) {
45
9.69k
      return;
46
9.69k
    }
47
48
58.0k
    reset_callbacks_started_ = true;
49
58.0k
    for (StreamCallbacks* callbacks : callbacks_) {
50
30.4k
      if (callbacks) {
51
29.2k
        callbacks->onResetStream(reason, absl::string_view());
52
29.2k
      }
53
30.4k
    }
54
58.0k
  }
55
56
  bool local_end_stream_{};
57
58
protected:
59
156k
  void addCallbacksHelper(StreamCallbacks& callbacks) {
60
156k
    ASSERT(!reset_callbacks_started_ && !local_end_stream_);
61
156k
    callbacks_.push_back(&callbacks);
62
156k
    for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
63
0
      callbacks.onAboveWriteBufferHighWatermark();
64
0
    }
65
156k
  }
66
67
12.5k
  void removeCallbacksHelper(StreamCallbacks& callbacks) {
68
    // For performance reasons we just clear the callback and do not resize the vector.
69
    // Reset callbacks scale with the number of filters per request and do not get added and
70
    // removed multiple times.
71
    // The vector may not be safely resized without making sure the run.*Callbacks() helper
72
    // functions above still handle removeCallbacksHelper() calls mid-loop.
73
13.7k
    for (auto& callback : callbacks_) {
74
13.7k
      if (callback == &callbacks) {
75
11.6k
        callback = nullptr;
76
11.6k
        return;
77
11.6k
      }
78
13.7k
    }
79
12.5k
  }
80
81
private:
82
  absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
83
  bool reset_callbacks_started_{};
84
  uint32_t high_watermark_callbacks_{};
85
};
86
87
// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
88
// with buffered data and register the stream adapter.
89
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
90
public:
91
188k
  MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
92
188k
  ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); }
93
  // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
94
  // deferred delete lifetime issues that need sorting out if the destructor of the stream is
95
  // going to be able to refer to the parent connection.
96
188k
  virtual void destroy() { disarmStreamIdleTimer(); }
97
98
5.12k
  void onLocalEndStream() {
99
5.12k
    ASSERT(local_end_stream_);
100
5.12k
    if (hasPendingData()) {
101
792
      createPendingFlushTimer();
102
792
    }
103
5.12k
  }
104
105
214k
  void disarmStreamIdleTimer() {
106
214k
    if (stream_idle_timer_ != nullptr) {
107
      // To ease testing and the destructor assertion.
108
106
      stream_idle_timer_->disableTimer();
109
106
      stream_idle_timer_.reset();
110
106
    }
111
214k
  }
112
113
3.63k
  CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
114
3.63k
    std::swap(codec_callbacks, codec_callbacks_);
115
3.63k
    return codec_callbacks;
116
3.63k
  }
117
118
protected:
119
1.81k
  void setFlushTimeout(std::chrono::milliseconds timeout) override {
120
1.81k
    stream_idle_timeout_ = timeout;
121
1.81k
  }
122
123
792
  void createPendingFlushTimer() {
124
792
    ASSERT(stream_idle_timer_ == nullptr);
125
792
    if (stream_idle_timeout_.count() > 0) {
126
106
      stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); });
127
106
      stream_idle_timer_->enableTimer(stream_idle_timeout_);
128
106
    }
129
792
  }
130
131
0
  virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); }
132
133
  virtual bool hasPendingData() PURE;
134
135
  CodecEventCallbacks* codec_callbacks_{nullptr};
136
137
private:
138
  Event::Dispatcher& dispatcher_;
139
  // See HttpConnectionManager.stream_idle_timeout.
140
  std::chrono::milliseconds stream_idle_timeout_{};
141
  Event::TimerPtr stream_idle_timer_;
142
};
143
144
} // namespace Http
145
} // namespace Envoy