// Coverage report header (extraction residue): Lines 100 %, Functions.
#pragma once
#include "envoy/event/dispatcher.h"
#include "envoy/event/scaled_timer.h"
#include "envoy/event/timer.h"
#include "envoy/http/codec.h"
#include "source/common/common/assert.h"
#include "absl/container/inlined_vector.h"
namespace Envoy {
namespace Http {
class StreamCallbackHelper {
public:
void runLowWatermarkCallbacks() {
if (reset_callbacks_started_ || local_end_stream_) {
return;
}
ASSERT(high_watermark_callbacks_ > 0);
--high_watermark_callbacks_;
for (StreamCallbacks* callbacks : callbacks_) {
if (callbacks) {
callbacks->onBelowWriteBufferLowWatermark();
void runHighWatermarkCallbacks() {
++high_watermark_callbacks_;
callbacks->onAboveWriteBufferHighWatermark();
void runResetCallbacks(StreamResetReason reason, absl::string_view details) {
// Reset callbacks are a special case, and the only StreamCallbacks allowed
// to run after local_end_stream_.
if (reset_callbacks_started_) {
reset_callbacks_started_ = true;
callbacks->onResetStream(reason, details);
bool local_end_stream_{};
protected:
void addCallbacksHelper(StreamCallbacks& callbacks) {
ASSERT(!reset_callbacks_started_ && !local_end_stream_);
callbacks_.push_back(&callbacks);
for (uint32_t i = 0; i < high_watermark_callbacks_; ++i) {
callbacks.onAboveWriteBufferHighWatermark();
void removeCallbacksHelper(StreamCallbacks& callbacks) {
// For performance reasons we just clear the callback and do not resize the vector.
// Reset callbacks scale with the number of filters per request and do not get added and
// removed multiple times.
// The vector may not be safely resized without making sure the run.*Callbacks() helper
// functions above still handle removeCallbacksHelper() calls mid-loop.
for (auto& callback : callbacks_) {
if (callback == &callbacks) {
callback = nullptr;
private:
absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
bool reset_callbacks_started_{};
uint32_t high_watermark_callbacks_{};
};
// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
// with buffered data and register the stream adapter.
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
~MultiplexedStreamImplBase() override { ASSERT(stream_flush_timer_ == nullptr); }
// TODO(mattklein123): Optimally this would be done in the destructor but there are currently
// deferred delete lifetime issues that need sorting out if the destructor of the stream is
// going to be able to refer to the parent connection.
virtual void destroy() { disarmStreamFlushTimer(); }
void onLocalEndStream() {
ASSERT(local_end_stream_);
if (hasPendingData()) {
createPendingFlushTimer();
void disarmStreamFlushTimer() {
if (stream_flush_timer_ != nullptr) {
// To ease testing and the destructor assertion.
stream_flush_timer_->disableTimer();
stream_flush_timer_.reset();
CodecEventCallbacks* registerCodecEventCallbacks(CodecEventCallbacks* codec_callbacks) override {
std::swap(codec_callbacks, codec_callbacks_);
return codec_callbacks;
void setFlushTimeout(std::chrono::milliseconds timeout) override {
stream_flush_timeout_ = timeout;
void createPendingFlushTimer() {
ASSERT(stream_flush_timer_ == nullptr);
if (stream_flush_timeout_.count() > 0) {
stream_flush_timer_ = dispatcher_.createScaledTimer(
Event::ScaledTimerType::HttpDownstreamStreamFlush, [this] { onPendingFlushTimer(); });
stream_flush_timer_->enableTimer(stream_flush_timeout_);
virtual void onPendingFlushTimer() { stream_flush_timer_.reset(); }
virtual bool hasPendingData() PURE;
CodecEventCallbacks* codec_callbacks_{nullptr};
bool codec_low_level_reset_is_called_{false};
Event::Dispatcher& dispatcher_;
// See HttpConnectionManager.stream_flush_timeout.
std::chrono::milliseconds stream_flush_timeout_{};
Event::TimerPtr stream_flush_timer_;
} // namespace Http
} // namespace Envoy