Line data Source code
1 : #include "source/extensions/common/tap/admin.h"
2 :
3 : #include "envoy/admin/v3/tap.pb.h"
4 : #include "envoy/admin/v3/tap.pb.validate.h"
5 : #include "envoy/config/tap/v3/common.pb.h"
6 : #include "envoy/data/tap/v3/wrapper.pb.h"
7 :
8 : #include "source/common/buffer/buffer_impl.h"
9 : #include "source/common/protobuf/message_validator_impl.h"
10 : #include "source/common/protobuf/utility.h"
11 :
12 : namespace Envoy {
13 : namespace Extensions {
14 : namespace Common {
15 : namespace Tap {
16 :
17 : // Singleton registration via macro defined in envoy/singleton/manager.h
18 : SINGLETON_MANAGER_REGISTRATION(tap_admin_handler);
19 :
20 : AdminHandlerSharedPtr AdminHandler::getSingleton(OptRef<Server::Admin> admin,
21 : Singleton::Manager& singleton_manager,
22 2 : Event::Dispatcher& main_thread_dispatcher) {
23 2 : return singleton_manager.getTyped<AdminHandler>(
24 2 : SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler), [&admin, &main_thread_dispatcher] {
25 2 : return std::make_shared<AdminHandler>(admin, main_thread_dispatcher);
26 2 : });
27 2 : }
28 :
29 : AdminHandler::AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher)
30 2 : : admin_(admin.value()), main_thread_dispatcher_(main_thread_dispatcher) {
31 2 : const bool rc =
32 2 : admin_.addHandler("/tap", "tap filter control", MAKE_ADMIN_HANDLER(handler), true, true);
33 2 : RELEASE_ASSERT(rc, "/tap admin endpoint is taken");
34 2 : if (admin_.socket().addressType() == Network::Address::Type::Pipe) {
35 0 : ENVOY_LOG(warn, "Admin tapping (via /tap) is unreliable when the admin endpoint is a pipe and "
36 0 : "the connection is HTTP/1. Either use an IP address or connect using HTTP/2.");
37 0 : }
38 2 : }
39 :
40 2 : AdminHandler::~AdminHandler() {
41 2 : const bool rc = admin_.removeHandler("/tap");
42 2 : ASSERT(rc);
43 2 : }
44 :
45 : Http::Code AdminHandler::handler(Http::HeaderMap&, Buffer::Instance& response,
46 0 : Server::AdminStream& admin_stream) {
47 0 : if (attached_request_ != nullptr) {
48 : // TODO(mattlklein123): Consider supporting concurrent admin /tap streams. Right now we support
49 : // a single stream as a simplification.
50 0 : return badRequest(response, "An attached /tap admin stream already exists. Detach it.");
51 0 : }
52 :
53 0 : if (admin_stream.getRequestBody() == nullptr) {
54 0 : return badRequest(response, "/tap requires a JSON/YAML body");
55 0 : }
56 :
57 0 : envoy::admin::v3::TapRequest tap_request;
58 0 : TRY_NEEDS_AUDIT {
59 0 : MessageUtil::loadFromYamlAndValidate(admin_stream.getRequestBody()->toString(), tap_request,
60 0 : ProtobufMessage::getStrictValidationVisitor());
61 0 : }
62 0 : END_TRY catch (EnvoyException& e) { return badRequest(response, e.what()); }
63 :
64 0 : ENVOY_LOG(debug, "tap admin request for config_id={}", tap_request.config_id());
65 0 : if (config_id_map_.count(tap_request.config_id()) == 0) {
66 0 : return badRequest(
67 0 : response, fmt::format("Unknown config id '{}'. No extension has registered with this id.",
68 0 : tap_request.config_id()));
69 0 : }
70 0 : for (auto config : config_id_map_[tap_request.config_id()]) {
71 0 : config->newTapConfig(tap_request.tap_config(), this);
72 0 : }
73 :
74 0 : admin_stream.setEndStreamOnComplete(false);
75 0 : admin_stream.addOnDestroyCallback([this] {
76 0 : for (auto config : config_id_map_[attached_request_->id()]) {
77 0 : ENVOY_LOG(debug, "detach tap admin request for config_id={}", attached_request_->id());
78 0 : config->clearTapConfig();
79 0 : }
80 0 : attached_request_.reset(); // remove ref to attached_request_
81 0 : });
82 0 : attached_request_ = AttachedRequest::createAttachedRequest(this, tap_request, &admin_stream);
83 0 : return Http::Code::OK;
84 0 : }
85 :
86 0 : Http::Code AdminHandler::badRequest(Buffer::Instance& response, absl::string_view error) {
87 0 : ENVOY_LOG(debug, "handler bad request: {}", error);
88 0 : response.add(error);
89 0 : return Http::Code::BadRequest;
90 0 : }
91 :
92 2 : void AdminHandler::registerConfig(ExtensionConfig& config, const std::string& config_id) {
93 2 : ASSERT(!config_id.empty());
94 2 : ASSERT(config_id_map_[config_id].count(&config) == 0);
95 2 : config_id_map_[config_id].insert(&config);
96 2 : if (attached_request_ != nullptr && attached_request_->id() == config_id) {
97 0 : config.newTapConfig(attached_request_->config(), this);
98 0 : }
99 2 : }
100 :
101 2 : void AdminHandler::unregisterConfig(ExtensionConfig& config) {
102 2 : ASSERT(!config.adminId().empty());
103 2 : std::string admin_id(config.adminId());
104 2 : ASSERT(config_id_map_[admin_id].count(&config) == 1);
105 2 : config_id_map_[admin_id].erase(&config);
106 2 : if (config_id_map_[admin_id].empty()) {
107 2 : config_id_map_.erase(admin_id);
108 2 : }
109 2 : }
110 :
111 : PerTapSinkHandlePtr
112 : AdminHandler::createPerTapSinkHandle(uint64_t trace_id,
113 0 : envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) {
114 0 : UNREFERENCED_PARAMETER(trace_id);
115 0 : using ProtoOutputSinkType = envoy::config::tap::v3::OutputSink::OutputSinkTypeCase;
116 0 : ASSERT(type == ProtoOutputSinkType::kStreamingAdmin ||
117 0 : type == ProtoOutputSinkType::kBufferedAdmin);
118 :
119 : /**
120 : * Switching on the sink type here again after doing so in TapConfigBaseImpl constructor
121 : * seems a bit strange. A possible refactor in the future could involve moving all Sinks
122 : * to live where the FilePerTapSink lives, and passing in the sink to use into the AdminHandler.
123 : */
124 :
125 : // Select the sink implementation to use based on type specified in YAML request body
126 0 : if (type == ProtoOutputSinkType::kStreamingAdmin) {
127 0 : return std::make_unique<AdminPerTapSinkHandle>(*this);
128 0 : }
129 0 : return std::make_unique<BufferedPerTapSinkHandle>(*this);
130 0 : }
131 :
132 : void AdminHandler::TraceBuffer::bufferTrace(
133 0 : const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace) {
134 : // Ignore traces once the buffer is full or flushed
135 0 : if (flushed() || full()) {
136 0 : return;
137 0 : }
138 :
139 0 : buffer_->emplace_back(std::move(*trace));
140 0 : }
141 :
142 : void AdminHandler::AdminPerTapSinkHandle::submitTrace(TraceWrapperPtr&& trace,
143 0 : envoy::config::tap::v3::OutputSink::Format) {
144 0 : ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
145 : // Convert to a shared_ptr, so we can send it to the main thread.
146 0 : std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace{std::move(trace)};
147 : // Non owning pointer to the attached request, does not preserve lifetime unless in use.
148 0 : std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
149 :
150 : // The handle can be destroyed before the cross thread post is complete. Thus, we capture a
151 : // reference to our parent.
152 0 : parent_.main_thread_dispatcher_.post([weak_attached_request, shared_trace] {
153 : // Take temporary ownership - extend lifetime
154 0 : std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
155 0 : if (!attached_request) {
156 : // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
157 : // already been sent on completion of AdminHandler::handler.
158 0 : ENVOY_LOG(debug, "attached request does not exist, not streaming trace");
159 0 : return; // No attached request, flushed already
160 0 : }
161 :
162 0 : attached_request->streamMsg(*shared_trace, false);
163 0 : });
164 0 : }
165 :
166 : void AdminHandler::BufferedPerTapSinkHandle::submitTrace(
167 0 : TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format) {
168 : // Convert to a shared_ptr to extend lifetime so we can send it to the main thread.
169 0 : std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace(std::move(trace));
170 : // Non owning pointer to the attached request, does not preserve lifetime unless in use
171 0 : std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_);
172 :
173 0 : parent_.main_thread_dispatcher_.post([shared_trace, weak_attached_request] {
174 : // Take temporary ownership - extend lifetime
175 0 : std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
176 0 : if (!attached_request) {
177 : // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
178 : // already been sent on completion of AdminHandler::handler. Additionally we probably don't
179 : // want to take any action in this event as this case may be hit in "normal" usage, depending
180 : // on when the destruction of attached_request_ occurs.
181 0 : ENVOY_LOG(debug, "attached request does not exist, not buffering trace");
182 0 : return; // No attached request, flushed already.
183 0 : }
184 0 : TraceBuffer* trace_buffer = attached_request->traceBuffer();
185 :
186 : // Check if we already responded to the client
187 : // Hit when posts to buffer traces are on the dispatcher queue and the buffer is flushed
188 0 : if (trace_buffer->flushed()) {
189 0 : return;
190 0 : }
191 : // Main thread dispatcher serializes access to the trace_buffer
192 0 : trace_buffer->bufferTrace(shared_trace);
193 : // If the trace buffer is not full yet, wait to buffer more traces
194 0 : if (!trace_buffer->full()) {
195 0 : return;
196 0 : }
197 :
198 0 : std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
199 :
200 0 : ENVOY_LOG(debug, "admin writing buffered trace list to response");
201 :
202 : // Serialize writes to the stream
203 0 : for (size_t i = 0; i < buffer.size(); i++) {
204 : // Close stream on final message
205 0 : attached_request->streamMsg(buffer[i], (i + 1) == buffer.size());
206 0 : }
207 0 : });
208 0 : }
209 :
210 : AdminHandler::AttachedRequest::AttachedRequest(AdminHandler* admin_handler,
211 : const envoy::admin::v3::TapRequest& tap_request,
212 : Server::AdminStream* admin_stream)
213 : : config_id_(tap_request.config_id()), config_(tap_request.tap_config()),
214 0 : admin_stream_(admin_stream), main_thread_dispatcher_(admin_handler->main_thread_dispatcher_) {
215 0 : }
216 :
217 : AdminHandler::AttachedRequestBuffered::AttachedRequestBuffered(
218 : AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
219 : Server::AdminStream* admin_stream)
220 0 : : AttachedRequest(admin_handler, tap_request, admin_stream) {
221 0 : const envoy::config::tap::v3::OutputSink& sink =
222 0 : tap_request.tap_config().output_config().sinks(0);
223 :
224 0 : const uint64_t max_buffered_traces = sink.buffered_admin().max_traces();
225 0 : const uint64_t timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(sink.buffered_admin(), timeout, 0);
226 0 : trace_buffer_ = std::make_unique<TraceBuffer>(max_buffered_traces);
227 : // Start the countdown if provided an actual timeout
228 0 : if (timeout_ms > 0) {
229 0 : timer_ = dispatcher().createTimer(
230 0 : [this, admin_handler] { this->onTimeout(admin_handler->attached_request_); });
231 0 : timer_->enableTimer(std::chrono::milliseconds(timeout_ms));
232 0 : }
233 0 : }
234 :
235 : void AdminHandler::AttachedRequestBuffered::onTimeout(
236 0 : const std::weak_ptr<AttachedRequest>& weak_attached_request) {
237 : // Flush the buffer regardless of size
238 0 : dispatcher().post([weak_attached_request] {
239 : // Take temporary ownership - extend lifetime
240 0 : std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock();
241 0 : if (!attached_request) {
242 : // NOTE: Cannot do much here in response to the failed post as an HTTP response code has
243 : // already been sent on completion of AdminHandler::handler. Additionally we probably don't
244 : // want to take any action in this event as this case may be hit in "normal" usage, depending
245 : // on when the destruction of attached_request_ occurs.
246 0 : ENVOY_LOG(debug, "Timer Expiry after admin tap request completion");
247 0 : return; // No attached request, flushed already.
248 0 : }
249 0 : TraceBuffer* trace_buffer = attached_request->traceBuffer();
250 :
251 : // if the trace buffer has already been flushed short circuit.
252 : // Hit when this timeout callback is on the dispatcher queue and the buffer is flushed
253 0 : if (trace_buffer->flushed()) {
254 0 : return;
255 0 : }
256 :
257 0 : std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush();
258 :
259 0 : ENVOY_LOG(debug, "Timer Expiry, admin flushing buffered traces to response");
260 :
261 : // Serialize writes to the stream
262 0 : for (const auto& trace : buffer) {
263 0 : attached_request->streamMsg(trace);
264 0 : }
265 0 : attached_request->endStream();
266 0 : });
267 0 : }
268 :
269 0 : void AdminHandler::AttachedRequest::endStream() {
270 0 : Buffer::OwnedImpl output_buffer;
271 0 : stream()->getDecoderFilterCallbacks().encodeData(output_buffer, true);
272 0 : }
273 :
274 0 : void AdminHandler::AttachedRequest::streamMsg(const Protobuf::Message& message, bool end_stream) {
275 0 : std::string output_string;
276 :
277 0 : switch (format()) {
278 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
279 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
280 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
281 0 : output_string = MessageUtil::getJsonStringFromMessageOrError(message, true, true);
282 0 : break;
283 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: {
284 0 : Protobuf::io::StringOutputStream stream(&output_string);
285 0 : Protobuf::io::CodedOutputStream coded_stream(&stream);
286 0 : coded_stream.WriteVarint64(message.ByteSizeLong());
287 0 : message.SerializeWithCachedSizes(&coded_stream);
288 0 : break;
289 0 : }
290 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
291 0 : case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
292 0 : PANIC("not implemented");
293 0 : }
294 :
295 0 : Buffer::OwnedImpl output_buffer{output_string};
296 0 : stream()->getDecoderFilterCallbacks().encodeData(output_buffer, end_stream);
297 0 : }
298 :
299 : std::shared_ptr<AdminHandler::AttachedRequest> AdminHandler::AttachedRequest::createAttachedRequest(
300 : AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request,
301 0 : Server::AdminStream* admin_stream) {
302 0 : using ProtoOutputSink = envoy::config::tap::v3::OutputSink;
303 :
304 0 : const ProtoOutputSink& sink = tap_request.tap_config().output_config().sinks(0);
305 :
306 0 : switch (sink.output_sink_type_case()) {
307 0 : case ProtoOutputSink::kBufferedAdmin:
308 0 : return std::make_shared<AttachedRequestBuffered>(admin_handler, tap_request, admin_stream);
309 0 : default:
310 0 : return std::make_shared<AttachedRequest>(admin_handler, tap_request, admin_stream);
311 0 : }
312 0 : }
313 :
314 : } // namespace Tap
315 : } // namespace Common
316 : } // namespace Extensions
317 : } // namespace Envoy
|