1
#include "source/common/http/muxdemux.h"
2

            
3
#include <algorithm>
4
#include <memory>
5
#include <utility>
6

            
7
#include "envoy/buffer/buffer.h"
8
#include "envoy/http/async_client.h"
9
#include "envoy/http/header_map.h"
10
#include "envoy/server/factory_context.h"
11
#include "envoy/upstream/thread_local_cluster.h"
12

            
13
#include "source/common/common/assert.h"
14

            
15
#include "absl/status/status.h"
16
#include "absl/status/statusor.h"
17
#include "absl/strings/str_cat.h"
18
#include "absl/strings/string_view.h"
19
#include "absl/types/span.h"
20

            
21
namespace Envoy {
22
namespace Http {
23

            
24
2
void MultiStream::multicastHeaders(RequestHeaderMap& headers, bool end_stream) {
25
4
  for (CallbacksFacade& facade : callbacks_facades_) {
26
4
    if (!facade.is_idle_) {
27
4
      facade.stream_->sendHeaders(headers, end_stream);
28
4
    }
29
4
  }
30
2
}
31

            
32
77
void MultiStream::multicastData(Buffer::Instance& data, bool end_stream) {
33
  // Make a copy of data to workaround the sendData call moving slices out of the buffer.
34
  // We only need to do it, if there is more than 1 stream.
35
77
  Buffer::OwnedImpl copy;
36
77
  if (active_streams_ > 1) {
37
45
    copy.add(data);
38
45
  }
39
77
  uint32_t streams_done = 0;
40
122
  for (CallbacksFacade& facade : callbacks_facades_) {
41
122
    if (!facade.is_idle_) {
42
122
      facade.stream_->sendData(data, end_stream);
43
122
      ++streams_done;
44
122
      if (data.length() == 0) {
45
        // Avoid copy on the last call to sendData
46
121
        if (streams_done < active_streams_ - 1) {
47
          data.add(copy);
48
121
        } else {
49
121
          data.move(copy);
50
121
        }
51
121
      }
52
122
    }
53
122
  }
54
77
}
55

            
56
1
void MultiStream::multicastTrailers(RequestTrailerMap& trailers) {
57
2
  for (CallbacksFacade& facade : callbacks_facades_) {
58
2
    if (!facade.is_idle_) {
59
2
      facade.stream_->sendTrailers(trailers);
60
2
    }
61
2
  }
62
1
}
63

            
64
47
void MultiStream::multicastReset() {
65
71
  for (CallbacksFacade& facade : callbacks_facades_) {
66
69
    if (!facade.is_idle_) {
67
39
      facade.stream_->reset();
68
39
      facade.is_idle_ = true;
69
39
    }
70
69
  }
71
47
  if (auto muxdemux = muxdemux_.lock()) {
72
47
    muxdemux->switchToIdle();
73
47
  }
74
47
}
75

            
76
// TODO(yavlasov): This is a temporary solution to allow using weak_ptr in AsyncClient::start.
77
// It will be removed once AsyncClient::start accepts weak_ptr to callbacks.
78
65
void MultiStream::CallbacksFacade::onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
79
65
  if (auto callbacks = callbacks_.lock()) {
80
65
    callbacks->onHeaders(std::move(headers), end_stream);
81
65
  }
82
65
}
83

            
84
112
void MultiStream::CallbacksFacade::onData(Buffer::Instance& data, bool end_stream) {
85
112
  if (auto callbacks = callbacks_.lock()) {
86
112
    callbacks->onData(data, end_stream);
87
112
  }
88
112
}
89

            
90
2
void MultiStream::CallbacksFacade::onTrailers(ResponseTrailerMapPtr&& trailers) {
91
2
  if (auto callbacks = callbacks_.lock()) {
92
2
    callbacks->onTrailers(std::move(trailers));
93
2
  }
94
2
}
95

            
96
27
void MultiStream::CallbacksFacade::onComplete() {
97
27
  is_idle_ = true;
98
27
  if (auto callbacks = callbacks_.lock()) {
99
27
    callbacks->onComplete();
100
27
  }
101
27
  multistream_.maybeSwitchToIdle();
102
27
}
103

            
104
41
void MultiStream::CallbacksFacade::onReset() {
105
41
  is_idle_ = true;
106
41
  if (auto callbacks = callbacks_.lock()) {
107
41
    callbacks->onReset();
108
41
  }
109
41
  multistream_.maybeSwitchToIdle();
110
41
}
111

            
112
47
MultiStream::~MultiStream() { multicastReset(); }
113

            
114
absl::Status MultiStream::addStream(const AsyncClient::StreamOptions& options,
115
                                    absl::string_view cluster_name,
116
                                    std::weak_ptr<AsyncClient::StreamCallbacks> callbacks,
117
74
                                    Server::Configuration::FactoryContext& factory_context) {
118
74
  Envoy::Upstream::ThreadLocalCluster* cluster =
119
74
      factory_context.serverFactoryContext().clusterManager().getThreadLocalCluster(cluster_name);
120
74
  if (cluster == nullptr) {
121
    // Allow missing clusters in case control plane did not converge yet.
122
    // TODO(yanavlasov): We can possibly fail request here as well.
123
4
    return absl::OkStatus();
124
4
  }
125
70
  callbacks_facades_.emplace_back(*this, callbacks);
126
70
  AsyncClient::Stream* stream =
127
70
      cluster->httpAsyncClient().start(callbacks_facades_.back(), options);
128
70
  if (stream == nullptr) {
129
1
    callbacks_facades_.pop_back();
130
1
    return absl::InternalError(absl::StrCat("Failed to start stream for cluster ", cluster_name));
131
1
  }
132
69
  callbacks_facades_.back().stream_ = stream;
133
  // If at least one stream was successfully started, multiplexer is not idle anymore.
134
69
  ++active_streams_;
135
69
  return absl::OkStatus();
136
70
}
137

            
138
68
void MultiStream::maybeSwitchToIdle() {
139
68
  ASSERT(active_streams_ > 0);
140
68
  --active_streams_;
141
68
  if (active_streams_ > 0) {
142
26
    return;
143
26
  }
144
42
  if (auto muxdemux = muxdemux_.lock()) {
145
42
    muxdemux->switchToIdle();
146
42
  }
147
42
}
148

            
149
58
MuxDemux::MuxDemux(Server::Configuration::FactoryContext& context) : factory_context_(context) {}
150

            
151
58
MuxDemux::~MuxDemux() {}
152

            
153
absl::StatusOr<std::unique_ptr<MultiStream>>
154
MuxDemux::multicast(const AsyncClient::StreamOptions& options,
155
48
                    absl::Span<const Callbacks> callbacks) {
156
  // Sanity checks
157
48
  if (callbacks.empty()) {
158
1
    return absl::InvalidArgumentError("No callbacks provided");
159
1
  }
160
47
  if (std::any_of(callbacks.begin(), callbacks.end(),
161
75
                  [](const Callbacks& callback) { return callback.cluster_name.empty(); })) {
162
    return absl::InvalidArgumentError("Cluster name is empty");
163
  }
164
47
  if (std::any_of(callbacks.begin(), callbacks.end(),
165
75
                  [](const Callbacks& callback) { return callback.callbacks.use_count() == 0; })) {
166
    return absl::InvalidArgumentError("Callbacks are null");
167
  }
168

            
169
47
  auto multistream = std::unique_ptr<MultiStream>(new MultiStream(shared_from_this()));
170
47
  multistream->callbacks_facades_.reserve(callbacks.size());
171
74
  for (const Callbacks& callback : callbacks) {
172
    // Use per-backend options if provided, otherwise fall back to the default options.
173
74
    const AsyncClient::StreamOptions& effective_options =
174
74
        callback.options.has_value() ? callback.options.value() : options;
175
74
    absl::Status status = multistream->addStream(effective_options, callback.cluster_name,
176
74
                                                 callback.callbacks, factory_context_);
177
74
    if (!status.ok()) {
178
1
      return status;
179
1
    }
180
74
  }
181
  // If no streams were actually started, return error.
182
46
  if (multistream->isIdle()) {
183
3
    return absl::InternalError("No streams were started");
184
3
  }
185

            
186
43
  is_idle_ = false;
187
43
  return multistream;
188
46
}
189

            
190
} // namespace Http
191
} // namespace Envoy