Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/integration_stream_decoder.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/integration_stream_decoder.h"
2
3
#include <algorithm>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
#include <utility>
8
9
#include "envoy/buffer/buffer.h"
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/http/header_map.h"
12
13
#include "source/common/common/assert.h"
14
15
#include "gmock/gmock.h"
16
#include "gtest/gtest.h"
17
18
using testing::AssertionFailure;
19
using testing::AssertionResult;
20
using testing::AssertionSuccess;
21
22
namespace Envoy {
23
24
// Builds a decoder bound to `dispatcher`. Only the reference is stored here; the wait*() methods
// later run this dispatcher and the decode*() callbacks stop it.
IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher)
    : dispatcher_(dispatcher) {
}
26
27
815
// Emits a trace-level log line on destruction; no other teardown is performed here.
IntegrationStreamDecoder::~IntegrationStreamDecoder() {
  ENVOY_LOG_MISC(trace, "Destroying IntegrationStreamDecoder");
}
30
31
0
void IntegrationStreamDecoder::waitFor1xxHeaders() {
32
0
  if (!continue_headers_.get()) {
33
0
    waiting_for_continue_headers_ = true;
34
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
35
0
  }
36
0
}
37
38
0
void IntegrationStreamDecoder::waitForHeaders() {
39
0
  if (!headers_.get()) {
40
0
    waiting_for_headers_ = true;
41
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
42
0
  }
43
0
}
44
45
0
void IntegrationStreamDecoder::waitForBodyData(uint64_t size) {
46
0
  ASSERT(body_data_waiting_length_ == 0);
47
0
  body_data_waiting_length_ = size;
48
0
  body_data_waiting_length_ -=
49
0
      std::min(body_data_waiting_length_, static_cast<uint64_t>(body_.size()));
50
0
  if (body_data_waiting_length_ > 0) {
51
0
    dispatcher_.run(Event::Dispatcher::RunType::Block);
52
0
  }
53
0
}
54
55
815
// Waits for the end of the stream, giving up when the timeout timer fires.
//
// Returns AssertionSuccess() once saw_end_stream_ is set (immediately if it already is) and
// AssertionFailure() if the timer fires during a dispatcher run. A fresh timer is armed for
// every dispatcher run, so `timeout` bounds each individual run rather than the whole wait.
AssertionResult IntegrationStreamDecoder::waitForEndStream(std::chrono::milliseconds timeout) {
  bool timed_out = false;
  for (;;) {
    if (saw_end_stream_) {
      return AssertionSuccess();
    }

    // The timer callback records the timeout and stops the dispatcher so run() returns.
    Event::TimerPtr timeout_timer(dispatcher_.createTimer([this, &timed_out]() -> void {
      timed_out = true;
      dispatcher_.exit();
    }));
    timeout_timer->enableTimer(timeout);
    waiting_for_end_stream_ = true;
    dispatcher_.run(Event::Dispatcher::RunType::Block);

    // The dispatcher may have been stopped by something other than end-of-stream.
    if (!saw_end_stream_) {
      ENVOY_LOG_MISC(warn, "non-end stream event.");
    }
    if (timed_out) {
      return AssertionFailure() << "Timed out waiting for end stream\n";
    }
  }
}
74
75
0
// Waits for the stream to be reset, giving up after `timeout`.
//
// Returns AssertionSuccess() once saw_reset_ is set (immediately if it already is) and
// AssertionFailure() if the timeout timer fires first.
//
// A return from dispatcher_.run() is not by itself proof that a reset arrived: the decode*()
// callbacks in this file also call dispatcher_.exit() for header and end-stream waits. The
// dispatcher is therefore re-run until either the reset has been recorded or the timer has
// fired, the same way waitForEndStream() loops on saw_end_stream_.
AssertionResult IntegrationStreamDecoder::waitForReset(std::chrono::milliseconds timeout) {
  if (saw_reset_) {
    return AssertionSuccess();
  }

  // One timer covers the whole wait, so unrelated dispatcher exits do not extend the deadline.
  bool timer_fired = false;
  Event::TimerPtr timer(dispatcher_.createTimer([this, &timer_fired]() -> void {
    timer_fired = true;
    dispatcher_.exit();
  }));
  timer->enableTimer(timeout);
  // onResetStream() checks this flag to decide whether to stop the dispatcher.
  waiting_for_reset_ = true;

  while (!saw_reset_) {
    dispatcher_.run(Event::Dispatcher::RunType::Block);
    // If the timer has fired, this timed out before a reset was received.
    if (timer_fired) {
      return AssertionFailure() << "Timed out waiting for reset.";
    }
  }
  return AssertionSuccess();
}
89
90
0
// Takes ownership of the received 1xx headers and, if waitFor1xxHeaders() has set its flag,
// stops the dispatcher.
void IntegrationStreamDecoder::decode1xxHeaders(Http::ResponseHeaderMapPtr&& headers) {
  continue_headers_ = std::move(headers);
  if (!waiting_for_continue_headers_) {
    return;
  }
  dispatcher_.exit();
}
96
97
void IntegrationStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers,
98
804
                                             bool end_stream) {
99
804
  saw_end_stream_ = end_stream;
100
804
  headers_ = std::move(headers);
101
804
  if ((end_stream && waiting_for_end_stream_) || waiting_for_headers_) {
102
18
    dispatcher_.exit();
103
18
  }
104
804
}
105
106
1.56k
// Appends the received chunk to body_ and records whether it ended the stream.
//
// The dispatcher is stopped either because an awaited end-of-stream arrived, or because this
// chunk brought the outstanding waitForBodyData() byte count down to zero. The end-of-stream
// case takes precedence and leaves the byte count untouched.
void IntegrationStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) {
  saw_end_stream_ = end_stream;
  body_ += data.toString();

  if (end_stream && waiting_for_end_stream_) {
    dispatcher_.exit();
    return;
  }

  if (body_data_waiting_length_ == 0) {
    // Nobody is waiting on a byte count.
    return;
  }

  // Clamp with std::min so the unsigned counter cannot wrap when more data than awaited arrives.
  body_data_waiting_length_ -= std::min(body_data_waiting_length_, data.length());
  if (body_data_waiting_length_ == 0) {
    dispatcher_.exit();
  }
}
119
120
0
// Takes ownership of the trailers and marks the stream as ended unconditionally; stops the
// dispatcher if an end-stream wait is flagged.
void IntegrationStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) {
  saw_end_stream_ = true;
  trailers_ = std::move(trailers);
  if (!waiting_for_end_stream_) {
    return;
  }
  dispatcher_.exit();
}
127
128
0
// Folds a newly received metadata map into the accumulated state.
//
// Bumps the count of decoded metadata maps, then for every entry bumps the per-key occurrence
// counter and inserts the entry into metadata_map_.
// NOTE(review): if MetadataMap::insert has std::map-like semantics, an already-present key keeps
// its earlier value -- confirm against the MetadataMap definition.
void IntegrationStreamDecoder::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {
  ++metadata_maps_decoded_count_;
  for (const auto& entry : *metadata_map) {
    ++duplicated_metadata_key_count_[entry.first];
    metadata_map_->insert(entry);
  }
}
136
137
11
// Records that the stream was reset and why; the string_view argument is ignored. Stops the
// dispatcher if waitForReset() has set its flag.
void IntegrationStreamDecoder::onResetStream(Http::StreamResetReason reason, absl::string_view) {
  saw_reset_ = true;
  reset_reason_ = reason;
  if (!waiting_for_reset_) {
    return;
  }
  dispatcher_.exit();
}
144
145
} // namespace Envoy