1
#include "source/extensions/common/tap/admin.h"
2

            
3
#include "envoy/admin/v3/tap.pb.h"
4
#include "envoy/admin/v3/tap.pb.validate.h"
5
#include "envoy/config/tap/v3/common.pb.h"
6
#include "envoy/data/tap/v3/wrapper.pb.h"
7

            
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/protobuf/message_validator_impl.h"
10
#include "source/common/protobuf/utility.h"
11

            
12
namespace Envoy {
13
namespace Extensions {
14
namespace Common {
15
namespace Tap {
16

            
17
// Registers the `tap_admin_handler` singleton name using the macro defined in
// envoy/singleton/manager.h. AdminHandler::getSingleton() below looks the handler up through
// SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler).
18
SINGLETON_MANAGER_REGISTRATION(tap_admin_handler);
19

            
20
/**
 * Fetches the AdminHandler stored in the singleton manager under the registered
 * `tap_admin_handler` name.
 *
 * @param admin the admin interface the handler is built on.
 * @param singleton_manager the manager that is queried via getTyped().
 * @param main_thread_dispatcher dispatcher handed to the AdminHandler constructor.
 * @return whatever getTyped() returns for the registered name; the factory passed alongside the
 *         name builds an AdminHandler from `admin` and `main_thread_dispatcher`.
 */
AdminHandlerSharedPtr AdminHandler::getSingleton(OptRef<Server::Admin> admin,
                                                 Singleton::Manager& singleton_manager,
                                                 Event::Dispatcher& main_thread_dispatcher) {
  // Factory handed to the singleton manager. It captures the arguments by reference, exactly as
  // the inline lambda would.
  const auto make_handler = [&admin, &main_thread_dispatcher] {
    return std::make_shared<AdminHandler>(admin, main_thread_dispatcher);
  };
  return singleton_manager.getTyped<AdminHandler>(
      SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler), make_handler);
}
28

            
29
/**
 * Installs the "/tap" admin endpoint on the supplied admin interface.
 *
 * `admin` is dereferenced with value(), so callers must pass an engaged OptRef. Registration of
 * the endpoint is mandatory: a failure trips a RELEASE_ASSERT.
 */
AdminHandler::AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher)
    : admin_(admin.value()), main_thread_dispatcher_(main_thread_dispatcher) {
  const bool rc =
      admin_.addHandler("/tap", "tap filter control", MAKE_ADMIN_HANDLER(handler), true, true);
  RELEASE_ASSERT(rc, "/tap admin endpoint is taken");

  // Warn operators up front when the admin listener is a pipe; see the message for the details.
  const bool admin_is_pipe = admin_.socket().addressType() == Network::Address::Type::Pipe;
  if (admin_is_pipe) {
    ENVOY_LOG(warn, "Admin tapping (via /tap) is unreliable when the admin endpoint is a pipe and "
                    "the connection is HTTP/1. Either use an IP address or connect using HTTP/2.");
  }
}
39

            
40
27
/**
 * Removes the "/tap" endpoint that the constructor installed. The removal is expected to
 * succeed, which is checked with ASSERT.
 */
AdminHandler::~AdminHandler() {
  const bool rc{admin_.removeHandler("/tap")};
  ASSERT(rc);
}
44

            
45
/**
 * Admin callback for the "/tap" endpoint.
 *
 * Parses the request body as a TapRequest, pushes the requested tap configuration to every
 * extension config registered under the request's config id, and keeps the admin stream open so
 * that traces can be written to it later.
 *
 * @param response buffer that receives the error text when the request is rejected.
 * @param admin_stream the admin stream carrying the request; it stays attached until destroyed.
 * @return Http::Code::OK once the stream is attached, or Http::Code::BadRequest (via
 *         badRequest()) when a stream is already attached, the body is missing, the body fails to
 *         load/validate, or no extension is registered under the requested config id.
 */
Http::Code AdminHandler::handler(Http::HeaderMap&, Buffer::Instance& response,
                                 Server::AdminStream& admin_stream) {
  if (attached_request_ != nullptr) {
    // TODO(mattlklein123): Consider supporting concurrent admin /tap streams. Right now we support
    // a single stream as a simplification.
    return badRequest(response, "An attached /tap admin stream already exists. Detach it.");
  }

  if (admin_stream.getRequestBody() == nullptr) {
    return badRequest(response, "/tap requires a JSON/YAML body");
  }

  envoy::admin::v3::TapRequest tap_request;
  TRY_NEEDS_AUDIT {
    MessageUtil::loadFromYamlAndValidate(admin_stream.getRequestBody()->toString(), tap_request,
                                         ProtobufMessage::getStrictValidationVisitor());
  }
  END_TRY catch (EnvoyException& e) { return badRequest(response, e.what()); }

  ENVOY_LOG(debug, "tap admin request for config_id={}", tap_request.config_id());
  // A single find() replaces the count() + operator[] pair: one lookup, and no chance of
  // default-inserting an entry.
  const auto registered = config_id_map_.find(tap_request.config_id());
  if (registered == config_id_map_.end()) {
    return badRequest(
        response, fmt::format("Unknown config id '{}'. No extension has registered with this id.",
                              tap_request.config_id()));
  }
  for (auto* config : registered->second) {
    config->newTapConfig(tap_request.tap_config(), this);
  }

  admin_stream.setEndStreamOnComplete(false);
  admin_stream.addOnDestroyCallback([this] {
    // Look the id up without operator[]: if every config for this id was unregistered while the
    // stream was attached, the key has been erased and must not be re-created as an empty entry
    // (that would make the "Unknown config id" check above pass for a dead id).
    const auto attached = config_id_map_.find(attached_request_->id());
    if (attached != config_id_map_.end()) {
      for (auto* config : attached->second) {
        ENVOY_LOG(debug, "detach tap admin request for config_id={}", attached_request_->id());
        config->clearTapConfig();
      }
    }
    attached_request_.reset(); // remove ref to attached_request_
  });
  // The destroy callback above reads attached_request_, which is populated here before this
  // function returns.
  attached_request_ = AttachedRequest::createAttachedRequest(this, tap_request, &admin_stream);
  return Http::Code::OK;
}
85

            
86
4
/**
 * Rejects an admin request: appends `error` to the response body, logs it at debug level and
 * reports HTTP 400.
 *
 * @param response buffer the error text is appended to.
 * @param error human readable reason for the rejection.
 * @return always Http::Code::BadRequest.
 */
Http::Code AdminHandler::badRequest(Buffer::Instance& response, absl::string_view error) {
  response.add(error);
  ENVOY_LOG(debug, "handler bad request: {}", error);
  return Http::Code::BadRequest;
}
91

            
92
25
/**
 * Records `config` under `config_id` so that later /tap requests for that id reach it.
 *
 * If an admin request for the same id is currently attached, the new config immediately receives
 * the attached request's tap configuration.
 *
 * @param config the extension config to register; it must not already be registered under the id.
 * @param config_id non-empty id the config is registered under.
 */
void AdminHandler::registerConfig(ExtensionConfig& config, const std::string& config_id) {
  ASSERT(!config_id.empty());
  ASSERT(config_id_map_[config_id].count(&config) == 0);
  config_id_map_[config_id].insert(&config);

  const bool matches_attached_request =
      attached_request_ != nullptr && attached_request_->id() == config_id;
  if (!matches_attached_request) {
    return;
  }
  config.newTapConfig(attached_request_->config(), this);
}
100

            
101
19
/**
 * Removes `config` from the set registered under its admin id, dropping the id from the map once
 * its set becomes empty.
 *
 * The map is searched once with find() rather than indexed repeatedly with operator[]. This
 * avoids redundant hash lookups and never creates a transient entry for an id that is not
 * registered.
 *
 * @param config the extension config to unregister; expected to have a non-empty admin id and to
 *        be registered exactly once under it (both checked with ASSERT).
 */
void AdminHandler::unregisterConfig(ExtensionConfig& config) {
  ASSERT(!config.adminId().empty());
  const std::string admin_id(config.adminId());

  const auto entry = config_id_map_.find(admin_id);
  ASSERT(entry != config_id_map_.end() && entry->second.count(&config) == 1);
  if (entry == config_id_map_.end()) {
    // Nothing is registered under this id, so there is nothing to remove.
    return;
  }

  entry->second.erase(&config);
  if (entry->second.empty()) {
    config_id_map_.erase(entry);
  }
}
110

            
111
/**
 * Builds the per-tap sink handle matching the admin sink type from the request.
 *
 * @param trace_id unused.
 * @param type sink type; must be kStreamingAdmin or kBufferedAdmin (checked with ASSERT).
 * @return an AdminPerTapSinkHandle for kStreamingAdmin, otherwise a BufferedPerTapSinkHandle.
 */
PerTapSinkHandlePtr
AdminHandler::createPerTapSinkHandle(uint64_t trace_id,
                                     envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) {
  UNREFERENCED_PARAMETER(trace_id);
  using ProtoOutputSinkType = envoy::config::tap::v3::OutputSink::OutputSinkTypeCase;
  ASSERT(type == ProtoOutputSinkType::kStreamingAdmin ||
         type == ProtoOutputSinkType::kBufferedAdmin);

  // Switching on the sink type here, after TapConfigBaseImpl's constructor has already done so,
  // is a little odd. A future refactor could move all sinks next to FilePerTapSink and pass the
  // chosen sink into the AdminHandler instead.

  // Anything that is not the streaming sink gets the buffered implementation.
  if (type != ProtoOutputSinkType::kStreamingAdmin) {
    return std::make_unique<BufferedPerTapSinkHandle>(*this);
  }
  return std::make_unique<AdminPerTapSinkHandle>(*this);
}
131

            
132
void AdminHandler::TraceBuffer::bufferTrace(
133
29
    const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace) {
134
  // Ignore traces once the buffer is full or flushed
135
29
  if (flushed() || full()) {
136
    return;
137
  }
138

            
139
29
  buffer_->emplace_back(std::move(*trace));
140
29
}
141

            
142
void AdminHandler::AdminPerTapSinkHandle::submitTrace(TraceWrapperPtr&& trace,
143
11
                                                      envoy::config::tap::v3::OutputSink::Format) {
144
11
  ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
145
  // Convert to a shared_ptr, so we can send it to the main thread.
146
11
  std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace{std::move(trace)};
147
  // Non owning pointer to the attached request, does not preserve lifetime unless in use.
148
11
  std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
149

            
150
  // The handle can be destroyed before the cross thread post is complete. Thus, we capture a
151
  // reference to our parent.
152
11
  parent_.main_thread_dispatcher_.post([weak_attached_request, shared_trace] {
153
    // Take temporary ownership - extend lifetime
154
11
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
155
11
    if (!attached_request) {
156
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
157
      // already been sent on completion of AdminHandler::handler.
158
1
      ENVOY_LOG(debug, "attached request does not exist, not streaming trace");
159
1
      return; // No attached request, flushed already
160
1
    }
161

            
162
10
    attached_request->streamMsg(*shared_trace, false);
163
10
  });
164
11
}
165

            
166
void AdminHandler::BufferedPerTapSinkHandle::submitTrace(
167
34
    TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format) {
168
  // Convert to a shared_ptr to extend lifetime so we can send it to the main thread.
169
34
  std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace(std::move(trace));
170
  // Non owning pointer to the attached request, does not preserve lifetime unless in use
171
34
  std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
172

            
173
34
  parent_.main_thread_dispatcher_.post([shared_trace, weak_attached_request] {
174
    // Take temporary ownership - extend lifetime
175
34
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
176
34
    if (!attached_request) {
177
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
178
      // already been sent on completion of AdminHandler::handler. Additionally we probably don't
179
      // want to take any action in this event as this case may be hit in "normal" usage, depending
180
      // on when the destruction of attached_request_ occurs.
181
1
      ENVOY_LOG(debug, "attached request does not exist, not buffering trace");
182
1
      return; // No attached request, flushed already.
183
1
    }
184
33
    TraceBuffer* trace_buffer = attached_request->traceBuffer();
185

            
186
    // Check if we already responded to the client
187
    // Hit when posts to buffer traces are on the dispatcher queue and the buffer is flushed
188
33
    if (trace_buffer->flushed()) {
189
4
      return;
190
4
    }
191
    // Main thread dispatcher serializes access to the trace_buffer
192
29
    trace_buffer->bufferTrace(shared_trace);
193
    // If the trace buffer is not full yet, wait to buffer more traces
194
29
    if (!trace_buffer->full()) {
195
19
      return;
196
19
    }
197

            
198
10
    std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
199

            
200
10
    ENVOY_LOG(debug, "admin writing buffered trace list to response");
201

            
202
    // Serialize writes to the stream
203
38
    for (size_t i = 0; i < buffer.size(); i++) {
204
      // Close stream on final message
205
28
      attached_request->streamMsg(buffer[i], (i + 1) == buffer.size());
206
28
    }
207
10
  });
208
34
}
209

            
210
/**
 * Captures everything needed to serve an attached /tap request: the config id and tap config
 * copied from the request, the admin stream to write to, and the handler's main thread
 * dispatcher.
 */
AdminHandler::AttachedRequest::AttachedRequest(AdminHandler* admin_handler,
                                               const envoy::admin::v3::TapRequest& tap_request,
                                               Server::AdminStream* admin_stream)
    : config_id_(tap_request.config_id()),
      config_(tap_request.tap_config()),
      admin_stream_(admin_stream),
      main_thread_dispatcher_(admin_handler->main_thread_dispatcher_) {}
216

            
217
/**
 * Attached request variant for the buffered admin sink.
 *
 * Reads `max_traces` and the optional `timeout` from the first output sink of the request,
 * allocates the trace buffer, and, when a non-zero timeout is configured, arms a timer that
 * invokes onTimeout().
 */
AdminHandler::AttachedRequestBuffered::AttachedRequestBuffered(
    AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
    Server::AdminStream* admin_stream)
    : AttachedRequest(admin_handler, tap_request, admin_stream) {
  const envoy::config::tap::v3::OutputSink& first_sink =
      tap_request.tap_config().output_config().sinks(0);
  const auto& buffered_admin = first_sink.buffered_admin();

  const uint64_t max_buffered_traces = buffered_admin.max_traces();
  const uint64_t timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(buffered_admin, timeout, 0);
  trace_buffer_ = std::make_unique<TraceBuffer>(max_buffered_traces);

  // A zero timeout (the default when the field is absent) means no timer is started.
  if (timeout_ms == 0) {
    return;
  }
  timer_ = dispatcher().createTimer(
      [this, admin_handler] { onTimeout(admin_handler->attached_request_); });
  timer_->enableTimer(std::chrono::milliseconds(timeout_ms));
}
234

            
235
void AdminHandler::AttachedRequestBuffered::onTimeout(
236
4
    const std::weak_ptr<AttachedRequest>& weak_attached_request) {
237
  // Flush the buffer regardless of size
238
4
  dispatcher().post([weak_attached_request] {
239
    // Take temporary ownership - extend lifetime
240
4
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
241
4
    if (!attached_request) {
242
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
243
      // already been sent on completion of AdminHandler::handler. Additionally we probably don't
244
      // want to take any action in this event as this case may be hit in "normal" usage, depending
245
      // on when the destruction of attached_request_ occurs.
246
1
      ENVOY_LOG(debug, "Timer Expiry after admin tap request completion");
247
1
      return; // No attached request, flushed already.
248
1
    }
249
3
    TraceBuffer* trace_buffer = attached_request->traceBuffer();
250

            
251
    // if the trace buffer has already been flushed short circuit.
252
    // Hit when this timeout callback is on the dispatcher queue and the buffer is flushed
253
3
    if (trace_buffer->flushed()) {
254
1
      return;
255
1
    }
256

            
257
2
    std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
258

            
259
2
    ENVOY_LOG(debug, "Timer Expiry, admin flushing buffered traces to response");
260

            
261
    // Serialize writes to the stream
262
2
    for (const auto& trace : buffer) {
263
1
      attached_request->streamMsg(trace);
264
1
    }
265
2
    attached_request->endStream();
266
2
  });
267
4
}
268

            
269
2
void AdminHandler::AttachedRequest::endStream() {
270
2
  Buffer::OwnedImpl output_buffer;
271
2
  stream()->getDecoderFilterCallbacks().encodeData(output_buffer, true);
272
2
}
273

            
274
39
/**
 * Serializes `message` in the request's configured output format and writes it to the admin
 * stream.
 *
 * JSON formats are rendered with MessageUtil::getJsonStringFromMessageOrError();
 * PROTO_BINARY_LENGTH_DELIMITED is written as a varint length prefix followed by the binary
 * message. PROTO_BINARY and PROTO_TEXT are not implemented and PANIC.
 *
 * @param message the protobuf message to write.
 * @param end_stream whether this write also closes the stream.
 */
void AdminHandler::AttachedRequest::streamMsg(const Protobuf::Message& message, bool end_stream) {
  std::string serialized;

  switch (format()) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: {
    // Both stream objects live only inside this scope, so they are destroyed before `serialized`
    // is read below.
    Protobuf::io::StringOutputStream string_stream(&serialized);
    Protobuf::io::CodedOutputStream coded_output(&string_stream);
    coded_output.WriteVarint64(message.ByteSizeLong());
    message.SerializeWithCachedSizes(&coded_output);
    break;
  }
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
    serialized = MessageUtil::getJsonStringFromMessageOrError(message, true, true);
    break;
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
  case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
    PANIC("not implemented");
  }

  Buffer::OwnedImpl output_buffer{serialized};
  stream()->getDecoderFilterCallbacks().encodeData(output_buffer, end_stream);
}
298

            
299
std::shared_ptr<AdminHandler::AttachedRequest> AdminHandler::AttachedRequest::createAttachedRequest(
300
    AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
301
24
    Server::AdminStream* admin_stream) {
302
24
  using ProtoOutputSink = envoy::config::tap::v3::OutputSink;
303

            
304
24
  const ProtoOutputSink& sink = tap_request.tap_config().output_config().sinks(0);
305

            
306
24
  switch (sink.output_sink_type_case()) {
307
13
  case ProtoOutputSink::kBufferedAdmin:
308
13
    return std::make_shared<AttachedRequestBuffered>(admin_handler, tap_request, admin_stream);
309
11
  default:
310
11
    return std::make_shared<AttachedRequest>(admin_handler, tap_request, admin_stream);
311
24
  }
312
24
}
313

            
314
} // namespace Tap
315
} // namespace Common
316
} // namespace Extensions
317
} // namespace Envoy