/proc/self/cwd/test/integration/integration_stream_decoder.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "test/integration/integration_stream_decoder.h" |
2 | | |
3 | | #include <algorithm> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | #include <utility> |
8 | | |
9 | | #include "envoy/buffer/buffer.h" |
10 | | #include "envoy/event/dispatcher.h" |
11 | | #include "envoy/http/header_map.h" |
12 | | |
13 | | #include "source/common/common/assert.h" |
14 | | |
15 | | #include "gmock/gmock.h" |
16 | | #include "gtest/gtest.h" |
17 | | |
18 | | using testing::AssertionFailure; |
19 | | using testing::AssertionResult; |
20 | | using testing::AssertionSuccess; |
21 | | |
22 | | namespace Envoy { |
23 | | |
24 | | IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher) |
25 | 815 | : dispatcher_(dispatcher) {} Unexecuted instantiation: Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&) Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&) Line | Count | Source | 25 | 815 | : dispatcher_(dispatcher) {} |
|
26 | | |
27 | 815 | IntegrationStreamDecoder::~IntegrationStreamDecoder() { |
28 | 815 | ENVOY_LOG_MISC(trace, "Destroying IntegrationStreamDecoder"); |
29 | 815 | } |
30 | | |
31 | 0 | void IntegrationStreamDecoder::waitFor1xxHeaders() { |
32 | 0 | if (!continue_headers_.get()) { |
33 | 0 | waiting_for_continue_headers_ = true; |
34 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
35 | 0 | } |
36 | 0 | } |
37 | | |
38 | 0 | void IntegrationStreamDecoder::waitForHeaders() { |
39 | 0 | if (!headers_.get()) { |
40 | 0 | waiting_for_headers_ = true; |
41 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
42 | 0 | } |
43 | 0 | } |
44 | | |
45 | 0 | void IntegrationStreamDecoder::waitForBodyData(uint64_t size) { |
46 | 0 | ASSERT(body_data_waiting_length_ == 0); |
47 | 0 | body_data_waiting_length_ = size; |
48 | 0 | body_data_waiting_length_ -= |
49 | 0 | std::min(body_data_waiting_length_, static_cast<uint64_t>(body_.size())); |
50 | 0 | if (body_data_waiting_length_ > 0) { |
51 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
52 | 0 | } |
53 | 0 | } |
54 | | |
55 | 815 | AssertionResult IntegrationStreamDecoder::waitForEndStream(std::chrono::milliseconds timeout) { |
56 | 815 | bool timer_fired = false; |
57 | 1.61k | while (!saw_end_stream_) { |
58 | 815 | Event::TimerPtr timer(dispatcher_.createTimer([this, &timer_fired]() -> void { |
59 | 11 | timer_fired = true; |
60 | 11 | dispatcher_.exit(); |
61 | 11 | })); |
62 | 815 | timer->enableTimer(timeout); |
63 | 815 | waiting_for_end_stream_ = true; |
64 | 815 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
65 | 815 | if (!saw_end_stream_) { |
66 | 11 | ENVOY_LOG_MISC(warn, "non-end stream event."); |
67 | 11 | } |
68 | 815 | if (timer_fired) { |
69 | 11 | return AssertionFailure() << "Timed out waiting for end stream\n"; |
70 | 11 | } |
71 | 815 | } |
72 | 804 | return AssertionSuccess(); |
73 | 815 | } |
74 | | |
75 | 0 | AssertionResult IntegrationStreamDecoder::waitForReset(std::chrono::milliseconds timeout) { |
76 | 0 | if (!saw_reset_) { |
77 | | // Set a timer to stop the dispatcher if the timeout has been exceeded. |
78 | 0 | Event::TimerPtr timer(dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); })); |
79 | 0 | timer->enableTimer(timeout); |
80 | 0 | waiting_for_reset_ = true; |
81 | 0 | dispatcher_.run(Event::Dispatcher::RunType::Block); |
82 | | // If the timer has fired, this timed out before a reset was received. |
83 | 0 | if (!timer->enabled()) { |
84 | 0 | return AssertionFailure() << "Timed out waiting for reset."; |
85 | 0 | } |
86 | 0 | } |
87 | 0 | return AssertionSuccess(); |
88 | 0 | } |
89 | | |
90 | 0 | void IntegrationStreamDecoder::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) { |
91 | 0 | continue_headers_ = std::move(headers); |
92 | 0 | if (waiting_for_continue_headers_) { |
93 | 0 | dispatcher_.exit(); |
94 | 0 | } |
95 | 0 | } |
96 | | |
97 | | void IntegrationStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, |
98 | 804 | bool end_stream) { |
99 | 804 | saw_end_stream_ = end_stream; |
100 | 804 | headers_ = std::move(headers); |
101 | 804 | if ((end_stream && waiting_for_end_stream_) || waiting_for_headers_) { |
102 | 18 | dispatcher_.exit(); |
103 | 18 | } |
104 | 804 | } |
105 | | |
106 | 1.56k | void IntegrationStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { |
107 | 1.56k | saw_end_stream_ = end_stream; |
108 | 1.56k | body_ += data.toString(); |
109 | | |
110 | 1.56k | if (end_stream && waiting_for_end_stream_) { |
111 | 786 | dispatcher_.exit(); |
112 | 786 | } else if (body_data_waiting_length_ > 0) { |
113 | 0 | body_data_waiting_length_ -= std::min(body_data_waiting_length_, data.length()); |
114 | 0 | if (body_data_waiting_length_ == 0) { |
115 | 0 | dispatcher_.exit(); |
116 | 0 | } |
117 | 0 | } |
118 | 1.56k | } |
119 | | |
120 | 0 | void IntegrationStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) { |
121 | 0 | saw_end_stream_ = true; |
122 | 0 | trailers_ = std::move(trailers); |
123 | 0 | if (waiting_for_end_stream_) { |
124 | 0 | dispatcher_.exit(); |
125 | 0 | } |
126 | 0 | } |
127 | | |
128 | 0 | void IntegrationStreamDecoder::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { |
129 | 0 | metadata_maps_decoded_count_++; |
130 | | // Combines newly received metadata with the existing metadata. |
131 | 0 | for (const auto& metadata : *metadata_map) { |
132 | 0 | duplicated_metadata_key_count_[metadata.first]++; |
133 | 0 | metadata_map_->insert(metadata); |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | 11 | void IntegrationStreamDecoder::onResetStream(Http::StreamResetReason reason, absl::string_view) { |
138 | 11 | saw_reset_ = true; |
139 | 11 | reset_reason_ = reason; |
140 | 11 | if (waiting_for_reset_) { |
141 | 0 | dispatcher_.exit(); |
142 | 0 | } |
143 | 11 | } |
144 | | |
145 | | } // namespace Envoy |