LCOV - code coverage report
Current view: top level - source/extensions/common/tap - admin.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 31 196 15.8 %
Date: 2024-01-05 06:35:25 Functions: 6 23 26.1 %

          Line data    Source code
       1             : #include "source/extensions/common/tap/admin.h"
       2             : 
       3             : #include "envoy/admin/v3/tap.pb.h"
       4             : #include "envoy/admin/v3/tap.pb.validate.h"
       5             : #include "envoy/config/tap/v3/common.pb.h"
       6             : #include "envoy/data/tap/v3/wrapper.pb.h"
       7             : 
       8             : #include "source/common/buffer/buffer_impl.h"
       9             : #include "source/common/protobuf/message_validator_impl.h"
      10             : #include "source/common/protobuf/utility.h"
      11             : 
      12             : namespace Envoy {
      13             : namespace Extensions {
      14             : namespace Common {
      15             : namespace Tap {
      16             : 
      17             : // Singleton registration via macro defined in envoy/singleton/manager.h
      18             : SINGLETON_MANAGER_REGISTRATION(tap_admin_handler);
      19             : 
      20             : AdminHandlerSharedPtr AdminHandler::getSingleton(OptRef<Server::Admin> admin,
      21             :                                                  Singleton::Manager& singleton_manager,
      22           2 :                                                  Event::Dispatcher& main_thread_dispatcher) {
      23           2 :   return singleton_manager.getTyped<AdminHandler>(
      24           2 :       SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler), [&admin, &main_thread_dispatcher] {
      25           2 :         return std::make_shared<AdminHandler>(admin, main_thread_dispatcher);
      26           2 :       });
      27           2 : }
      28             : 
      29             : AdminHandler::AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher)
      30           2 :     : admin_(admin.value()), main_thread_dispatcher_(main_thread_dispatcher) {
      31           2 :   const bool rc =
      32           2 :       admin_.addHandler("/tap", "tap filter control", MAKE_ADMIN_HANDLER(handler), true, true);
      33           2 :   RELEASE_ASSERT(rc, "/tap admin endpoint is taken");
      34           2 :   if (admin_.socket().addressType() == Network::Address::Type::Pipe) {
      35           0 :     ENVOY_LOG(warn, "Admin tapping (via /tap) is unreliable when the admin endpoint is a pipe and "
      36           0 :                     "the connection is HTTP/1. Either use an IP address or connect using HTTP/2.");
      37           0 :   }
      38           2 : }
      39             : 
      40           2 : AdminHandler::~AdminHandler() {
      41           2 :   const bool rc = admin_.removeHandler("/tap");
      42           2 :   ASSERT(rc);
      43           2 : }
      44             : 
      45             : Http::Code AdminHandler::handler(Http::HeaderMap&, Buffer::Instance& response,
      46           0 :                                  Server::AdminStream& admin_stream) {
      47           0 :   if (attached_request_ != nullptr) {
      48             :     // TODO(mattlklein123): Consider supporting concurrent admin /tap streams. Right now we support
      49             :     // a single stream as a simplification.
      50           0 :     return badRequest(response, "An attached /tap admin stream already exists. Detach it.");
      51           0 :   }
      52             : 
      53           0 :   if (admin_stream.getRequestBody() == nullptr) {
      54           0 :     return badRequest(response, "/tap requires a JSON/YAML body");
      55           0 :   }
      56             : 
      57           0 :   envoy::admin::v3::TapRequest tap_request;
      58           0 :   TRY_NEEDS_AUDIT {
      59           0 :     MessageUtil::loadFromYamlAndValidate(admin_stream.getRequestBody()->toString(), tap_request,
      60           0 :                                          ProtobufMessage::getStrictValidationVisitor());
      61           0 :   }
      62           0 :   END_TRY catch (EnvoyException& e) { return badRequest(response, e.what()); }
      63             : 
      64           0 :   ENVOY_LOG(debug, "tap admin request for config_id={}", tap_request.config_id());
      65           0 :   if (config_id_map_.count(tap_request.config_id()) == 0) {
      66           0 :     return badRequest(
      67           0 :         response, fmt::format("Unknown config id '{}'. No extension has registered with this id.",
      68           0 :                               tap_request.config_id()));
      69           0 :   }
      70           0 :   for (auto config : config_id_map_[tap_request.config_id()]) {
      71           0 :     config->newTapConfig(tap_request.tap_config(), this);
      72           0 :   }
      73             : 
      74           0 :   admin_stream.setEndStreamOnComplete(false);
      75           0 :   admin_stream.addOnDestroyCallback([this] {
      76           0 :     for (auto config : config_id_map_[attached_request_->id()]) {
      77           0 :       ENVOY_LOG(debug, "detach tap admin request for config_id={}", attached_request_->id());
      78           0 :       config->clearTapConfig();
      79           0 :     }
      80           0 :     attached_request_.reset(); // remove ref to attached_request_
      81           0 :   });
      82           0 :   attached_request_ = AttachedRequest::createAttachedRequest(this, tap_request, &admin_stream);
      83           0 :   return Http::Code::OK;
      84           0 : }
      85             : 
      86           0 : Http::Code AdminHandler::badRequest(Buffer::Instance& response, absl::string_view error) {
      87           0 :   ENVOY_LOG(debug, "handler bad request: {}", error);
      88           0 :   response.add(error);
      89           0 :   return Http::Code::BadRequest;
      90           0 : }
      91             : 
      92           2 : void AdminHandler::registerConfig(ExtensionConfig& config, const std::string& config_id) {
      93           2 :   ASSERT(!config_id.empty());
      94           2 :   ASSERT(config_id_map_[config_id].count(&config) == 0);
      95           2 :   config_id_map_[config_id].insert(&config);
      96           2 :   if (attached_request_ != nullptr && attached_request_->id() == config_id) {
      97           0 :     config.newTapConfig(attached_request_->config(), this);
      98           0 :   }
      99           2 : }
     100             : 
     101           2 : void AdminHandler::unregisterConfig(ExtensionConfig& config) {
     102           2 :   ASSERT(!config.adminId().empty());
     103           2 :   std::string admin_id(config.adminId());
     104           2 :   ASSERT(config_id_map_[admin_id].count(&config) == 1);
     105           2 :   config_id_map_[admin_id].erase(&config);
     106           2 :   if (config_id_map_[admin_id].empty()) {
     107           2 :     config_id_map_.erase(admin_id);
     108           2 :   }
     109           2 : }
     110             : 
     111             : PerTapSinkHandlePtr
     112             : AdminHandler::createPerTapSinkHandle(uint64_t trace_id,
     113           0 :                                      envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) {
     114           0 :   UNREFERENCED_PARAMETER(trace_id);
     115           0 :   using ProtoOutputSinkType = envoy::config::tap::v3::OutputSink::OutputSinkTypeCase;
     116           0 :   ASSERT(type == ProtoOutputSinkType::kStreamingAdmin ||
     117           0 :          type == ProtoOutputSinkType::kBufferedAdmin);
     118             : 
     119             :   /**
     120             :    * Switching on the sink type here again after doing so in TapConfigBaseImpl constructor
     121             :    * seems a bit strange. A possible refactor in the future could involve moving all Sinks
     122             :    * to live where the FilePerTapSink lives, and passing in the sink to use into the AdminHandler.
     123             :    */
     124             : 
     125             :   // Select the sink implementation to use based on type specified in YAML request body
     126           0 :   if (type == ProtoOutputSinkType::kStreamingAdmin) {
     127           0 :     return std::make_unique<AdminPerTapSinkHandle>(*this);
     128           0 :   }
     129           0 :   return std::make_unique<BufferedPerTapSinkHandle>(*this);
     130           0 : }
     131             : 
     132             : void AdminHandler::TraceBuffer::bufferTrace(
     133           0 :     const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace) {
     134             :   // Ignore traces once the buffer is full or flushed
     135           0 :   if (flushed() || full()) {
     136           0 :     return;
     137           0 :   }
     138             : 
     139           0 :   buffer_->emplace_back(std::move(*trace));
     140           0 : }
     141             : 
     142             : void AdminHandler::AdminPerTapSinkHandle::submitTrace(TraceWrapperPtr&& trace,
     143           0 :                                                       envoy::config::tap::v3::OutputSink::Format) {
     144           0 :   ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
     145             :   // Convert to a shared_ptr, so we can send it to the main thread.
     146           0 :   std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace{std::move(trace)};
     147             :   // Non owning pointer to the attached request, does not preserve lifetime unless in use.
     148           0 :   std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
     149             : 
     150             :   // The handle can be destroyed before the cross thread post is complete. Thus, we capture a
     151             :   // reference to our parent.
     152           0 :   parent_.main_thread_dispatcher_.post([weak_attached_request, shared_trace] {
     153             :     // Take temporary ownership - extend lifetime
     154           0 :     std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
     155           0 :     if (!attached_request) {
     156             :       // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
     157             :       // already been sent on completion of AdminHandler::handler.
     158           0 :       ENVOY_LOG(debug, "attached request does not exist, not streaming trace");
     159           0 :       return; // No attached request, flushed already
     160           0 :     }
     161             : 
     162           0 :     attached_request->streamMsg(*shared_trace, false);
     163           0 :   });
     164           0 : }
     165             : 
     166             : void AdminHandler::BufferedPerTapSinkHandle::submitTrace(
     167           0 :     TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format) {
     168             :   // Convert to a shared_ptr to extend lifetime so we can send it to the main thread.
     169           0 :   std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace(std::move(trace));
     170             :   // Non owning pointer to the attached request, does not preserve lifetime unless in use
     171           0 :   std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
     172             : 
     173           0 :   parent_.main_thread_dispatcher_.post([shared_trace, weak_attached_request] {
     174             :     // Take temporary ownership - extend lifetime
     175           0 :     std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
     176           0 :     if (!attached_request) {
     177             :       // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
     178             :       // already been sent on completion of AdminHandler::handler. Additionally we probably don't
     179             :       // want to take any action in this event as this case may be hit in "normal" usage, depending
     180             :       // on when the destruction of attached_request_ occurs.
     181           0 :       ENVOY_LOG(debug, "attached request does not exist, not buffering trace");
     182           0 :       return; // No attached request, flushed already.
     183           0 :     }
     184           0 :     TraceBuffer* trace_buffer = attached_request->traceBuffer();
     185             : 
     186             :     // Check if we already responded to the client
     187             :     // Hit when posts to buffer traces are on the dispatcher queue and the buffer is flushed
     188           0 :     if (trace_buffer->flushed()) {
     189           0 :       return;
     190           0 :     }
     191             :     // Main thread dispatcher serializes access to the trace_buffer
     192           0 :     trace_buffer->bufferTrace(shared_trace);
     193             :     // If the trace buffer is not full yet, wait to buffer more traces
     194           0 :     if (!trace_buffer->full()) {
     195           0 :       return;
     196           0 :     }
     197             : 
     198           0 :     std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
     199             : 
     200           0 :     ENVOY_LOG(debug, "admin writing buffered trace list to response");
     201             : 
     202             :     // Serialize writes to the stream
     203           0 :     for (size_t i = 0; i < buffer.size(); i++) {
     204             :       // Close stream on final message
     205           0 :       attached_request->streamMsg(buffer[i], (i + 1) == buffer.size());
     206           0 :     }
     207           0 :   });
     208           0 : }
     209             : 
     210             : AdminHandler::AttachedRequest::AttachedRequest(AdminHandler* admin_handler,
     211             :                                                const envoy::admin::v3::TapRequest& tap_request,
     212             :                                                Server::AdminStream* admin_stream)
     213             :     : config_id_(tap_request.config_id()), config_(tap_request.tap_config()),
     214           0 :       admin_stream_(admin_stream), main_thread_dispatcher_(admin_handler->main_thread_dispatcher_) {
     215           0 : }
     216             : 
     217             : AdminHandler::AttachedRequestBuffered::AttachedRequestBuffered(
     218             :     AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
     219             :     Server::AdminStream* admin_stream)
     220           0 :     : AttachedRequest(admin_handler, tap_request, admin_stream) {
     221           0 :   const envoy::config::tap::v3::OutputSink& sink =
     222           0 :       tap_request.tap_config().output_config().sinks(0);
     223             : 
     224           0 :   const uint64_t max_buffered_traces = sink.buffered_admin().max_traces();
     225           0 :   const uint64_t timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(sink.buffered_admin(), timeout, 0);
     226           0 :   trace_buffer_ = std::make_unique<TraceBuffer>(max_buffered_traces);
     227             :   // Start the countdown if provided an actual timeout
     228           0 :   if (timeout_ms > 0) {
     229           0 :     timer_ = dispatcher().createTimer(
     230           0 :         [this, admin_handler] { this->onTimeout(admin_handler->attached_request_); });
     231           0 :     timer_->enableTimer(std::chrono::milliseconds(timeout_ms));
     232           0 :   }
     233           0 : }
     234             : 
     235             : void AdminHandler::AttachedRequestBuffered::onTimeout(
     236           0 :     const std::weak_ptr<AttachedRequest>& weak_attached_request) {
     237             :   // Flush the buffer regardless of size
     238           0 :   dispatcher().post([weak_attached_request] {
     239             :     // Take temporary ownership - extend lifetime
     240           0 :     std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
     241           0 :     if (!attached_request) {
     242             :       // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
     243             :       // already been sent on completion of AdminHandler::handler. Additionally we probably don't
     244             :       // want to take any action in this event as this case may be hit in "normal" usage, depending
     245             :       // on when the destruction of attached_request_ occurs.
     246           0 :       ENVOY_LOG(debug, "Timer Expiry after admin tap request completion");
     247           0 :       return; // No attached request, flushed already.
     248           0 :     }
     249           0 :     TraceBuffer* trace_buffer = attached_request->traceBuffer();
     250             : 
     251             :     // if the trace buffer has already been flushed short circuit.
     252             :     // Hit when this timeout callback is on the dispatcher queue and the buffer is flushed
     253           0 :     if (trace_buffer->flushed()) {
     254           0 :       return;
     255           0 :     }
     256             : 
     257           0 :     std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
     258             : 
     259           0 :     ENVOY_LOG(debug, "Timer Expiry, admin flushing buffered traces to response");
     260             : 
     261             :     // Serialize writes to the stream
     262           0 :     for (const auto& trace : buffer) {
     263           0 :       attached_request->streamMsg(trace);
     264           0 :     }
     265           0 :     attached_request->endStream();
     266           0 :   });
     267           0 : }
     268             : 
     269           0 : void AdminHandler::AttachedRequest::endStream() {
     270           0 :   Buffer::OwnedImpl output_buffer;
     271           0 :   stream()->getDecoderFilterCallbacks().encodeData(output_buffer, true);
     272           0 : }
     273             : 
     274           0 : void AdminHandler::AttachedRequest::streamMsg(const Protobuf::Message& message, bool end_stream) {
     275           0 :   std::string output_string;
     276             : 
     277           0 :   switch (format()) {
     278           0 :     PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
     279           0 :   case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
     280           0 :   case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
     281           0 :     output_string = MessageUtil::getJsonStringFromMessageOrError(message, true, true);
     282           0 :     break;
     283           0 :   case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: {
     284           0 :     Protobuf::io::StringOutputStream stream(&output_string);
     285           0 :     Protobuf::io::CodedOutputStream coded_stream(&stream);
     286           0 :     coded_stream.WriteVarint64(message.ByteSizeLong());
     287           0 :     message.SerializeWithCachedSizes(&coded_stream);
     288           0 :     break;
     289           0 :   }
     290           0 :   case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
     291           0 :   case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
     292           0 :     PANIC("not implemented");
     293           0 :   }
     294             : 
     295           0 :   Buffer::OwnedImpl output_buffer{output_string};
     296           0 :   stream()->getDecoderFilterCallbacks().encodeData(output_buffer, end_stream);
     297           0 : }
     298             : 
     299             : std::shared_ptr<AdminHandler::AttachedRequest> AdminHandler::AttachedRequest::createAttachedRequest(
     300             :     AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
     301           0 :     Server::AdminStream* admin_stream) {
     302           0 :   using ProtoOutputSink = envoy::config::tap::v3::OutputSink;
     303             : 
     304           0 :   const ProtoOutputSink& sink = tap_request.tap_config().output_config().sinks(0);
     305             : 
     306           0 :   switch (sink.output_sink_type_case()) {
     307           0 :   case ProtoOutputSink::kBufferedAdmin:
     308           0 :     return std::make_shared<AttachedRequestBuffered>(admin_handler, tap_request, admin_stream);
     309           0 :   default:
     310           0 :     return std::make_shared<AttachedRequest>(admin_handler, tap_request, admin_stream);
     311           0 :   }
     312           0 : }
     313             : 
     314             : } // namespace Tap
     315             : } // namespace Common
     316             : } // namespace Extensions
     317             : } // namespace Envoy

Generated by: LCOV version 1.15