1
#pragma once
2

            
3
#include <vector>
4

            
5
#include "envoy/server/admin.h"
6
#include "envoy/singleton/manager.h"
7

            
8
#include "source/extensions/common/tap/tap.h"
9

            
10
#include "absl/container/node_hash_set.h"
11
#include "absl/types/optional.h"
12

            
13
namespace envoy {
14
namespace admin {
15
namespace v3 {
16
class TapRequest;
17
}
18
} // namespace admin
19
} // namespace envoy
20

            
21
namespace Envoy {
22
namespace Extensions {
23
namespace Common {
24
namespace Tap {
25

            
26
class AdminHandler;
27
using AdminHandlerSharedPtr = std::shared_ptr<AdminHandler>;
28

            
29
/**
30
 * Singleton /tap admin handler for admin management of tap configurations and output. This
31
 * handler is not installed and active unless the tap configuration specifically configures it.
32
 * TODO(mattklein123): We should allow the admin handler to always be installed in read only mode
33
 *                     so it's easier to debug the active tap configuration.
34
 */
35
class AdminHandler : public Singleton::Instance,
36
                     public Extensions::Common::Tap::Sink,
37
                     Logger::Loggable<Logger::Id::tap> {
38
public:
39
  AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher);
40
  ~AdminHandler() override;
41

            
42
  /**
43
   * Get the singleton admin handler. The handler will be created if it doesn't already exist,
44
   * otherwise the existing handler will be returned.
45
   */
46
  static AdminHandlerSharedPtr getSingleton(OptRef<Server::Admin> admin,
47
                                            Singleton::Manager& singleton_manager,
48
                                            Event::Dispatcher& main_thread_dispatcher);
49

            
50
  /**
51
   * Register a new extension config to the handler so that it can be admin managed.
52
   * @param config supplies the config to register.
53
   * @param config_id supplies the ID to use for managing the configuration. Multiple extensions
54
   *        can use the same ID so they can be managed in aggregate (e.g., an HTTP filter on
55
   *        many listeners).
56
   */
57
  void registerConfig(ExtensionConfig& config, const std::string& config_id);
58

            
59
  /**
60
   * Unregister an extension config from the handler.
61
   * @param config supplies the previously registered config.
62
   */
63
  void unregisterConfig(ExtensionConfig& config);
64

            
65
  // Extensions::Common::Tap::Sink
66
  PerTapSinkHandlePtr
67
  createPerTapSinkHandle(uint64_t trace_id,
68
                         envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) override;
69

            
70
private:
71
  /**
72
   * TraceBuffer internally buffers TraceWrappers in a vector. The size of the
73
   * internal buffer is determined on construction by max_buf_size. TraceBuffer will continue to
74
   * buffer traces via calls to bufferTrace until there is no remaining room in the buffer, or
75
   * traceList is called, transfering ownership of the buffered data.
76
   * Note that TraceBuffer is not threadsafe by itself - accesses to TraceBuffer need to be
77
   * serialized. Serialization is currently done by the main thread dispatcher.
78
   */
79
  class TraceBuffer {
80
    using TraceWrapper = envoy::data::tap::v3::TraceWrapper;
81

            
82
  public:
83
    TraceBuffer(uint64_t max_buf_size)
84
13
        : max_buf_size_(max_buf_size), buffer_(std::vector<TraceWrapper>()) {
85
13
      buffer_->reserve(max_buf_size);
86
13
    }
87

            
88
    // Buffers trace internally if there is space available
89
    // This function takes exclusive ownership of trace and may destroy the content of trace.
90
    // A unique_ptr is semantically more correct here, but a shared pointer is
91
    // needed for traces to be captured in lambda function (submitTrace).
92
    void bufferTrace(const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace);
93

            
94
    // Returns true if the trace buffer is full (reached max_buf_size_) false otherwise
95
58
    bool full() const { return (buffer_ && (buffer_->size() == max_buf_size_)); }
96

            
97
    // Return true if the buffer has already been flushed, false otherwise.
98
65
    bool flushed() const { return !buffer_; }
99

            
100
    // Take ownership of the internally managed trace list
101
12
    std::vector<TraceWrapper> flush() {
102
12
      std::vector<TraceWrapper> buffer = std::move(*buffer_);
103
12
      buffer_.reset(); // set optional to empty
104
12
      return buffer;
105
12
    }
106

            
107
  private:
108
    const size_t max_buf_size_; // Number of traces to buffer
109
    absl::optional<std::vector<TraceWrapper>> buffer_;
110
  };
111

            
112
  /**
113
   * This object's lifetime is tied to the lifetime of the admin_stream, and is responsible for
114
   * managing all data that has lifetime tied to the admin_stream.
115
   */
116
  class AttachedRequest {
117
  public:
118
    AttachedRequest(AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
119
                    Server::AdminStream* admin_stream);
120
24
    virtual ~AttachedRequest() = default;
121
    // Stream the protobuf message to the admin_stream using the configured format
122
    // Requires the admin_stream to be open
123
    virtual void streamMsg(const Protobuf::Message& message, bool end_stream = false);
124

            
125
    // Explicitly close the admin_stream. Stream must be open.
126
    virtual void endStream();
127

            
128
    // Factory method for AttachedRequests - uses protobuf input to determine the subtype of
129
    // AttachedRequest to create
130
    static std::shared_ptr<AttachedRequest>
131
    createAttachedRequest(AdminHandler* admin_handler,
132
                          const envoy::admin::v3::TapRequest& tap_request,
133
                          Server::AdminStream* admin_stream);
134

            
135
    // --------- Accessors ---------
136
    // Get a pointer to the internal trace buffer. This method only applies for
137
    // the Buffered sink type, but exists in the generic API to avoid
138
    // dynamic casting of the AttachedRequest type elsewhere
139
1
    virtual TraceBuffer* traceBuffer() const { return nullptr; }
140
20
    const std::string& id() const { return config_id_; }
141
2
    const envoy::config::tap::v3::TapConfig& config() const { return config_; }
142
39
    envoy::config::tap::v3::OutputSink::Format format() const {
143
39
      return config_.output_config().sinks()[0].format();
144
39
    }
145

            
146
  protected:
147
7
    Event::Dispatcher& dispatcher() { return main_thread_dispatcher_; }
148
41
    const Server::AdminStream* stream() const { return admin_stream_; }
149

            
150
  private:
151
    const std::string config_id_;
152
    const envoy::config::tap::v3::TapConfig config_;
153
    const Server::AdminStream* admin_stream_;
154
    Event::Dispatcher& main_thread_dispatcher_;
155
    friend class BaseAdminHandlerTest;
156
  };
157

            
158
  /**
159
   * AttachedRequest with additional data specific to the Buffered Sink type
160
   */
161
  class AttachedRequestBuffered : public AttachedRequest {
162
    // Callback fired on timer expiry
163
    void onTimeout(const std::weak_ptr<AttachedRequest>& attached_request);
164

            
165
  public:
166
36
    TraceBuffer* traceBuffer() const override { return trace_buffer_.get(); }
167

            
168
    AttachedRequestBuffered(AdminHandler* admin_handler,
169
                            const envoy::admin::v3::TapRequest& tap_request,
170
                            Server::AdminStream* admin_stream);
171

            
172
  private:
173
    Event::TimerPtr timer_;
174
    // Pointer to buffered traces, only exists if the sink type requires buffering multiple traces
175
    std::unique_ptr<TraceBuffer> trace_buffer_;
176
    friend class BufferedAdminHandlerTest; // For testing Purposes
177
  };
178

            
179
  struct AdminPerTapSinkHandle : public PerTapSinkHandle {
180
16
    AdminPerTapSinkHandle(AdminHandler& parent) : parent_(parent) {}
181

            
182
    // Extensions::Common::Tap::PerTapSinkHandle
183
    void submitTrace(TraceWrapperPtr&& trace,
184
                     envoy::config::tap::v3::OutputSink::Format format) override;
185

            
186
    AdminHandler& parent_;
187
  };
188

            
189
  /**
190
   * Sink for buffering a variable number of traces in a TraceBuffer
191
   */
192
  struct BufferedPerTapSinkHandle : public PerTapSinkHandle {
193
27
    BufferedPerTapSinkHandle(AdminHandler& parent) : parent_(parent) {}
194

            
195
    // Extensions::Common::Tap::PerTapSinkHandle
196
    void submitTrace(TraceWrapperPtr&& trace,
197
                     envoy::config::tap::v3::OutputSink::Format format) override;
198

            
199
    AdminHandler& parent_;
200
  };
201

            
202
  Http::Code handler(Http::HeaderMap& response_headers, Buffer::Instance& response,
203
                     Server::AdminStream& admin_stream);
204
  Http::Code badRequest(Buffer::Instance& response, absl::string_view error);
205

            
206
  Server::Admin& admin_;
207
  Event::Dispatcher& main_thread_dispatcher_;
208
  absl::node_hash_map<std::string, absl::node_hash_set<ExtensionConfig*>> config_id_map_;
209
  std::shared_ptr<AttachedRequest> attached_request_;
210
  friend class BaseAdminHandlerTest;     // For testing purposes
211
  friend class BufferedAdminHandlerTest; // For testing Purposes
212
};
213

            
214
} // namespace Tap
215
} // namespace Common
216
} // namespace Extensions
217
} // namespace Envoy