Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/common/tap/admin.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/common/tap/admin.h"
2
3
#include "envoy/admin/v3/tap.pb.h"
4
#include "envoy/admin/v3/tap.pb.validate.h"
5
#include "envoy/config/tap/v3/common.pb.h"
6
#include "envoy/data/tap/v3/wrapper.pb.h"
7
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/protobuf/message_validator_impl.h"
10
#include "source/common/protobuf/utility.h"
11
12
namespace Envoy {
13
namespace Extensions {
14
namespace Common {
15
namespace Tap {
16
17
// Singleton registration via macro defined in envoy/singleton/manager.h
// The registered name `tap_admin_handler` is the one AdminHandler::getSingleton() passes to
// SINGLETON_MANAGER_REGISTERED_NAME when asking the singleton manager for the handler.
18
SINGLETON_MANAGER_REGISTRATION(tap_admin_handler);
19
20
// Returns the AdminHandler registered with the singleton manager under the `tap_admin_handler`
// name.
// @param admin the admin server that the handler registers its endpoint with.
// @param singleton_manager the manager that owns the typed singleton.
// @param main_thread_dispatcher dispatcher handed to a newly built handler.
AdminHandlerSharedPtr AdminHandler::getSingleton(OptRef<Server::Admin> admin,
                                                 Singleton::Manager& singleton_manager,
                                                 Event::Dispatcher& main_thread_dispatcher) {
  // Factory handed to the singleton manager; it builds a handler from the captured arguments.
  const auto make_handler = [&admin, &main_thread_dispatcher] {
    return std::make_shared<AdminHandler>(admin, main_thread_dispatcher);
  };
  return singleton_manager.getTyped<AdminHandler>(
      SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler), make_handler);
}
28
29
// Registers the "/tap" endpoint on the supplied admin server and keeps the main thread dispatcher
// for later use. Construction release-asserts if the endpoint could not be added.
AdminHandler::AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher)
    : admin_(admin.value()), main_thread_dispatcher_(main_thread_dispatcher) {
  // NOTE: the local stays named `rc` because the assertion macro receives it as its condition.
  const bool rc =
      admin_.addHandler("/tap", "tap filter control", MAKE_ADMIN_HANDLER(handler), true, true);
  RELEASE_ASSERT(rc, "/tap admin endpoint is taken");

  // Warn operators when the admin listener is bound to a pipe address.
  const bool admin_on_pipe = admin_.socket().addressType() == Network::Address::Type::Pipe;
  if (admin_on_pipe) {
    ENVOY_LOG(warn, "Admin tapping (via /tap) is unreliable when the admin endpoint is a pipe and "
                    "the connection is HTTP/1. Either use an IP address or connect using HTTP/2.");
  }
}
39
40
1
// Removes the "/tap" endpoint that the constructor added and asserts that the removal succeeded.
AdminHandler::~AdminHandler() {
  const bool rc{admin_.removeHandler("/tap")};
  ASSERT(rc);
}
44
45
// Admin callback for the "/tap" endpoint.
//
// Parses the request body into a TapRequest, installs the requested tap configuration on every
// extension config registered under the request's config id, and keeps the admin stream open so
// traces can be written to it later.
//
// @param response buffer that receives the error text when the request is rejected.
// @param admin_stream the admin stream carrying the request; it is left open on success.
// @return Http::Code::OK when the request was attached, Http::Code::BadRequest when a stream is
//         already attached, the body is missing or invalid, or the config id is not registered.
Http::Code AdminHandler::handler(Http::HeaderMap&, Buffer::Instance& response,
                                 Server::AdminStream& admin_stream) {
  if (attached_request_ != nullptr) {
    // TODO(mattlklein123): Consider supporting concurrent admin /tap streams. Right now we support
    // a single stream as a simplification.
    return badRequest(response, "An attached /tap admin stream already exists. Detach it.");
  }

  if (admin_stream.getRequestBody() == nullptr) {
    return badRequest(response, "/tap requires a JSON/YAML body");
  }

  envoy::admin::v3::TapRequest tap_request;
  TRY_NEEDS_AUDIT {
    MessageUtil::loadFromYamlAndValidate(admin_stream.getRequestBody()->toString(), tap_request,
                                         ProtobufMessage::getStrictValidationVisitor());
  }
  END_TRY catch (EnvoyException& e) { return badRequest(response, e.what()); }

  ENVOY_LOG(debug, "tap admin request for config_id={}", tap_request.config_id());

  // Look the id up once; the iterator serves both the existence check and the iteration.
  const auto registered = config_id_map_.find(tap_request.config_id());
  if (registered == config_id_map_.end()) {
    return badRequest(
        response, fmt::format("Unknown config id '{}'. No extension has registered with this id.",
                              tap_request.config_id()));
  }
  for (auto* config : registered->second) {
    config->newTapConfig(tap_request.tap_config(), this);
  }

  admin_stream.setEndStreamOnComplete(false);
  admin_stream.addOnDestroyCallback([this] {
    // Use find() rather than operator[]: every config for this id may have been unregistered
    // while the stream was attached, and operator[] would re-create an empty entry that makes the
    // "Unknown config id" check above pass for an id nobody is registered under.
    const auto attached = config_id_map_.find(attached_request_->id());
    if (attached != config_id_map_.end()) {
      for (auto* config : attached->second) {
        ENVOY_LOG(debug, "detach tap admin request for config_id={}", attached_request_->id());
        config->clearTapConfig();
      }
    }
    attached_request_.reset(); // remove ref to attached_request_
  });
  attached_request_ = AttachedRequest::createAttachedRequest(this, tap_request, &admin_stream);
  return Http::Code::OK;
}
85
86
0
// Logs the rejection reason at debug level, appends it to the response body and reports a 400.
// @param response buffer that receives the error text.
// @param error human readable reason for rejecting the request.
// @return always Http::Code::BadRequest.
Http::Code AdminHandler::badRequest(Buffer::Instance& response, absl::string_view error) {
  ENVOY_LOG(debug, "handler bad request: {}", error);

  response.add(error);

  return Http::Code::BadRequest;
}
91
92
1
// Records an extension config under `config_id`. When an admin request for the same id is
// currently attached, the config immediately receives that request's tap configuration.
// @param config the extension config to register; it must not already be registered.
// @param config_id non-empty id that the config is registered under.
void AdminHandler::registerConfig(ExtensionConfig& config, const std::string& config_id) {
  ASSERT(!config_id.empty());
  ASSERT(config_id_map_[config_id].count(&config) == 0);
  config_id_map_[config_id].insert(&config);

  // Nothing more to do unless a request for this very id is attached right now.
  if (attached_request_ == nullptr || attached_request_->id() != config_id) {
    return;
  }
  config.newTapConfig(attached_request_->config(), this);
}
100
101
1
// Removes an extension config from the set stored under its admin id and drops the id from the
// map once no config remains for it.
// @param config the extension config to remove; it must currently be registered.
void AdminHandler::unregisterConfig(ExtensionConfig& config) {
  ASSERT(!config.adminId().empty());
  const std::string admin_id(config.adminId());
  ASSERT(config_id_map_[admin_id].count(&config) == 1);

  // Bind the per-id set once and reuse it for the erase and the emptiness check.
  auto& configs_for_id = config_id_map_[admin_id];
  configs_for_id.erase(&config);
  if (configs_for_id.empty()) {
    config_id_map_.erase(admin_id);
  }
}
110
111
// Creates the per-tap sink handle matching the requested admin sink type.
// @param trace_id unused.
// @param type the output sink type; must be either the streaming or the buffered admin sink.
// @return an AdminPerTapSinkHandle for the streaming admin sink, otherwise a
//         BufferedPerTapSinkHandle.
PerTapSinkHandlePtr
AdminHandler::createPerTapSinkHandle(uint64_t trace_id,
                                     envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) {
  UNREFERENCED_PARAMETER(trace_id);
  using ProtoOutputSinkType = envoy::config::tap::v3::OutputSink::OutputSinkTypeCase;
  ASSERT(type == ProtoOutputSinkType::kStreamingAdmin ||
         type == ProtoOutputSinkType::kBufferedAdmin);

  /**
   * Switching on the sink type here again after doing so in TapConfigBaseImpl constructor
   * seems a bit strange. A possible refactor in the future could involve moving all Sinks
   * to live where the FilePerTapSink lives, and passing in the sink to use into the AdminHandler.
   */

  // The sink type comes from the YAML request body; anything that is not the streaming admin sink
  // gets the buffered implementation.
  if (type != ProtoOutputSinkType::kStreamingAdmin) {
    return std::make_unique<BufferedPerTapSinkHandle>(*this);
  }
  return std::make_unique<AdminPerTapSinkHandle>(*this);
}
131
132
void AdminHandler::TraceBuffer::bufferTrace(
133
0
    const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace) {
134
  // Ignore traces once the buffer is full or flushed
135
0
  if (flushed() || full()) {
136
0
    return;
137
0
  }
138
139
0
  buffer_->emplace_back(std::move(*trace));
140
0
}
141
142
void AdminHandler::AdminPerTapSinkHandle::submitTrace(TraceWrapperPtr&& trace,
143
0
                                                      envoy::config::tap::v3::OutputSink::Format) {
144
0
  ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
145
  // Convert to a shared_ptr, so we can send it to the main thread.
146
0
  std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace{std::move(trace)};
147
  // Non owning pointer to the attached request, does not preserve lifetime unless in use.
148
0
  std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
149
150
  // The handle can be destroyed before the cross thread post is complete. Thus, we capture a
151
  // reference to our parent.
152
0
  parent_.main_thread_dispatcher_.post([weak_attached_request, shared_trace] {
153
    // Take temporary ownership - extend lifetime
154
0
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
155
0
    if (!attached_request) {
156
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
157
      // already been sent on completion of AdminHandler::handler.
158
0
      ENVOY_LOG(debug, "attached request does not exist, not streaming trace");
159
0
      return; // No attached request, flushed already
160
0
    }
161
162
0
    attached_request->streamMsg(*shared_trace, false);
163
0
  });
164
0
}
165
166
void AdminHandler::BufferedPerTapSinkHandle::submitTrace(
167
0
    TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format) {
168
  // Convert to a shared_ptr to extend lifetime so we can send it to the main thread.
169
0
  std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace(std::move(trace));
170
  // Non owning pointer to the attached request, does not preserve lifetime unless in use
171
0
  std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
172
173
0
  parent_.main_thread_dispatcher_.post([shared_trace, weak_attached_request] {
174
    // Take temporary ownership - extend lifetime
175
0
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
176
0
    if (!attached_request) {
177
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
178
      // already been sent on completion of AdminHandler::handler. Additionally we probably don't
179
      // want to take any action in this event as this case may be hit in "normal" usage, depending
180
      // on when the destruction of attached_request_ occurs.
181
0
      ENVOY_LOG(debug, "attached request does not exist, not buffering trace");
182
0
      return; // No attached request, flushed already.
183
0
    }
184
0
    TraceBuffer* trace_buffer = attached_request->traceBuffer();
185
186
    // Check if we already responded to the client
187
    // Hit when posts to buffer traces are on the dispatcher queue and the buffer is flushed
188
0
    if (trace_buffer->flushed()) {
189
0
      return;
190
0
    }
191
    // Main thread dispatcher serializes access to the trace_buffer
192
0
    trace_buffer->bufferTrace(shared_trace);
193
    // If the trace buffer is not full yet, wait to buffer more traces
194
0
    if (!trace_buffer->full()) {
195
0
      return;
196
0
    }
197
198
0
    std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
199
200
0
    ENVOY_LOG(debug, "admin writing buffered trace list to response");
201
202
    // Serialize writes to the stream
203
0
    for (size_t i = 0; i < buffer.size(); i++) {
204
      // Close stream on final message
205
0
      attached_request->streamMsg(buffer[i], (i + 1) == buffer.size());
206
0
    }
207
0
  });
208
0
}
209
210
// Captures the state of an attached /tap request: the config id and tap config copied from the
// request, the admin stream to write to, and the handler's main thread dispatcher.
// @param admin_handler handler that owns this request; only its dispatcher is read here.
// @param tap_request the validated request the id and tap config are copied from.
// @param admin_stream the admin stream that responses are written to.
AdminHandler::AttachedRequest::AttachedRequest(AdminHandler* admin_handler,
                                               const envoy::admin::v3::TapRequest& tap_request,
                                               Server::AdminStream* admin_stream)
    : config_id_(tap_request.config_id()),
      config_(tap_request.tap_config()),
      admin_stream_(admin_stream),
      main_thread_dispatcher_(admin_handler->main_thread_dispatcher_) {}
216
217
// Builds a buffered attached request. The trace buffer capacity and the optional flush timeout
// are read from the buffered admin settings of the first configured sink; a timer is armed only
// when a non-zero timeout was supplied.
// @param admin_handler handler that owns this request; the timer callback reads its
//        attached_request_ when it fires.
// @param tap_request the validated request holding the sink configuration.
// @param admin_stream the admin stream that responses are written to.
AdminHandler::AttachedRequestBuffered::AttachedRequestBuffered(
    AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
    Server::AdminStream* admin_stream)
    : AttachedRequest(admin_handler, tap_request, admin_stream) {
  const envoy::config::tap::v3::OutputSink& first_sink =
      tap_request.tap_config().output_config().sinks(0);

  const uint64_t trace_capacity = first_sink.buffered_admin().max_traces();
  const uint64_t flush_timeout_ms =
      PROTOBUF_GET_MS_OR_DEFAULT(first_sink.buffered_admin(), timeout, 0);
  trace_buffer_ = std::make_unique<TraceBuffer>(trace_capacity);

  // Without an actual timeout there is no countdown to start.
  if (flush_timeout_ms == 0) {
    return;
  }
  timer_ = dispatcher().createTimer(
      [this, admin_handler] { onTimeout(admin_handler->attached_request_); });
  timer_->enableTimer(std::chrono::milliseconds(flush_timeout_ms));
}
234
235
void AdminHandler::AttachedRequestBuffered::onTimeout(
236
0
    const std::weak_ptr<AttachedRequest>& weak_attached_request) {
237
  // Flush the buffer regardless of size
238
0
  dispatcher().post([weak_attached_request] {
239
    // Take temporary ownership - extend lifetime
240
0
    std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
241
0
    if (!attached_request) {
242
      // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
243
      // already been sent on completion of AdminHandler::handler. Additionally we probably don't
244
      // want to take any action in this event as this case may be hit in "normal" usage, depending
245
      // on when the destruction of attached_request_ occurs.
246
0
      ENVOY_LOG(debug, "Timer Expiry after admin tap request completion");
247
0
      return; // No attached request, flushed already.
248
0
    }
249
0
    TraceBuffer* trace_buffer = attached_request->traceBuffer();
250
251
    // if the trace buffer has already been flushed short circuit.
252
    // Hit when this timeout callback is on the dispatcher queue and the buffer is flushed
253
0
    if (trace_buffer->flushed()) {
254
0
      return;
255
0
    }
256
257
0
    std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
258
259
0
    ENVOY_LOG(debug, "Timer Expiry, admin flushing buffered traces to response");
260
261
    // Serialize writes to the stream
262
0
    for (const auto& trace : buffer) {
263
0
      attached_request->streamMsg(trace);
264
0
    }
265
0
    attached_request->endStream();
266
0
  });
267
0
}
268
269
0
void AdminHandler::AttachedRequest::endStream() {
270
0
  Buffer::OwnedImpl output_buffer;
271
0
  stream()->getDecoderFilterCallbacks().encodeData(output_buffer, true);
272
0
}
273
274
0
// Serializes a message in the request's output format and writes it to the admin stream.
// Both JSON formats go through MessageUtil::getJsonStringFromMessageOrError; the length delimited
// binary format writes a varint size prefix followed by the serialized message. PROTO_BINARY and
// PROTO_TEXT are not implemented and panic.
// @param message the message to write.
// @param end_stream whether this write also ends the stream.
void AdminHandler::AttachedRequest::streamMsg(const Protobuf::Message& message, bool end_stream) {
  std::string payload;

  switch (format()) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
    payload = MessageUtil::getJsonStringFromMessageOrError(message, true, true);
    break;
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: {
    // Both streams are scoped to this case so they are destroyed before the payload is read.
    Protobuf::io::StringOutputStream string_stream(&payload);
    Protobuf::io::CodedOutputStream coded_output(&string_stream);
    coded_output.WriteVarint64(message.ByteSizeLong());
    message.SerializeWithCachedSizes(&coded_output);
    break;
  }
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
  case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
    PANIC("not implemented");
  }

  Buffer::OwnedImpl encoded{payload};
  auto& filter_callbacks = stream()->getDecoderFilterCallbacks();
  filter_callbacks.encodeData(encoded, end_stream);
}
298
299
std::shared_ptr<AdminHandler::AttachedRequest> AdminHandler::AttachedRequest::createAttachedRequest(
300
    AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
301
0
    Server::AdminStream* admin_stream) {
302
0
  using ProtoOutputSink = envoy::config::tap::v3::OutputSink;
303
304
0
  const ProtoOutputSink& sink = tap_request.tap_config().output_config().sinks(0);
305
306
0
  switch (sink.output_sink_type_case()) {
307
0
  case ProtoOutputSink::kBufferedAdmin:
308
0
    return std::make_shared<AttachedRequestBuffered>(admin_handler, tap_request, admin_stream);
309
0
  default:
310
0
    return std::make_shared<AttachedRequest>(admin_handler, tap_request, admin_stream);
311
0
  }
312
0
}
313
314
} // namespace Tap
315
} // namespace Common
316
} // namespace Extensions
317
} // namespace Envoy