1
#pragma once
2

            
3
#include <cstddef>
4
#include <cstdint>
5
#include <iterator>
6
#include <memory>
7
#include <string>
8
#include <vector>
9

            
10
#include "envoy/buffer/buffer.h"
11
#include "envoy/http/async_client.h"
12
#include "envoy/http/header_map.h"
13
#include "envoy/server/factory_context.h"
14

            
15
#include "absl/status/status.h"
16
#include "absl/status/statusor.h"
17
#include "absl/strings/string_view.h"
18
#include "absl/types/span.h"
19

            
20
namespace Envoy {
21
namespace Http {
22

            
23
class MuxDemux;
24

            
25
// Facade that combines multiple Http::AsyncClient::Stream into one object.
26
// Created my the MuxDemux::multicast() method.
27
class MultiStream {
28
  // TODO(yanavlasov): This is a temporary solution to allow using
29
  // weak_ptr<AsyncClient::StreamCallbacks> with AsyncClient::start. It will be removed once
30
  // AsyncClient::start accepts weak_ptr to callbacks.
31
  struct CallbacksFacade : public AsyncClient::StreamCallbacks {
32
    explicit CallbacksFacade(MultiStream& multistream,
33
                             std::weak_ptr<AsyncClient::StreamCallbacks> callbacks)
34
70
        : multistream_(multistream), callbacks_(callbacks) {}
35

            
36
    void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override;
37
    void onData(Buffer::Instance& data, bool end_stream) override;
38
    void onTrailers(ResponseTrailerMapPtr&& trailers) override;
39
    void onComplete() override;
40
    void onReset() override;
41

            
42
    MultiStream& multistream_;
43
    std::weak_ptr<AsyncClient::StreamCallbacks> callbacks_;
44
    bool is_idle_{false};
45
    AsyncClient::Stream* stream_{nullptr};
46
  };
47

            
48
public:
49
  ~MultiStream();
50

            
51
  // Iterator over streams. Allows sending different headers, body or trailers to different streams.
52
  struct StreamIterator {
53
    using difference_type = std::ptrdiff_t;
54
    using element_type = AsyncClient::Stream*;
55
    using pointer = element_type*;
56
    using reference = element_type&;
57
102
    explicit StreamIterator(std::vector<CallbacksFacade>::iterator it) : it(it) {}
58
    StreamIterator() = default;
59

            
60
64
    reference operator*() { return it->stream_; }
61
    reference operator*() const { return it->stream_; }
62
    reference operator->() { return it->stream_; }
63
    reference operator->() const { return it->stream_; }
64
48
    StreamIterator& operator++() {
65
48
      ++it;
66
48
      return *this;
67
48
    }
68
    StreamIterator operator++(int) {
69
      StreamIterator tmp(*this);
70
      ++(*this);
71
      return tmp;
72
    }
73
66
    bool operator==(const StreamIterator& other) const { return it == other.it; }
74

            
75
    std::vector<CallbacksFacade>::iterator it;
76
  };
77

            
78
  // Iterator over underlying Http::AsyncClient::Stream objects
79
  static_assert(std::forward_iterator<StreamIterator>);
80
40
  StreamIterator begin() { return StreamIterator(callbacks_facades_.begin()); }
81
62
  StreamIterator end() { return StreamIterator(callbacks_facades_.end()); }
82

            
83
  // Send the same headers to all streams.
84
  void multicastHeaders(RequestHeaderMap& headers, bool end_stream);
85
  // Send the same data to all streams.
86
  void multicastData(Buffer::Instance& data, bool end_stream);
87
  // Send the same trailers to all streams.
88
  void multicastTrailers(RequestTrailerMap& trailers);
89
  // Reset all streams.
90
  void multicastReset();
91

            
92
51
  bool isIdle() const { return active_streams_ == 0; };
93

            
94
private:
95
  friend class MuxDemux;
96
47
  MultiStream(std::weak_ptr<MuxDemux> muxdemux) : muxdemux_(muxdemux) {}
97

            
98
  absl::Status addStream(const AsyncClient::StreamOptions& options, absl::string_view cluster_name,
99
                         std::weak_ptr<AsyncClient::StreamCallbacks> callbacks,
100
                         Server::Configuration::FactoryContext& factory_context);
101
  void maybeSwitchToIdle();
102

            
103
  uint32_t active_streams_{0};
104
  std::weak_ptr<MuxDemux> muxdemux_;
105
  std::vector<CallbacksFacade> callbacks_facades_;
106
};
107

            
108
// MuxDemux allows sending the same or different requests to multiple destinations.
109
// The same connections are re-used when sending repeated requests.
110
class MuxDemux : public std::enable_shared_from_this<MuxDemux> {
111
public:
112
  struct Callbacks {
113
    std::string cluster_name;
114
    std::weak_ptr<AsyncClient::StreamCallbacks> callbacks;
115
    absl::optional<AsyncClient::StreamOptions> options;
116
  };
117

            
118
58
  static std::shared_ptr<MuxDemux> create(Server::Configuration::FactoryContext& context) {
119
58
    return std::shared_ptr<MuxDemux>(new MuxDemux(context));
120
58
  }
121

            
122
  ~MuxDemux();
123

            
124
  // Multicast a request to multiple destinations. Multiplexer must be in an idle state.
125
  // Return a MultiStream object if the request was successfully sent to all destinations.
126
  // Error if the multiplexer was not in an idle state, or all streams failed to start.
127
  // Note releasing MultiStream object while it is not fully closed (i.e. the end_stream
128
  // was observed in both directions on all streams) will result in all still active streams being
129
  // reset.
130
  absl::StatusOr<std::unique_ptr<MultiStream>> multicast(const AsyncClient::StreamOptions& options,
131
                                                         absl::Span<const Callbacks> callbacks);
132

            
133
  // Returns true if the multiplexer is in an idle state.
134
  // Idle state is defined as:
135
  // - There are no requests in progress.
136
48
  bool isIdle() const { return is_idle_; }
137

            
138
private:
139
  friend class MultiStream;
140
  MuxDemux(Server::Configuration::FactoryContext& context);
141

            
142
89
  void switchToIdle() { is_idle_ = true; }
143

            
144
  Server::Configuration::FactoryContext& factory_context_;
145
  bool is_idle_{true};
146
};
147

            
148
} // namespace Http
149
} // namespace Envoy