/proc/self/cwd/source/extensions/common/tap/admin.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/common/tap/admin.h" |
2 | | |
3 | | #include "envoy/admin/v3/tap.pb.h" |
4 | | #include "envoy/admin/v3/tap.pb.validate.h" |
5 | | #include "envoy/config/tap/v3/common.pb.h" |
6 | | #include "envoy/data/tap/v3/wrapper.pb.h" |
7 | | |
8 | | #include "source/common/buffer/buffer_impl.h" |
9 | | #include "source/common/protobuf/message_validator_impl.h" |
10 | | #include "source/common/protobuf/utility.h" |
11 | | |
12 | | namespace Envoy { |
13 | | namespace Extensions { |
14 | | namespace Common { |
15 | | namespace Tap { |
16 | | |
17 | | // Singleton registration via macro defined in envoy/singleton/manager.h |
18 | | SINGLETON_MANAGER_REGISTRATION(tap_admin_handler); |
19 | | |
20 | | AdminHandlerSharedPtr AdminHandler::getSingleton(OptRef<Server::Admin> admin, |
21 | | Singleton::Manager& singleton_manager, |
22 | 1 | Event::Dispatcher& main_thread_dispatcher) { |
23 | 1 | return singleton_manager.getTyped<AdminHandler>( |
24 | 1 | SINGLETON_MANAGER_REGISTERED_NAME(tap_admin_handler), [&admin, &main_thread_dispatcher] { |
25 | 1 | return std::make_shared<AdminHandler>(admin, main_thread_dispatcher); |
26 | 1 | }); |
27 | 1 | } |
28 | | |
29 | | AdminHandler::AdminHandler(OptRef<Server::Admin> admin, Event::Dispatcher& main_thread_dispatcher) |
30 | 1 | : admin_(admin.value()), main_thread_dispatcher_(main_thread_dispatcher) { |
31 | 1 | const bool rc = |
32 | 1 | admin_.addHandler("/tap", "tap filter control", MAKE_ADMIN_HANDLER(handler), true, true); |
33 | 1 | RELEASE_ASSERT(rc, "/tap admin endpoint is taken"); |
34 | 1 | if (admin_.socket().addressType() == Network::Address::Type::Pipe) { |
35 | 0 | ENVOY_LOG(warn, "Admin tapping (via /tap) is unreliable when the admin endpoint is a pipe and " |
36 | 0 | "the connection is HTTP/1. Either use an IP address or connect using HTTP/2."); |
37 | 0 | } |
38 | 1 | } |
39 | | |
40 | 1 | AdminHandler::~AdminHandler() { |
41 | 1 | const bool rc = admin_.removeHandler("/tap"); |
42 | 1 | ASSERT(rc); |
43 | 1 | } |
44 | | |
45 | | Http::Code AdminHandler::handler(Http::HeaderMap&, Buffer::Instance& response, |
46 | 0 | Server::AdminStream& admin_stream) { |
47 | 0 | if (attached_request_ != nullptr) { |
48 | | // TODO(mattlklein123): Consider supporting concurrent admin /tap streams. Right now we support |
49 | | // a single stream as a simplification. |
50 | 0 | return badRequest(response, "An attached /tap admin stream already exists. Detach it."); |
51 | 0 | } |
52 | | |
53 | 0 | if (admin_stream.getRequestBody() == nullptr) { |
54 | 0 | return badRequest(response, "/tap requires a JSON/YAML body"); |
55 | 0 | } |
56 | | |
57 | 0 | envoy::admin::v3::TapRequest tap_request; |
58 | 0 | TRY_NEEDS_AUDIT { |
59 | 0 | MessageUtil::loadFromYamlAndValidate(admin_stream.getRequestBody()->toString(), tap_request, |
60 | 0 | ProtobufMessage::getStrictValidationVisitor()); |
61 | 0 | } |
62 | 0 | END_TRY catch (EnvoyException& e) { return badRequest(response, e.what()); } |
63 | | |
64 | 0 | ENVOY_LOG(debug, "tap admin request for config_id={}", tap_request.config_id()); |
65 | 0 | if (config_id_map_.count(tap_request.config_id()) == 0) { |
66 | 0 | return badRequest( |
67 | 0 | response, fmt::format("Unknown config id '{}'. No extension has registered with this id.", |
68 | 0 | tap_request.config_id())); |
69 | 0 | } |
70 | 0 | for (auto config : config_id_map_[tap_request.config_id()]) { |
71 | 0 | config->newTapConfig(tap_request.tap_config(), this); |
72 | 0 | } |
73 | |
|
74 | 0 | admin_stream.setEndStreamOnComplete(false); |
75 | 0 | admin_stream.addOnDestroyCallback([this] { |
76 | 0 | for (auto config : config_id_map_[attached_request_->id()]) { |
77 | 0 | ENVOY_LOG(debug, "detach tap admin request for config_id={}", attached_request_->id()); |
78 | 0 | config->clearTapConfig(); |
79 | 0 | } |
80 | 0 | attached_request_.reset(); // remove ref to attached_request_ |
81 | 0 | }); |
82 | 0 | attached_request_ = AttachedRequest::createAttachedRequest(this, tap_request, &admin_stream); |
83 | 0 | return Http::Code::OK; |
84 | 0 | } |
85 | | |
86 | 0 | Http::Code AdminHandler::badRequest(Buffer::Instance& response, absl::string_view error) { |
87 | 0 | ENVOY_LOG(debug, "handler bad request: {}", error); |
88 | 0 | response.add(error); |
89 | 0 | return Http::Code::BadRequest; |
90 | 0 | } |
91 | | |
92 | 1 | void AdminHandler::registerConfig(ExtensionConfig& config, const std::string& config_id) { |
93 | 1 | ASSERT(!config_id.empty()); |
94 | 1 | ASSERT(config_id_map_[config_id].count(&config) == 0); |
95 | 1 | config_id_map_[config_id].insert(&config); |
96 | 1 | if (attached_request_ != nullptr && attached_request_->id() == config_id) { |
97 | 0 | config.newTapConfig(attached_request_->config(), this); |
98 | 0 | } |
99 | 1 | } |
100 | | |
101 | 1 | void AdminHandler::unregisterConfig(ExtensionConfig& config) { |
102 | 1 | ASSERT(!config.adminId().empty()); |
103 | 1 | std::string admin_id(config.adminId()); |
104 | 1 | ASSERT(config_id_map_[admin_id].count(&config) == 1); |
105 | 1 | config_id_map_[admin_id].erase(&config); |
106 | 1 | if (config_id_map_[admin_id].empty()) { |
107 | 1 | config_id_map_.erase(admin_id); |
108 | 1 | } |
109 | 1 | } |
110 | | |
111 | | PerTapSinkHandlePtr |
112 | | AdminHandler::createPerTapSinkHandle(uint64_t trace_id, |
113 | 0 | envoy::config::tap::v3::OutputSink::OutputSinkTypeCase type) { |
114 | 0 | UNREFERENCED_PARAMETER(trace_id); |
115 | 0 | using ProtoOutputSinkType = envoy::config::tap::v3::OutputSink::OutputSinkTypeCase; |
116 | 0 | ASSERT(type == ProtoOutputSinkType::kStreamingAdmin || |
117 | 0 | type == ProtoOutputSinkType::kBufferedAdmin); |
118 | | |
119 | | /** |
120 | | * Switching on the sink type here again after doing so in TapConfigBaseImpl constructor |
121 | | * seems a bit strange. A possible refactor in the future could involve moving all Sinks |
122 | | * to live where the FilePerTapSink lives, and passing in the sink to use into the AdminHandler. |
123 | | */ |
124 | | |
125 | | // Select the sink implementation to use based on type specified in YAML request body |
126 | 0 | if (type == ProtoOutputSinkType::kStreamingAdmin) { |
127 | 0 | return std::make_unique<AdminPerTapSinkHandle>(*this); |
128 | 0 | } |
129 | 0 | return std::make_unique<BufferedPerTapSinkHandle>(*this); |
130 | 0 | } |
131 | | |
132 | | void AdminHandler::TraceBuffer::bufferTrace( |
133 | 0 | const std::shared_ptr<envoy::data::tap::v3::TraceWrapper>& trace) { |
134 | | // Ignore traces once the buffer is full or flushed |
135 | 0 | if (flushed() || full()) { |
136 | 0 | return; |
137 | 0 | } |
138 | | |
139 | 0 | buffer_->emplace_back(std::move(*trace)); |
140 | 0 | } |
141 | | |
142 | | void AdminHandler::AdminPerTapSinkHandle::submitTrace(TraceWrapperPtr&& trace, |
143 | 0 | envoy::config::tap::v3::OutputSink::Format) { |
144 | 0 | ENVOY_LOG(debug, "admin submitting buffered trace to main thread"); |
145 | | // Convert to a shared_ptr, so we can send it to the main thread. |
146 | 0 | std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace{std::move(trace)}; |
147 | | // Non owning pointer to the attached request, does not preserve lifetime unless in use. |
148 | 0 | std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_); |
149 | | |
150 | | // The handle can be destroyed before the cross thread post is complete. Thus, we capture a |
151 | | // reference to our parent. |
152 | 0 | parent_.main_thread_dispatcher_.post([weak_attached_request, shared_trace] { |
153 | | // Take temporary ownership - extend lifetime |
154 | 0 | std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock(); |
155 | 0 | if (!attached_request) { |
156 | | // NOTE: Cannot do much here in response to the failed post as an HTTP response code has |
157 | | // already been sent on completion of AdminHandler::handler. |
158 | 0 | ENVOY_LOG(debug, "attached request does not exist, not streaming trace"); |
159 | 0 | return; // No attached request, flushed already |
160 | 0 | } |
161 | | |
162 | 0 | attached_request->streamMsg(*shared_trace, false); |
163 | 0 | }); |
164 | 0 | } |
165 | | |
166 | | void AdminHandler::BufferedPerTapSinkHandle::submitTrace( |
167 | 0 | TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format) { |
168 | | // Convert to a shared_ptr to extend lifetime so we can send it to the main thread. |
169 | 0 | std::shared_ptr<envoy::data::tap::v3::TraceWrapper> shared_trace(std::move(trace)); |
170 | | // Non owning pointer to the attached request, does not preserve lifetime unless in use |
171 | 0 | std::weak_ptr<AttachedRequest> weak_attached_request(parent_.attached_request_); |
172 | |
|
173 | 0 | parent_.main_thread_dispatcher_.post([shared_trace, weak_attached_request] { |
174 | | // Take temporary ownership - extend lifetime |
175 | 0 | std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock(); |
176 | 0 | if (!attached_request) { |
177 | | // NOTE: Cannot do much here in response to the failed post as an HTTP response code has |
178 | | // already been sent on completion of AdminHandler::handler. Additionally we probably don't |
179 | | // want to take any action in this event as this case may be hit in "normal" usage, depending |
180 | | // on when the destruction of attached_request_ occurs. |
181 | 0 | ENVOY_LOG(debug, "attached request does not exist, not buffering trace"); |
182 | 0 | return; // No attached request, flushed already. |
183 | 0 | } |
184 | 0 | TraceBuffer* trace_buffer = attached_request->traceBuffer(); |
185 | | |
186 | | // Check if we already responded to the client |
187 | | // Hit when posts to buffer traces are on the dispatcher queue and the buffer is flushed |
188 | 0 | if (trace_buffer->flushed()) { |
189 | 0 | return; |
190 | 0 | } |
191 | | // Main thread dispatcher serializes access to the trace_buffer |
192 | 0 | trace_buffer->bufferTrace(shared_trace); |
193 | | // If the trace buffer is not full yet, wait to buffer more traces |
194 | 0 | if (!trace_buffer->full()) { |
195 | 0 | return; |
196 | 0 | } |
197 | | |
198 | 0 | std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush(); |
199 | |
|
200 | 0 | ENVOY_LOG(debug, "admin writing buffered trace list to response"); |
201 | | |
202 | | // Serialize writes to the stream |
203 | 0 | for (size_t i = 0; i < buffer.size(); i++) { |
204 | | // Close stream on final message |
205 | 0 | attached_request->streamMsg(buffer[i], (i + 1) == buffer.size()); |
206 | 0 | } |
207 | 0 | }); |
208 | 0 | } |
209 | | |
210 | | AdminHandler::AttachedRequest::AttachedRequest(AdminHandler* admin_handler, |
211 | | const envoy::admin::v3::TapRequest& tap_request, |
212 | | Server::AdminStream* admin_stream) |
213 | | : config_id_(tap_request.config_id()), config_(tap_request.tap_config()), |
214 | 0 | admin_stream_(admin_stream), main_thread_dispatcher_(admin_handler->main_thread_dispatcher_) { |
215 | 0 | } |
216 | | |
217 | | AdminHandler::AttachedRequestBuffered::AttachedRequestBuffered( |
218 | | AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request, |
219 | | Server::AdminStream* admin_stream) |
220 | 0 | : AttachedRequest(admin_handler, tap_request, admin_stream) { |
221 | 0 | const envoy::config::tap::v3::OutputSink& sink = |
222 | 0 | tap_request.tap_config().output_config().sinks(0); |
223 | |
|
224 | 0 | const uint64_t max_buffered_traces = sink.buffered_admin().max_traces(); |
225 | 0 | const uint64_t timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(sink.buffered_admin(), timeout, 0); |
226 | 0 | trace_buffer_ = std::make_unique<TraceBuffer>(max_buffered_traces); |
227 | | // Start the countdown if provided an actual timeout |
228 | 0 | if (timeout_ms > 0) { |
229 | 0 | timer_ = dispatcher().createTimer( |
230 | 0 | [this, admin_handler] { this->onTimeout(admin_handler->attached_request_); }); |
231 | 0 | timer_->enableTimer(std::chrono::milliseconds(timeout_ms)); |
232 | 0 | } |
233 | 0 | } |
234 | | |
235 | | void AdminHandler::AttachedRequestBuffered::onTimeout( |
236 | 0 | const std::weak_ptr<AttachedRequest>& weak_attached_request) { |
237 | | // Flush the buffer regardless of size |
238 | 0 | dispatcher().post([weak_attached_request] { |
239 | | // Take temporary ownership - extend lifetime |
240 | 0 | std::shared_ptr<AttachedRequest> attached_request = weak_attached_request.lock(); |
241 | 0 | if (!attached_request) { |
242 | | // NOTE: Cannot do much here in response to the failed post as an HTTP response code has |
243 | | // already been sent on completion of AdminHandler::handler. Additionally we probably don't |
244 | | // want to take any action in this event as this case may be hit in "normal" usage, depending |
245 | | // on when the destruction of attached_request_ occurs. |
246 | 0 | ENVOY_LOG(debug, "Timer Expiry after admin tap request completion"); |
247 | 0 | return; // No attached request, flushed already. |
248 | 0 | } |
249 | 0 | TraceBuffer* trace_buffer = attached_request->traceBuffer(); |
250 | | |
251 | | // if the trace buffer has already been flushed short circuit. |
252 | | // Hit when this timeout callback is on the dispatcher queue and the buffer is flushed |
253 | 0 | if (trace_buffer->flushed()) { |
254 | 0 | return; |
255 | 0 | } |
256 | | |
257 | 0 | std::vector<envoy::data::tap::v3::TraceWrapper> buffer = trace_buffer->flush(); |
258 | |
|
259 | 0 | ENVOY_LOG(debug, "Timer Expiry, admin flushing buffered traces to response"); |
260 | | |
261 | | // Serialize writes to the stream |
262 | 0 | for (const auto& trace : buffer) { |
263 | 0 | attached_request->streamMsg(trace); |
264 | 0 | } |
265 | 0 | attached_request->endStream(); |
266 | 0 | }); |
267 | 0 | } |
268 | | |
269 | 0 | void AdminHandler::AttachedRequest::endStream() { |
270 | 0 | Buffer::OwnedImpl output_buffer; |
271 | 0 | stream()->getDecoderFilterCallbacks().encodeData(output_buffer, true); |
272 | 0 | } |
273 | | |
274 | 0 | void AdminHandler::AttachedRequest::streamMsg(const Protobuf::Message& message, bool end_stream) { |
275 | 0 | std::string output_string; |
276 | |
|
277 | 0 | switch (format()) { |
278 | 0 | PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; |
279 | 0 | case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING: |
280 | 0 | case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES: |
281 | 0 | output_string = MessageUtil::getJsonStringFromMessageOrError(message, true, true); |
282 | 0 | break; |
283 | 0 | case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: { |
284 | 0 | Protobuf::io::StringOutputStream stream(&output_string); |
285 | 0 | Protobuf::io::CodedOutputStream coded_stream(&stream); |
286 | 0 | coded_stream.WriteVarint64(message.ByteSizeLong()); |
287 | 0 | message.SerializeWithCachedSizes(&coded_stream); |
288 | 0 | break; |
289 | 0 | } |
290 | 0 | case envoy::config::tap::v3::OutputSink::PROTO_BINARY: |
291 | 0 | case envoy::config::tap::v3::OutputSink::PROTO_TEXT: |
292 | 0 | PANIC("not implemented"); |
293 | 0 | } |
294 | | |
295 | 0 | Buffer::OwnedImpl output_buffer{output_string}; |
296 | 0 | stream()->getDecoderFilterCallbacks().encodeData(output_buffer, end_stream); |
297 | 0 | } |
298 | | |
299 | | std::shared_ptr<AdminHandler::AttachedRequest> AdminHandler::AttachedRequest::createAttachedRequest( |
300 | | AdminHandler* admin_handler, const envoy::admin::v3::TapRequest& tap_request, |
301 | 0 | Server::AdminStream* admin_stream) { |
302 | 0 | using ProtoOutputSink = envoy::config::tap::v3::OutputSink; |
303 | |
|
304 | 0 | const ProtoOutputSink& sink = tap_request.tap_config().output_config().sinks(0); |
305 | |
|
306 | 0 | switch (sink.output_sink_type_case()) { |
307 | 0 | case ProtoOutputSink::kBufferedAdmin: |
308 | 0 | return std::make_shared<AttachedRequestBuffered>(admin_handler, tap_request, admin_stream); |
309 | 0 | default: |
310 | 0 | return std::make_shared<AttachedRequest>(admin_handler, tap_request, admin_stream); |
311 | 0 | } |
312 | 0 | } |
313 | | |
314 | | } // namespace Tap |
315 | | } // namespace Common |
316 | | } // namespace Extensions |
317 | | } // namespace Envoy |