Line data Source code
1 : #pragma once
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <list>
7 : #include <memory>
8 : #include <optional>
9 : #include <string>
10 : #include <vector>
11 :
12 : #include "envoy/access_log/access_log.h"
13 : #include "envoy/common/optref.h"
14 : #include "envoy/common/random_generator.h"
15 : #include "envoy/common/scope_tracker.h"
16 : #include "envoy/common/time.h"
17 : #include "envoy/event/deferred_deletable.h"
18 : #include "envoy/http/api_listener.h"
19 : #include "envoy/http/codec.h"
20 : #include "envoy/http/codes.h"
21 : #include "envoy/http/context.h"
22 : #include "envoy/http/filter.h"
23 : #include "envoy/http/header_map.h"
24 : #include "envoy/network/connection.h"
25 : #include "envoy/network/drain_decision.h"
26 : #include "envoy/network/filter.h"
27 : #include "envoy/router/rds.h"
28 : #include "envoy/router/scopes.h"
29 : #include "envoy/runtime/runtime.h"
30 : #include "envoy/server/overload/overload_manager.h"
31 : #include "envoy/ssl/connection.h"
32 : #include "envoy/stats/scope.h"
33 : #include "envoy/stats/stats_macros.h"
34 : #include "envoy/stream_info/filter_state.h"
35 : #include "envoy/tracing/tracer.h"
36 : #include "envoy/upstream/upstream.h"
37 :
38 : #include "source/common/buffer/watermark_buffer.h"
39 : #include "source/common/common/dump_state_utils.h"
40 : #include "source/common/common/linked_object.h"
41 : #include "source/common/grpc/common.h"
42 : #include "source/common/http/conn_manager_config.h"
43 : #include "source/common/http/filter_manager.h"
44 : #include "source/common/http/user_agent.h"
45 : #include "source/common/http/utility.h"
46 : #include "source/common/local_reply/local_reply.h"
47 : #include "source/common/network/proxy_protocol_filter_state.h"
48 : #include "source/common/router/scoped_rds.h"
49 : #include "source/common/stream_info/stream_info_impl.h"
50 : #include "source/common/tracing/http_tracer_impl.h"
51 :
52 : namespace Envoy {
53 : namespace Http {
54 :
55 : /**
56 : * Implementation of both ConnectionManager and ServerConnectionCallbacks. This is a
57 : * Network::Filter that can be installed on a connection that will perform HTTP protocol agnostic
58 : * handling of a connection and all requests/pushes that occur on a connection.
59 : */
60 : class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
61 : public Network::ReadFilter,
62 : public ServerConnectionCallbacks,
63 : public Network::ConnectionCallbacks,
64 : public Http::ApiListener {
65 : public:
66 : ConnectionManagerImpl(ConnectionManagerConfig& config, const Network::DrainDecision& drain_close,
67 : Random::RandomGenerator& random_generator, Http::Context& http_context,
68 : Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
69 : Upstream::ClusterManager& cluster_manager,
70 : Server::OverloadManager& overload_manager, TimeSource& time_system);
71 : ~ConnectionManagerImpl() override;
72 :
73 : static ConnectionManagerStats generateStats(const std::string& prefix, Stats::Scope& scope);
74 : static ConnectionManagerTracingStats generateTracingStats(const std::string& prefix,
75 : Stats::Scope& scope);
76 : static void chargeTracingStats(const Tracing::Reason& tracing_reason,
77 : ConnectionManagerTracingStats& tracing_stats);
78 : static ConnectionManagerListenerStats generateListenerStats(const std::string& prefix,
79 : Stats::Scope& scope);
80 : static const ResponseHeaderMap& continueHeader();
81 :
82 : // Currently the ConnectionManager creates a codec lazily when either:
83 : // a) onConnection for H3.
84 : // b) onData for H1 and H2.
85 : // With the introduction of ApiListeners, neither event occurs. This function allows consumer code
86 : // to manually create a codec.
87 : // TODO(junr03): consider passing a synthetic codec instead of creating once. The codec in the
88 : // ApiListener case is solely used to determine the protocol version.
89 : void createCodec(Buffer::Instance& data);
90 :
91 : // Network::ReadFilter
92 : Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
93 : Network::FilterStatus onNewConnection() override;
94 : void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
95 :
96 : // Http::ConnectionCallbacks
97 : void onGoAway(GoAwayErrorCode error_code) override;
98 :
99 : // Http::ServerConnectionCallbacks
100 : RequestDecoder& newStream(ResponseEncoder& response_encoder,
101 : bool is_internally_created = false) override;
102 :
103 : RequestDecoderHandlePtr newStreamHandle(ResponseEncoder& response_encoder,
104 : bool is_internally_created = false) override;
105 :
106 : // Network::ConnectionCallbacks
107 : void onEvent(Network::ConnectionEvent event) override;
108 : // Pass connection watermark events on to all the streams associated with that connection.
109 0 : void onAboveWriteBufferHighWatermark() override {
110 0 : codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
111 0 : }
112 0 : void onBelowWriteBufferLowWatermark() override {
113 0 : codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
114 0 : }
115 :
116 1342 : TimeSource& timeSource() { return time_source_; }
117 :
118 0 : void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; }
119 0 : bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; }
120 :
121 : // This runtime key configures the number of streams which must be closed on a connection before
122 : // envoy will potentially drain a connection due to excessive prematurely reset streams.
123 : static const absl::string_view PrematureResetTotalStreamCountKey;
124 :
125 : // The minimum lifetime of a stream, in seconds, in order not to be considered
126 : // prematurely closed.
127 : static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
128 : static const absl::string_view MaxRequestsPerIoCycle;
129 : static const absl::string_view OptionallyDelayClose;
130 :
131 : private:
132 : struct ActiveStream;
133 : class MobileConnectionManagerImpl;
134 :
135 : class RdsRouteConfigUpdateRequester {
136 : public:
137 : RdsRouteConfigUpdateRequester(Router::RouteConfigProvider* route_config_provider,
138 : ActiveStream& parent)
139 573 : : route_config_provider_(route_config_provider), parent_(parent) {}
140 :
141 : RdsRouteConfigUpdateRequester(Config::ConfigProvider* scoped_route_config_provider,
142 : OptRef<const Router::ScopeKeyBuilder> scope_key_builder,
143 : ActiveStream& parent)
144 : // Expect the dynamic cast to succeed because only ScopedRdsConfigProvider is fully
145 : // implemented. Inline provider will be cast to nullptr here but it is not full implemented
146 : // and can't not be used at this point. Should change this implementation if we have a
147 : // functional inline scope route provider in the future.
148 : : scoped_route_config_provider_(
149 : dynamic_cast<Router::ScopedRdsConfigProvider*>(scoped_route_config_provider)),
150 0 : scope_key_builder_(scope_key_builder), parent_(parent) {}
151 :
152 : void
153 : requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb);
154 : void requestVhdsUpdate(const std::string& host_header,
155 : Event::Dispatcher& thread_local_dispatcher,
156 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb);
157 : void requestSrdsUpdate(Router::ScopeKeyPtr scope_key,
158 : Event::Dispatcher& thread_local_dispatcher,
159 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb);
160 :
161 : private:
162 : Router::RouteConfigProvider* route_config_provider_;
163 : Router::ScopedRdsConfigProvider* scoped_route_config_provider_;
164 : OptRef<const Router::ScopeKeyBuilder> scope_key_builder_;
165 : ActiveStream& parent_;
166 : };
167 :
168 : /**
169 : * Wraps a single active stream on the connection. These are either full request/response pairs
170 : * or pushes.
171 : */
172 : struct ActiveStream final : LinkedObject<ActiveStream>,
173 : public Event::DeferredDeletable,
174 : public StreamCallbacks,
175 : public CodecEventCallbacks,
176 : public RequestDecoder,
177 : public Tracing::Config,
178 : public ScopeTrackedObject,
179 : public FilterManagerCallbacks,
180 : public DownstreamStreamFilterCallbacks {
181 : ActiveStream(ConnectionManagerImpl& connection_manager, uint32_t buffer_limit,
182 : Buffer::BufferMemoryAccountSharedPtr account);
183 : void completeRequest();
184 :
185 : const Network::Connection* connection();
186 0 : uint64_t streamId() { return stream_id_; }
187 :
188 : // Http::StreamCallbacks
189 : void onResetStream(StreamResetReason reason,
190 : absl::string_view transport_failure_reason) override;
191 : void onAboveWriteBufferHighWatermark() override;
192 : void onBelowWriteBufferLowWatermark() override;
193 :
194 : // Http::CodecEventCallbacks
195 : void onCodecEncodeComplete() override;
196 : void onCodecLowLevelReset() override;
197 :
198 : // Http::StreamDecoder
199 : void decodeData(Buffer::Instance& data, bool end_stream) override;
200 : void decodeMetadata(MetadataMapPtr&&) override;
201 :
202 : // Mark that the last downstream byte is received, and the downstream stream is complete.
203 : void maybeEndDecode(bool end_stream);
204 :
205 : // Http::RequestDecoder
206 : void decodeHeaders(RequestHeaderMapSharedPtr&& headers, bool end_stream) override;
207 : void decodeTrailers(RequestTrailerMapPtr&& trailers) override;
208 671 : StreamInfo::StreamInfo& streamInfo() override { return filter_manager_.streamInfo(); }
209 : void sendLocalReply(Code code, absl::string_view body,
210 : const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
211 : const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
212 121 : absl::string_view details) override {
213 121 : return filter_manager_.sendLocalReply(code, body, modify_headers, grpc_status, details);
214 121 : }
215 0 : std::list<AccessLog::InstanceSharedPtr> accessLogHandlers() override {
216 0 : return filter_manager_.accessLogHandlers();
217 0 : }
218 : // Hand off headers/trailers and stream info to the codec's response encoder, for logging later
219 : // (i.e. possibly after this stream has been destroyed).
220 : //
221 : // TODO(paulsohn): Investigate whether we can move the headers/trailers and stream info required
222 : // for logging instead of copying them (as is currently done in the HTTP/3 implementation) or
223 : // using a shared pointer. See
224 : // https://github.com/envoyproxy/envoy/pull/23648#discussion_r1066095564 for more details.
225 0 : void deferHeadersAndTrailers() {
226 0 : response_encoder_->setDeferredLoggingHeadersAndTrailers(request_headers_, response_headers_,
227 0 : response_trailers_, streamInfo());
228 0 : }
229 :
230 : // ScopeTrackedObject
231 0 : void dumpState(std::ostream& os, int indent_level = 0) const override {
232 0 : const char* spaces = spacesForLevel(indent_level);
233 0 : os << spaces << "ActiveStream " << this << DUMP_MEMBER(stream_id_);
234 :
235 0 : DUMP_DETAILS(&filter_manager_);
236 0 : }
237 :
238 : // FilterManagerCallbacks
239 : void encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream) override;
240 : void encode1xxHeaders(ResponseHeaderMap& response_headers) override;
241 : void encodeData(Buffer::Instance& data, bool end_stream) override;
242 : void encodeTrailers(ResponseTrailerMap& trailers) override;
243 : void encodeMetadata(MetadataMapPtr&& metadata) override;
244 0 : void setRequestTrailers(Http::RequestTrailerMapPtr&& request_trailers) override {
245 0 : ASSERT(!request_trailers_);
246 0 : request_trailers_ = std::move(request_trailers);
247 0 : }
248 3 : void setInformationalHeaders(Http::ResponseHeaderMapPtr&& informational_headers) override {
249 3 : ASSERT(!informational_headers_);
250 3 : informational_headers_ = std::move(informational_headers);
251 3 : }
252 562 : void setResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers) override {
253 0 : // We'll overwrite the headers in the case where we fail the stream after upstream headers
254 0 : // have begun filter processing but before they have been sent downstream.
255 562 : response_headers_ = std::move(response_headers);
256 562 : }
257 1 : void setResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers) override {
258 1 : response_trailers_ = std::move(response_trailers);
259 1 : }
260 : void chargeStats(const ResponseHeaderMap& headers) override;
261 :
262 3909 : Http::RequestHeaderMapOptRef requestHeaders() override {
263 3909 : return makeOptRefFromPtr(request_headers_.get());
264 3909 : }
265 641 : Http::RequestTrailerMapOptRef requestTrailers() override {
266 641 : return makeOptRefFromPtr(request_trailers_.get());
267 641 : }
268 386 : Http::ResponseHeaderMapOptRef informationalHeaders() override {
269 386 : return makeOptRefFromPtr(informational_headers_.get());
270 386 : }
271 2193 : Http::ResponseHeaderMapOptRef responseHeaders() override {
272 2193 : return makeOptRefFromPtr(response_headers_.get());
273 2193 : }
274 1597 : Http::ResponseTrailerMapOptRef responseTrailers() override {
275 1597 : return makeOptRefFromPtr(response_trailers_.get());
276 1597 : }
277 :
278 555 : void endStream() override {
279 555 : ASSERT(!state_.codec_saw_local_complete_);
280 555 : state_.codec_saw_local_complete_ = true;
281 555 : connection_manager_.doEndStream(*this);
282 555 : }
283 : void onDecoderFilterBelowWriteBufferLowWatermark() override;
284 : void onDecoderFilterAboveWriteBufferHighWatermark() override;
285 2 : void upgradeFilterChainCreated() override {
286 2 : connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
287 2 : connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
288 2 : state_.successful_upgrade_ = true;
289 2 : }
290 : void disarmRequestTimeout() override;
291 : void resetIdleTimer() override;
292 : void recreateStream(StreamInfo::FilterStateSharedPtr filter_state) override;
293 : void resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
294 : absl::string_view transport_failure_reason = "") override;
295 : const Router::RouteEntry::UpgradeMap* upgradeMap() override;
296 : Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
297 : Tracing::Span& activeSpan() override;
298 : void onResponseDataTooLarge() override;
299 : void onRequestDataTooLarge() override;
300 : Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override;
301 : void onLocalReply(Code code) override;
302 : OptRef<const Tracing::Config> tracingConfig() const override;
303 : const ScopeTrackedObject& scope() override;
304 2493 : OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return *this; }
305 :
306 : // DownstreamStreamFilterCallbacks
307 : void setRoute(Router::RouteConstSharedPtr route) override;
308 : Router::RouteConstSharedPtr route(const Router::RouteCallback& cb) override;
309 : void clearRouteCache() override;
310 : void requestRouteConfigUpdate(
311 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override;
312 :
313 : // Set cached route. This method should never be called directly. This is only called in the
314 : // setRoute(), clearRouteCache(), and refreshCachedRoute() methods.
315 : void setCachedRoute(absl::optional<Router::RouteConstSharedPtr>&& route);
316 : // Block the route cache and clear the snapped route config. By doing this the route cache will
317 : // not be updated. And if the route config is updated by the RDS, the snapped route config may
318 : // be freed before the stream is destroyed.
319 : // This will be called automatically at the end of handle response headers.
320 : void blockRouteCache();
321 : // Return true if the cached route is blocked.
322 1256 : bool routeCacheBlocked() const {
323 1256 : ENVOY_BUG(!route_cache_blocked_, "Should never try to refresh or clear the route cache when "
324 1256 : "it is blocked!");
325 1256 : return route_cache_blocked_;
326 1256 : }
327 :
328 : absl::optional<Router::ConfigConstSharedPtr> routeConfig();
329 : void traceRequest();
330 :
331 : // Updates the snapped_route_config_ (by reselecting scoped route configuration), if a scope is
332 : // not found, snapped_route_config_ is set to Router::NullConfigImpl.
333 : void snapScopedRouteConfig();
334 :
335 : void refreshCachedRoute();
336 : void refreshCachedRoute(const Router::RouteCallback& cb);
337 :
338 : void refreshCachedTracingCustomTags();
339 : void refreshDurationTimeout();
340 : void refreshIdleTimeout();
341 : void refreshAccessLogFlushTimer();
342 :
343 : // All state for the stream. Put here for readability.
// Boolean flags for a stream, stored as single-bit fields. The constructor initializes every
// flag: decorated_propagate_ starts as true, all other flags start as false.
struct State {
  State()
      : codec_saw_local_complete_(false), codec_encode_complete_(false),
        on_reset_stream_called_(false), is_zombie_stream_(false), successful_upgrade_(false),
        is_internally_destroyed_(false), is_internally_created_(false), is_tunneling_(false),
        decorated_propagate_(true), deferred_to_next_io_iteration_(false),
        // This flag was previously missing from the initializer list, so it started with an
        // indeterminate value.
        deferred_end_stream_(false) {}

  // It's possible for the codec to see the completed response but not fully
  // encode it.
  bool codec_saw_local_complete_ : 1; // This indicates that local is complete as the completed
                                      // response has made its way to the codec.
  bool codec_encode_complete_ : 1;    // This indicates that the codec has
                                      // completed encoding the response.
  bool on_reset_stream_called_ : 1;   // Whether the stream has been reset.
  bool is_zombie_stream_ : 1;         // Whether stream is waiting for signal
                                      // the underlying codec to be destroyed.
  bool successful_upgrade_ : 1;

  // True if this stream was the original externally created stream, but was
  // destroyed as part of internal redirect.
  bool is_internally_destroyed_ : 1;
  // True if this stream is internally created. Currently only used for
  // internal redirects or other streams created via recreateStream().
  bool is_internally_created_ : 1;

  // True if the response headers indicate a successful upgrade or connect
  // response.
  bool is_tunneling_ : 1;

  bool decorated_propagate_ : 1;

  // Indicates that sending headers to the filter manager is deferred to the
  // next I/O cycle. If data or trailers are received when this flag is set
  // they are deferred too.
  // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
  // structure, so it can be atomically created and cleared.
  bool deferred_to_next_io_iteration_ : 1;
  // NOTE(review): presumably records that end_stream was seen while the request was deferred;
  // confirm against the implementation file.
  bool deferred_end_stream_ : 1;
};
383 :
384 765 : bool canDestroyStream() const {
385 765 : return state_.on_reset_stream_called_ || state_.codec_encode_complete_ ||
386 765 : state_.is_internally_destroyed_;
387 765 : }
388 :
389 : // Per-stream idle timeout callback.
390 : void onIdleTimeout();
391 : // Per-stream request timeout callback.
392 : void onRequestTimeout();
393 : // Per-stream request header timeout callback.
394 : void onRequestHeaderTimeout();
395 : // Per-stream alive duration reached.
396 : void onStreamMaxDurationReached();
397 1765 : bool hasCachedRoute() { return cached_route_.has_value() && cached_route_.value(); }
398 :
399 : // Return local port of the connection.
400 : uint32_t localPort();
401 :
402 0 : friend std::ostream& operator<<(std::ostream& os, const ActiveStream& s) {
403 0 : s.dumpState(os);
404 0 : return os;
405 0 : }
406 :
407 0 : Tracing::CustomTagMap& getOrMakeTracingCustomTagMap() {
408 0 : if (tracing_custom_tags_ == nullptr) {
409 0 : tracing_custom_tags_ = std::make_unique<Tracing::CustomTagMap>();
410 0 : }
411 0 : return *tracing_custom_tags_;
412 0 : }
413 :
414 : // Note: this method is a noop unless ENVOY_ENABLE_UHV is defined
415 : // Call header validator extension to validate request header map after it was deserialized.
416 : // If header map failed validation, it sends an error response and returns false.
417 : bool validateHeaders();
418 :
419 : // Note: this method is a noop unless ENVOY_ENABLE_UHV is defined
420 : // Call header validator extension to validate the request trailer map after it was
421 : // deserialized. If the trailer map failed validation, this method does the following:
422 : // 1. For H/1 it sends 400 response and returns false.
423 : // 2. For H/2 and H/3 it resets the stream (without error response). Issue #24735 is filed to
424 : // harmonize this behavior with H/1.
425 : // 3. If the `stream_error_on_invalid_http_message` is set to `false` (it is by default) in the
426 : // HTTP connection manager configuration, then the entire connection is closed.
427 : bool validateTrailers();
428 :
429 0 : std::weak_ptr<bool> stillAlive() { return {still_alive_}; }
430 :
431 : // Dispatch deferred headers, body and trailers to the filter manager.
432 : // Return true if this stream was deferred and dispatched pending headers, body and trailers (if
433 : // present). Return false if this stream was not deferred.
434 : bool onDeferredRequestProcessing();
435 :
436 : ConnectionManagerImpl& connection_manager_;
437 : OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
438 : // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
439 : // both locations, then refer to the FM when doing stream logs.
440 : const uint64_t stream_id_;
441 :
442 : RequestHeaderMapSharedPtr request_headers_;
443 : RequestTrailerMapPtr request_trailers_;
444 :
445 : ResponseHeaderMapPtr informational_headers_;
446 : ResponseHeaderMapSharedPtr response_headers_;
447 : ResponseTrailerMapSharedPtr response_trailers_;
448 :
449 : // Note: The FM must outlive the above headers, as they are possibly accessed during filter
450 : // destruction.
451 : DownstreamFilterManager filter_manager_;
452 :
453 : Tracing::SpanPtr active_span_;
454 : ResponseEncoder* response_encoder_{};
455 : Stats::TimespanPtr request_response_timespan_;
456 : // Per-stream idle timeout. This timer gets reset whenever activity occurs on the stream, and,
457 : // when triggered, will close the stream.
458 : Event::TimerPtr stream_idle_timer_;
459 : // Per-stream request timeout. This timer is enabled when the stream is created and disabled
460 : // when the stream ends. If triggered, it will close the stream.
461 : Event::TimerPtr request_timer_;
462 : // Per-stream request header timeout. This timer is enabled when the stream is created and
463 : // disabled when the downstream finishes sending headers. If triggered, it will close the
464 : // stream.
465 : Event::TimerPtr request_header_timer_;
466 : // Per-stream alive duration. This timer is enabled once when the stream is created and, if
467 : // triggered, will close the stream.
468 : Event::TimerPtr max_stream_duration_timer_;
469 : // Per-stream access log flush duration. This timer is enabled once when the stream is created
470 : // and will log to all access logs once per trigger.
471 : Event::TimerPtr access_log_flush_timer_;
472 :
473 : std::chrono::milliseconds idle_timeout_ms_{};
474 : State state_;
475 :
476 : // Snapshot of the route configuration taken when the request starts. This is used to ensure
477 : // that the same route configuration is used throughout the lifetime of the request. This
478 : // snapshot will be cleared when the cached route is blocked, because after that the cached
479 : // route is no longer refreshed; releasing the snapshot helps free memory when the route
480 : // configuration is updated frequently and the request is long-lived.
481 : Router::ConfigConstSharedPtr snapped_route_config_;
482 : Router::ScopedConfigConstSharedPtr snapped_scoped_routes_config_;
483 : // This is used to track the route that has been cached in the request. And we will keep this
484 : // route alive until the request is finished.
485 : absl::optional<Router::RouteConstSharedPtr> cached_route_;
486 : // This is used to track whether the route has been blocked. If the route is blocked, we can not
487 : // clear it or refresh it.
488 : bool route_cache_blocked_{false};
489 : // This is used to track routes that have been cleared from the request. By this way, all the
490 : // configurations that have been used in the processing of the request will be alive until the
491 : // request is finished.
492 : // For example, if a filter stored a per-route config in the decoding phase and may try to
493 : // use it in the encoding phase, but the route is cleared and refreshed by another decoder
494 : // filter, we must keep the per-route config alive to avoid use-after-free.
495 : // Note that we assume that the number of routes that have been cleared is small. So we use
496 : // inline vector to avoid heap allocation. If this assumption is wrong, we should consider using
497 : // a list or other data structures.
498 : //
499 : // TODO(wbpcode): This is a helpless compromise. To avoid exposing the complexity of the route
500 : // lifetime management to every HTTP filter, we do a hack here. But if every filter could manage
501 : // the lifetime of the route config by itself easily, we could remove this hack.
502 : absl::InlinedVector<Router::RouteConstSharedPtr, 3> cleared_cached_routes_;
503 :
504 : absl::optional<Upstream::ClusterInfoConstSharedPtr> cached_cluster_info_;
505 : const std::string* decorated_operation_{nullptr};
506 : std::unique_ptr<RdsRouteConfigUpdateRequester> route_config_update_requester_;
507 : std::unique_ptr<Tracing::CustomTagMap> tracing_custom_tags_{nullptr};
508 : Http::ServerHeaderValidatorPtr header_validator_;
509 :
510 : friend FilterManager;
511 :
512 : private:
513 : // Keep these methods private to ensure that these methods are only called by the reference
514 : // returned by the public tracingConfig() method.
515 : // Tracing::TracingConfig
516 : Tracing::OperationName operationName() const override;
517 : const Tracing::CustomTagMap* customTags() const override;
518 : bool verbose() const override;
519 : uint32_t maxPathTagLength() const override;
520 : bool spawnUpstreamSpan() const override;
521 :
522 : std::shared_ptr<bool> still_alive_ = std::make_shared<bool>(true);
523 : std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
524 : std::queue<MetadataMapPtr> deferred_metadata_;
525 : };
526 :
527 : using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
528 :
529 : class ActiveStreamHandle : public RequestDecoderHandle {
530 : public:
531 : explicit ActiveStreamHandle(ActiveStream& stream)
532 0 : : valid_(stream.stillAlive()), stream_(stream) {}
533 :
534 0 : ~ActiveStreamHandle() override = default;
535 :
536 0 : OptRef<RequestDecoder> get() override {
537 0 : if (valid_.expired()) {
538 0 : return {};
539 0 : }
540 0 : return stream_;
541 0 : }
542 :
543 : private:
544 : std::weak_ptr<bool> valid_;
545 : ActiveStream& stream_;
546 : };
547 :
548 : class HttpStreamIdProviderImpl : public StreamInfo::StreamIdProvider {
549 : public:
550 671 : HttpStreamIdProviderImpl(ActiveStream& parent) : parent_(parent) {}
551 :
552 : // StreamInfo::StreamIdProvider
553 : absl::optional<absl::string_view> toStringView() const override;
554 : absl::optional<uint64_t> toInteger() const override;
555 :
556 : ActiveStream& parent_;
557 : };
558 :
559 : /**
560 : * Check to see if the connection can be closed after gracefully waiting to send pending codec
561 : * data.
562 : */
563 : void checkForDeferredClose(bool skip_deferred_close);
564 :
565 : /**
566 : * Do a delayed destruction of a stream to allow for stack unwind. Also calls onDestroy() for
567 : * each filter.
568 : */
569 : void doDeferredStreamDestroy(ActiveStream& stream);
570 :
571 : /**
572 : * Process a stream that is ending due to upstream response or reset.
573 : * If check_for_deferred_close is true, the ConnectionManager will check to
574 : * see if the connection was drained and should be closed if no streams remain.
575 : */
576 : void doEndStream(ActiveStream& stream, bool check_for_deferred_close = true);
577 :
578 : void resetAllStreams(absl::optional<StreamInfo::ResponseFlag> response_flag,
579 : absl::string_view details);
580 : void onIdleTimeout();
581 : void onConnectionDurationTimeout();
582 : void onDrainTimeout();
583 : void startDrainSequence();
584 0 : Tracing::Tracer& tracer() { return *config_.tracer(); }
585 : void handleCodecErrorImpl(absl::string_view error, absl::string_view details,
586 : StreamInfo::ResponseFlag response_flag);
587 : void handleCodecError(absl::string_view error);
588 : void handleCodecOverloadError(absl::string_view error);
589 : void doConnectionClose(absl::optional<Network::ConnectionCloseType> close_type,
590 : absl::optional<StreamInfo::ResponseFlag> response_flag,
591 : absl::string_view details);
592 : // Returns true if a RST_STREAM for the given stream is premature. Premature
593 : // means the RST_STREAM arrived before response headers were sent and that
594 : // the stream was alive for only a short period of time. This period is specified
595 : // by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey,
596 : // or one second if that is not present.
597 : bool isPrematureRstStream(const ActiveStream& stream) const;
598 : // Sends a GOAWAY if a sufficient number of streams have been closed on a connection
599 : // and at least half of them have been prematurely reset.
600 : void maybeDrainDueToPrematureResets();
601 :
602 : bool shouldDeferRequestProxyingToNextIoCycle();
603 : void onDeferredRequestProcessing();
604 :
605 : enum class DrainState { NotDraining, Draining, Closing };
606 :
607 : ConnectionManagerConfig& config_;
608 : ConnectionManagerStats& stats_; // We store a reference here to avoid an extra stats() call on
609 : // the config in the hot path.
610 : ServerConnectionPtr codec_;
611 : std::list<ActiveStreamPtr> streams_;
612 : Stats::TimespanPtr conn_length_;
613 : const Network::DrainDecision& drain_close_;
614 : DrainState drain_state_{DrainState::NotDraining};
615 : UserAgent user_agent_;
616 : // An idle timer for the connection. This is only armed when there are no streams on the
617 : // connection. When there are active streams it is disarmed in favor of each stream's
618 : // stream_idle_timer_.
619 : Event::TimerPtr connection_idle_timer_;
620 : // A connection duration timer. Armed during handling new connection if enabled in config.
621 : Event::TimerPtr connection_duration_timer_;
622 : Event::TimerPtr drain_timer_;
623 : Random::RandomGenerator& random_generator_;
624 : Runtime::Loader& runtime_;
625 : const LocalInfo::LocalInfo& local_info_;
626 : Upstream::ClusterManager& cluster_manager_;
627 : Network::ReadFilterCallbacks* read_callbacks_{};
628 : Event::Dispatcher* dispatcher_{};
629 : ConnectionManagerListenerStats& listener_stats_;
630 : Server::OverloadManager& overload_manager_;
631 : Server::ThreadLocalOverloadState& overload_state_;
632 : Server::LoadShedPoint* accept_new_http_stream_{nullptr};
633 : // References into the overload manager thread local state map. Using these lets us avoid a
634 : // map lookup in the hot path of processing each request.
635 : const Server::OverloadActionState& overload_stop_accepting_requests_ref_;
636 : const Server::OverloadActionState& overload_disable_keepalive_ref_;
637 : TimeSource& time_source_;
638 : bool remote_close_{};
639 : // Hop by hop headers should always be cleared for Envoy-as-a-proxy but will
640 : // not be for Envoy-mobile.
641 : bool clear_hop_by_hop_response_headers_{true};
642 : // The number of requests accumulated on the current connection.
643 : uint64_t accumulated_requests_{};
644 : // The number of requests closed on the current connection which were
645 : // not internally destroyed
646 : uint64_t closed_non_internally_destroyed_requests_{};
647 : // The number of requests that received a premature RST_STREAM, according to
648 : // the definition given in `isPrematureRstStream()`.
649 : uint64_t number_premature_stream_resets_{0};
650 : const std::string proxy_name_; // for Proxy-Status.
651 : uint32_t requests_during_dispatch_count_{0};
652 : const uint32_t max_requests_during_dispatch_{UINT32_MAX};
653 : Event::SchedulableCallbackPtr deferred_request_processing_callback_;
654 :
655 : const bool refresh_rtt_after_request_{};
656 : };
657 :
658 : } // namespace Http
659 : } // namespace Envoy
|