1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/server/instance.h"
6

            
7
#include "source/common/buffer/buffer_impl.h"
8
#include "source/common/http/header_map_impl.h"
9

            
10
#include "absl/container/flat_hash_set.h"
11
#include "absl/synchronization/mutex.h"
12

            
13
namespace Envoy {
14

            
15
// Forward declaration; the full definition of AdminResponse follows in this header.
class AdminResponse;
16

            
17
// Holds context for a streaming response from the admin system, enabling
18
// flow-control into another system. This is particularly important when the
19
// generated response is very large, such that holding it in memory may cause
20
// fragmentation or out-of-memory failures. It is possible to interleave xDS
21
// response handling, overload management, and other admin requests during the
22
// streaming of a long admin response.
23
//
24
// There can be be multiple AdminResponses at a time; each are separately
25
// managed. However they will obtain their data from Envoy functions that
26
// run on the main thread.
27
//
28
// Responses may still be active after the server has shut down, and is no
29
// longer running its main thread dispatcher. In this state, the callbacks
30
// will be called with appropriate error codes.
31
//
32
// Requests can also be cancelled explicitly by calling cancel(). After
33
// cancel() is called, no further callbacks will be called by the response.
34
//
35
// The lifecycle of an AdminResponse is rendered as a finite state machine
36
// bubble diagram:
37
// https://docs.google.com/drawings/d/1njUl1twApEMoxmjaG4b7optTh5fcb_YNcfSnkHbdfq0/view
38
class AdminResponse : public std::enable_shared_from_this<AdminResponse> {
39
public:
40
  // AdminResponse can outlive MainCommonBase. But AdminResponse needs a
41
  // reliable way of knowing whether MainCommonBase is alive, so we do this with
42
  // PtrSet, which is held by MainCommonBase and all the active AdminResponses.
43
  // via shared_ptr. This gives MainCommonBase a reliable way of notifying all
44
  // active responses that it is being shut down, and thus all responses need to
45
  // be terminated. And it gives a reliable way for AdminResponse to detach
46
  // itself, whether or not MainCommonBase is already deleted.
47
  //
48
  // In summary:
49
  //  * MainCommonBase can outlive AdminResponse so we need detachResponse.
50
  //  * AdminResponse can outlive MainCommonBase, so we need shared_ptr.
51
  class PtrSet {
52
  public:
53
    /**
54
     * Called when an AdminResponse is created. When terminateAdminRequests is
55
     * called, all outstanding response objects have their terminate() methods
56
     * called.
57
     *
58
     * @param response the response pointer to be added to the set.
59
     */
60
    void attachResponse(AdminResponse* response);
61

            
62
    /**
63
     * Called when an AdminResponse is terminated, either by completing normally
64
     * or having the caller call cancel on it. Either way it needs to be removed
65
     * from the set that will be used by terminateAdminRequests below.
66
     *
67
     * @param response the response pointer to be removed from the set.
68
     */
69
    void detachResponse(AdminResponse* response);
70

            
71
    /**
72
     * Called after the server run-loop finishes; any outstanding streaming
73
     * admin requests will otherwise hang as the main-thread dispatcher loop
74
     * will no longer run.
75
     */
76
    void terminateAdminRequests();
77

            
78
    mutable absl::Mutex mutex_;
79
    absl::flat_hash_set<AdminResponse*> response_set_ ABSL_GUARDED_BY(mutex_);
80
    bool accepting_admin_requests_ ABSL_GUARDED_BY(mutex_) = true;
81
  };
82
  using SharedPtrSet = std::shared_ptr<PtrSet>;
83

            
84
  AdminResponse(Server::Instance& server, absl::string_view path, absl::string_view method,
85
                SharedPtrSet response_set);
86
  ~AdminResponse();
87

            
88
  /**
89
   * Requests the headers for the response. This can be called from any
90
   * thread, and HeaderFn may also be called from any thread.
91
   *
92
   * HeadersFn will not be called after cancel(). It is invalid to
93
   * to call nextChunk from within HeadersFn -- the caller must trigger
94
   * such a call on another thread, after HeadersFn returns. Calling
95
   * nextChunk from HeadersFn may deadlock.
96
   *
97
   * If the server is shut down during the operation, headersFn may
98
   * be called with a 503, if it has not already been called.
99
   *
100
   * @param fn The function to be called with the headers and status code.
101
   */
102
  using HeadersFn = std::function<void(Http::Code, Http::ResponseHeaderMap& map)>;
103
  void getHeaders(HeadersFn fn);
104

            
105
  /**
106
   * Requests a new chunk. This can be called from any thread, and the BodyFn
107
   * callback may also be called from any thread. BodyFn will be called in a
108
   * loop until the Buffer passed to it is fully drained. When 'false' is
109
   * passed as the second arg to BodyFn, that signifies the end of the
110
   * response, and nextChunk must not be called again.
111
   *
112
   * BodyFn will not be called after cancel(). It is invalid to
113
   * to call nextChunk from within BodyFn -- the caller must trigger
114
   * such a call on another thread, after BodyFn returns. Calling
115
   * nextChunk from BodyFn may deadlock.
116
   *
117
   * If the server is shut down during the operation, bodyFn will
118
   * be called with an empty body and 'false' for more_data, if
119
   * this has not already occurred.
120
   *
121
   * @param fn A function to be called on each chunk.
122
   */
123
  using BodyFn = std::function<void(Buffer::Instance&, bool)>;
124
  void nextChunk(BodyFn fn);
125

            
126
  /**
127
   * Requests that any outstanding callbacks be dropped. This can be called
128
   * when the context in which the request is made is destroyed. This enables
129
   * an application to implement a. The Response itself is held as a
130
   * shared_ptr as that makes it much easier to manage cancellation across
131
   * multiple threads.
132
   */
133
  void cancel();
134

            
135
  /**
136
   * @return whether the request was cancelled.
137
   */
138
  bool cancelled() const;
139

            
140
private:
141
  /**
142
   * Called when the server is terminated. This calls any outstanding
143
   * callbacks to be called. If nextChunk is called after termination,
144
   * its callback is called false for the second arg, indicating
145
   * end of stream.
146
   */
147
  void terminate();
148

            
149
  void requestHeaders();
150
  void requestNextChunk();
151
  void sendAbortChunkLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
152
  void sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
153

            
154
  Server::Instance& server_;
155
  OptRef<Server::Admin> opt_admin_;
156
  Buffer::OwnedImpl response_;
157
  Http::Code code_;
158
  Server::Admin::RequestPtr request_;
159
  Http::RequestHeaderMapPtr request_headers_{Http::RequestHeaderMapImpl::create()};
160
  Http::ResponseHeaderMapPtr response_headers_{Http::ResponseHeaderMapImpl::create()};
161
  bool more_data_ = true;
162

            
163
  // True if cancel() was explicitly called by the user; headers and body
164
  // callbacks are never called after cancel().
165
  bool cancelled_ ABSL_GUARDED_BY(mutex_) = false;
166

            
167
  // True if the Envoy server has stopped running its main loop. Headers and
168
  // body requests can be initiated and called back are called after terminate,
169
  // so callers do not have to special case this -- the request will simply fail
170
  // with an empty response.
171
  bool terminated_ ABSL_GUARDED_BY(mutex_) = false;
172

            
173
  // Used to indicate whether the body function has been called with false
174
  // as its second argument. That must always happen at most once, even
175
  // if terminate races with the normal end-of-stream marker. more=false
176
  // may never be sent if the request is cancelled, nor deleted prior to
177
  // it being requested.
178
  bool sent_end_stream_ ABSL_GUARDED_BY(mutex_) = false;
179

            
180
  HeadersFn headers_fn_ ABSL_GUARDED_BY(mutex_);
181
  BodyFn body_fn_ ABSL_GUARDED_BY(mutex_);
182
  mutable absl::Mutex mutex_;
183

            
184
  SharedPtrSet shared_response_set_;
185
};
186
// Shared-ownership handle to an AdminResponse.
using AdminResponseSharedPtr = std::shared_ptr<AdminResponse>;
187

            
188
} // namespace Envoy