Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/http/codec_helper.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/event/timer.h"
5
#include "envoy/http/codec.h"
6
7
#include "source/common/common/assert.h"
8
9
#include "absl/container/inlined_vector.h"
10
11
namespace Envoy {
12
namespace Http {
13
14
class StreamCallbackHelper {
15
public:
16
2.06k
  void runLowWatermarkCallbacks() {
17
2.06k
    if (reset_callbacks_started_ || local_end_stream_) {
18
418
      return;
19
418
    }
20
1.65k
    ASSERT(high_watermark_callbacks_ > 0);
21
1.65k
    --high_watermark_callbacks_;
22
1.65k
    for (StreamCallbacks* callbacks : callbacks_) {
23
1.65k
      if (callbacks) {
24
1.65k
        callbacks->onBelowWriteBufferLowWatermark();
25
1.65k
      }
26
1.65k
    }
27
1.65k
  }
28
29
2.62k
  void runHighWatermarkCallbacks() {
30
2.62k
    if (reset_callbacks_started_ || local_end_stream_) {
31
439
      return;
32
439
    }
33
2.18k
    ++high_watermark_callbacks_;
34
2.18k
    for (StreamCallbacks* callbacks : callbacks_) {
35
2.18k
      if (callbacks) {
36
2.18k
        callbacks->onAboveWriteBufferHighWatermark();
37
2.18k
      }
38
2.18k
    }
39
2.18k
  }
40
41
48.7k
  void runResetCallbacks(StreamResetReason reason, absl::string_view details) {
42
    // Reset callbacks are a special case, and the only StreamCallbacks allowed
43
    // to run after local_end_stream_.
44
48.7k
    if (reset_callbacks_started_) {
45
5.82k
      return;
46
5.82k
    }
47
48
42.8k
    reset_callbacks_started_ = true;
49
42.8k
    for (StreamCallbacks* callbacks : callbacks_) {
50
36.7k
      if (callbacks) {
51
35.9k
        callbacks->onResetStream(reason, details);
52
35.9k
      }
53
36.7k
    }
54
42.8k
  }
55
56
  bool local_end_stream_{};
57
58
protected:
59
161k
  void addCallbacksHelper(StreamCallbacks& callbacks) {
60
161k
    ASSERT(!reset_callbacks_started_ && !local_end_stream_);
61
161k
    callbacks_.push_back(&callbacks);
62
161k
    for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
63
0
      callbacks.onAboveWriteBufferHighWatermark();
64
0
    }
65
161k
  }
66
67
5.92k
  void removeCallbacksHelper(StreamCallbacks& callbacks) {
68
    // For performance reasons we just clear the callback and do not resize the vector.
69
    // Reset callbacks scale with the number of filters per request and do not get added and
70
    // removed multiple times.
71
    // The vector may not be safely resized without making sure the run.*Callbacks() helper
72
    // functions above still handle removeCallbacksHelper() calls mid-loop.
73
6.77k
    for (auto& callback : callbacks_) {
74
6.77k
      if (callback == &callbacks) {
75
5.23k
        callback = nullptr;
76
5.23k
        return;
77
5.23k
      }
78
6.77k
    }
79
5.92k
  }
80
81
private:
82
  absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
83
  bool reset_callbacks_started_{};
84
  uint32_t high_watermark_callbacks_{};
85
};
86
87
// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
88
// with buffered data and register the stream adapter.
89
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
90
public:
91
169k
  MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
92
169k
  ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); }
93
  // TODO(mattklein123): Optimally this would be done in the destructor but there are currently
94
  // deferred delete lifetime issues that need sorting out if the destructor of the stream is
95
  // going to be able to refer to the parent connection.
96
169k
  virtual void destroy() { disarmStreamIdleTimer(); }
97
98
4.28k
  void onLocalEndStream() {
99
4.28k
    ASSERT(local_end_stream_);
100
4.28k
    if (hasPendingData()) {
101
733
      createPendingFlushTimer();
102
733
    }
103
4.28k
  }
104
105
199k
  void disarmStreamIdleTimer() {
106
199k
    if (stream_idle_timer_ != nullptr) {
107
      // To ease testing and the destructor assertion.
108
49
      stream_idle_timer_->disableTimer();
109
49
      stream_idle_timer_.reset();
110
49
    }
111
199k
  }
112
113
3.29k
  CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
114
3.29k
    std::swap(codec_callbacks, codec_callbacks_);
115
3.29k
    return codec_callbacks;
116
3.29k
  }
117
118
protected:
119
1.64k
  void setFlushTimeout(std::chrono::milliseconds timeout) override {
120
1.64k
    stream_idle_timeout_ = timeout;
121
1.64k
  }
122
123
733
  void createPendingFlushTimer() {
124
733
    ASSERT(stream_idle_timer_ == nullptr);
125
733
    if (stream_idle_timeout_.count() > 0) {
126
49
      stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); });
127
49
      stream_idle_timer_->enableTimer(stream_idle_timeout_);
128
49
    }
129
733
  }
130
131
0
  virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); }
132
133
  virtual bool hasPendingData() PURE;
134
135
  CodecEventCallbacks* codec_callbacks_{nullptr};
136
137
private:
138
  Event::Dispatcher& dispatcher_;
139
  // See HttpConnectionManager.stream_idle_timeout.
140
  std::chrono::milliseconds stream_idle_timeout_{};
141
  Event::TimerPtr stream_idle_timer_;
142
};
143
144
} // namespace Http
145
} // namespace Envoy