Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/integration_stream_decoder.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/integration_stream_decoder.h"
2
3
#include <algorithm>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
#include <utility>
8
9
#include "envoy/buffer/buffer.h"
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/http/header_map.h"
12
13
#include "source/common/common/assert.h"
14
15
#include "gmock/gmock.h"
16
#include "gtest/gtest.h"
17
18
using testing::AssertionFailure;
19
using testing::AssertionResult;
20
using testing::AssertionSuccess;
21
22
namespace Envoy {
23
24
IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher)
25
1.10k
    : dispatcher_(dispatcher) {}
Unexecuted instantiation: Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&)
Envoy::IntegrationStreamDecoder::IntegrationStreamDecoder(Envoy::Event::Dispatcher&)
Line
Count
Source
25
1.10k
    : dispatcher_(dispatcher) {}
26
27
0
void IntegrationStreamDecoder::waitFor1xxHeaders() {
28
0
  if (!continue_headers_.get()) {
29
0
    waiting_for_continue_headers_ = true;
30
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
31
0
  }
32
0
}
33
34
0
void IntegrationStreamDecoder::waitForHeaders() {
35
0
  if (!headers_.get()) {
36
0
    waiting_for_headers_ = true;
37
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
38
0
  }
39
0
}
40
41
0
void IntegrationStreamDecoder::waitForBodyData(uint64_t size) {
42
0
  ASSERT(body_data_waiting_length_ == 0);
43
0
  body_data_waiting_length_ = size;
44
0
  body_data_waiting_length_ -=
45
0
      std::min(body_data_waiting_length_, static_cast<uint64_t>(body_.size()));
46
0
  if (body_data_waiting_length_ > 0) {
47
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
48
0
  }
49
0
}
50
51
1.10k
AssertionResult IntegrationStreamDecoder::waitForEndStream(std::chrono::milliseconds timeout) {
52
1.10k
  bool timer_fired = false;
53
2.16k
  while (!saw_end_stream_) {
54
1.07k
    Event::TimerPtr timer(dispatcher_.createTimer([this, &timer_fired]() -> void {
55
15
      timer_fired = true;
56
15
      dispatcher_.exit();
57
15
    }));
58
1.07k
    timer->enableTimer(timeout);
59
1.07k
    waiting_for_end_stream_ = true;
60
1.07k
    dispatcher_.run(Event::Dispatcher::RunType::Block);
61
1.07k
    if (!saw_end_stream_) {
62
15
      ENVOY_LOG_MISC(warn, "non-end stream event.");
63
15
    }
64
1.07k
    if (timer_fired) {
65
15
      return AssertionFailure() << "Timed out waiting for end stream\n";
66
15
    }
67
1.07k
  }
68
1.09k
  return AssertionSuccess();
69
1.10k
}
70
71
0
AssertionResult IntegrationStreamDecoder::waitForReset(std::chrono::milliseconds timeout) {
72
0
  if (!saw_reset_) {
73
    // Set a timer to stop the dispatcher if the timeout has been exceeded.
74
0
    Event::TimerPtr timer(dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); }));
75
0
    timer->enableTimer(timeout);
76
0
    waiting_for_reset_ = true;
77
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
78
    // If the timer has fired, this timed out before a reset was received.
79
0
    if (!timer->enabled()) {
80
0
      return AssertionFailure() << "Timed out waiting for reset.";
81
0
    }
82
0
  }
83
0
  return AssertionSuccess();
84
0
}
85
86
0
void IntegrationStreamDecoder::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
87
0
  continue_headers_ = std::move(headers);
88
0
  if (waiting_for_continue_headers_) {
89
0
    dispatcher_.exit();
90
0
  }
91
0
}
92
93
void IntegrationStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers,
94
1.10k
                                             bool end_stream) {
95
1.10k
  saw_end_stream_ = end_stream;
96
1.10k
  headers_ = std::move(headers);
97
1.10k
  if ((end_stream && waiting_for_end_stream_) || waiting_for_headers_) {
98
17
    dispatcher_.exit();
99
17
  }
100
1.10k
}
101
102
2.13k
void IntegrationStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) {
103
2.13k
  saw_end_stream_ = end_stream;
104
2.13k
  body_ += data.toString();
105
106
2.13k
  if (end_stream && waiting_for_end_stream_) {
107
1.04k
    dispatcher_.exit();
108
1.09k
  } else if (body_data_waiting_length_ > 0) {
109
0
    body_data_waiting_length_ -= std::min(body_data_waiting_length_, data.length());
110
0
    if (body_data_waiting_length_ == 0) {
111
0
      dispatcher_.exit();
112
0
    }
113
0
  }
114
2.13k
}
115
116
0
void IntegrationStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
117
0
  saw_end_stream_ = true;
118
0
  trailers_ = std::move(trailers);
119
0
  if (waiting_for_end_stream_) {
120
0
    dispatcher_.exit();
121
0
  }
122
0
}
123
124
0
void IntegrationStreamDecoder::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
125
0
  metadata_maps_decoded_count_++;
126
  // Combines newly received metadata with the existing metadata.
127
0
  for (const auto& metadata : *metadata_map) {
128
0
    duplicated_metadata_key_count_[metadata.first]++;
129
0
    metadata_map_->insert(metadata);
130
0
  }
131
0
}
132
133
15
void IntegrationStreamDecoder::onResetStream(Http::StreamResetReason reason, absl::string_view) {
134
15
  saw_reset_ = true;
135
15
  reset_reason_ = reason;
136
15
  if (waiting_for_reset_) {
137
0
    dispatcher_.exit();
138
0
  }
139
15
}
140
141
} // namespace Envoy