/proc/self/cwd/test/integration/integration_stream_decoder.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "test/integration/integration_stream_decoder.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | #include <utility> |
8 | | |
9 | | #include "envoy/buffer/buffer.h" |
10 | | #include "envoy/event/dispatcher.h" |
11 | | #include "envoy/http/header_map.h" |
12 | | |
13 | | #include "source/common/common/assert.h" |
14 | | |
15 | | #include "gmock/gmock.h" |
16 | | #include "gtest/gtest.h" |
17 | | |
18 | | using testing::AssertionFailure; |
19 | | using testing::AssertionResult; |
20 | | using testing::AssertionSuccess; |
21 | | |
22 | | namespace Envoy { |
23 | | |
24 | | IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher) |
25 | 1.10k | : dispatcher_(dispatcher) {}Unexecuted instantiation: Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&) Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&) Line | Count | Source | 25 | 1.10k | : dispatcher_(dispatcher) {} |
|
26 | | |
27 | 0 | void IntegrationStreamDecoder::waitFor1xxHeaders() { |
28 | 0 | if (!continue_headers_.get()) { |
29 | 0 | waiting_for_continue_headers_ = true; |
30 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
31 | 0 | } |
32 | 0 | } |
33 | | |
34 | 0 | void IntegrationStreamDecoder::waitForHeaders() { |
35 | 0 | if (!headers_.get()) { |
36 | 0 | waiting_for_headers_ = true; |
37 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
38 | 0 | } |
39 | 0 | } |
40 | | |
41 | 0 | void IntegrationStreamDecoder::waitForBodyData(uint64_t size) { |
42 | 0 | ASSERT(body_data_waiting_length_ == 0); |
43 | 0 | body_data_waiting_length_ = size; |
44 | 0 | body_data_waiting_length_ -= |
45 | 0 | std::min(body_data_waiting_length_, static_cast<uint64_t>(body_.size())); |
46 | 0 | if (body_data_waiting_length_ > 0) { |
47 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
48 | 0 | } |
49 | 0 | } |
50 | | |
51 | 1.10k | AssertionResult IntegrationStreamDecoder::waitForEndStream(std::chrono::milliseconds timeout) { |
52 | 1.10k | bool timer_fired = false; |
53 | 2.16k | while (!saw_end_stream_) { |
54 | 1.07k | Event::TimerPtr timer(dispatcher_.createTimer([this, &timer_fired]() -> void { |
55 | 15 | timer_fired = true; |
56 | 15 | dispatcher_.exit(); |
57 | 15 | })); |
58 | 1.07k | timer->enableTimer(timeout); |
59 | 1.07k | waiting_for_end_stream_ = true; |
60 | 1.07k | dispatcher_.run(Event::Dispatcher::RunType::Block); |
61 | 1.07k | if (!saw_end_stream_) { |
62 | 15 | ENVOY_LOG_MISC(warn, "non-end stream event."); |
63 | 15 | } |
64 | 1.07k | if (timer_fired) { |
65 | 15 | return AssertionFailure() << "Timed out waiting for end stream\n"; |
66 | 15 | } |
67 | 1.07k | } |
68 | 1.09k | return AssertionSuccess(); |
69 | 1.10k | } |
70 | | |
71 | 0 | AssertionResult IntegrationStreamDecoder::waitForReset(std::chrono::milliseconds timeout) { |
72 | 0 | if (!saw_reset_) { |
73 | | // Set a timer to stop the dispatcher if the timeout has been exceeded. |
74 | 0 | Event::TimerPtr timer(dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); })); |
75 | 0 | timer->enableTimer(timeout); |
76 | 0 | waiting_for_reset_ = true; |
77 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
78 | | // If the timer has fired, this timed out before a reset was received. |
79 | 0 | if (!timer->enabled()) { |
80 | 0 | return AssertionFailure() << "Timed out waiting for reset."; |
81 | 0 | } |
82 | 0 | } |
83 | 0 | return AssertionSuccess(); |
84 | 0 | } |
85 | | |
86 | 0 | void IntegrationStreamDecoder::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { |
87 | 0 | continue_headers_ = std::move(headers); |
88 | 0 | if (waiting_for_continue_headers_) { |
89 | 0 | dispatcher_.exit(); |
90 | 0 | } |
91 | 0 | } |
92 | | |
93 | | void IntegrationStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, |
94 | 1.10k | bool end_stream) { |
95 | 1.10k | saw_end_stream_ = end_stream; |
96 | 1.10k | headers_ = std::move(headers); |
97 | 1.10k | if ((end_stream && waiting_for_end_stream_) || waiting_for_headers_) { |
98 | 17 | dispatcher_.exit(); |
99 | 17 | } |
100 | 1.10k | } |
101 | | |
102 | 2.13k | void IntegrationStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { |
103 | 2.13k | saw_end_stream_ = end_stream; |
104 | 2.13k | body_ += data.toString(); |
105 | | |
106 | 2.13k | if (end_stream && waiting_for_end_stream_) { |
107 | 1.04k | dispatcher_.exit(); |
108 | 1.09k | } else if (body_data_waiting_length_ > 0) { |
109 | 0 | body_data_waiting_length_ -= std::min(body_data_waiting_length_, data.length()); |
110 | 0 | if (body_data_waiting_length_ == 0) { |
111 | 0 | dispatcher_.exit(); |
112 | 0 | } |
113 | 0 | } |
114 | 2.13k | } |
115 | | |
116 | 0 | void IntegrationStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { |
117 | 0 | saw_end_stream_ = true; |
118 | 0 | trailers_ = std::move(trailers); |
119 | 0 | if (waiting_for_end_stream_) { |
120 | 0 | dispatcher_.exit(); |
121 | 0 | } |
122 | 0 | } |
123 | | |
124 | 0 | void IntegrationStreamDecoder::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { |
125 | 0 | metadata_maps_decoded_count_++; |
126 | | // Combines newly received metadata with the existing metadata. |
127 | 0 | for (const auto& metadata : *metadata_map) { |
128 | 0 | duplicated_metadata_key_count_[metadata.first]++; |
129 | 0 | metadata_map_->insert(metadata); |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | 15 | void IntegrationStreamDecoder::onResetStream(Http::StreamResetReason reason, absl::string_view) { |
134 | 15 | saw_reset_ = true; |
135 | 15 | reset_reason_ = reason; |
136 | 15 | if (waiting_for_reset_) { |
137 | 0 | dispatcher_.exit(); |
138 | 0 | } |
139 | 15 | } |
140 | | |
141 | | } // namespace Envoy |