Line data Source code
1 : #include "source/extensions/common/tap/tap_config_base.h" 2 : 3 : #include "envoy/config/tap/v3/common.pb.h" 4 : #include "envoy/data/tap/v3/common.pb.h" 5 : #include "envoy/data/tap/v3/wrapper.pb.h" 6 : #include "envoy/server/transport_socket_config.h" 7 : 8 : #include "source/common/common/assert.h" 9 : #include "source/common/common/fmt.h" 10 : #include "source/common/config/utility.h" 11 : #include "source/common/protobuf/utility.h" 12 : #include "source/extensions/common/matcher/matcher.h" 13 : 14 : #include "absl/container/fixed_array.h" 15 : 16 : namespace Envoy { 17 : namespace Extensions { 18 : namespace Common { 19 : namespace Tap { 20 : 21 : using namespace Matcher; 22 : 23 : bool Utility::addBufferToProtoBytes(envoy::data::tap::v3::Body& output_body, 24 : uint32_t max_buffered_bytes, const Buffer::Instance& data, 25 0 : uint32_t buffer_start_offset, uint32_t buffer_length_to_copy) { 26 : // TODO(mattklein123): Figure out if we can use the buffer API here directly in some way. This is 27 : // is not trivial if we want to avoid extra copies since we end up appending to the existing 28 : // protobuf string. 29 : 30 : // Note that max_buffered_bytes is assumed to include any data already contained in output_bytes. 31 : // This is to account for callers that may be tracking this over multiple body objects. 32 0 : ASSERT(buffer_start_offset + buffer_length_to_copy <= data.length()); 33 0 : const uint32_t final_bytes_to_copy = std::min(max_buffered_bytes, buffer_length_to_copy); 34 : 35 0 : Buffer::RawSliceVector slices = data.getRawSlices(); 36 0 : trimSlices(slices, buffer_start_offset, final_bytes_to_copy); 37 0 : for (const Buffer::RawSlice& slice : slices) { 38 0 : output_body.mutable_as_bytes()->append(static_cast<const char*>(slice.mem_), slice.len_); 39 0 : } 40 : 41 0 : if (final_bytes_to_copy < buffer_length_to_copy) { 42 0 : output_body.set_truncated(true); 43 0 : return true; 44 0 : } else { 45 0 : return false; 46 0 : } 47 0 : } 48 : 49 : TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config, 50 : Common::Tap::Sink* admin_streamer, SinkContext context) 51 : : max_buffered_rx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( 52 : proto_config.output_config(), max_buffered_rx_bytes, DefaultMaxBufferedBytes)), 53 : max_buffered_tx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( 54 : proto_config.output_config(), max_buffered_tx_bytes, DefaultMaxBufferedBytes)), 55 0 : streaming_(proto_config.output_config().streaming()) { 56 : 57 0 : using ProtoOutputSink = envoy::config::tap::v3::OutputSink; 58 0 : auto& sinks = proto_config.output_config().sinks(); 59 0 : ASSERT(sinks.size() == 1); 60 : // TODO(mattklein123): Add per-sink checks to make sure format makes sense. I.e., when using 61 : // streaming, we should require the length delimited version of binary proto, etc. 62 0 : sink_format_ = sinks[0].format(); 63 0 : sink_type_ = sinks[0].output_sink_type_case(); 64 : 65 0 : switch (sink_type_) { 66 0 : case ProtoOutputSink::OutputSinkTypeCase::kBufferedAdmin: 67 0 : ASSERT(admin_streamer != nullptr, "admin output must be configured via admin"); 68 : // TODO(mattklein123): Graceful failure, error message, and test if someone specifies an 69 : // admin stream output with the wrong format. 70 0 : RELEASE_ASSERT( 71 0 : sink_format_ == ProtoOutputSink::JSON_BODY_AS_BYTES || 72 0 : sink_format_ == ProtoOutputSink::JSON_BODY_AS_STRING || 73 0 : sink_format_ == ProtoOutputSink::PROTO_BINARY_LENGTH_DELIMITED, 74 0 : "buffered admin output only supports JSON or length delimited proto binary formats"); 75 0 : sink_to_use_ = admin_streamer; 76 0 : break; 77 0 : case ProtoOutputSink::OutputSinkTypeCase::kStreamingAdmin: 78 0 : ASSERT(admin_streamer != nullptr, "admin output must be configured via admin"); 79 : // TODO(mattklein123): Graceful failure, error message, and test if someone specifies an 80 : // admin stream output with the wrong format. 81 : // TODO(davidpeet8): Simple change to enable PROTO_BINARY_LENGTH_DELIMITED format - 82 : // functionality already implemented for kBufferedAdmin 83 0 : RELEASE_ASSERT(sink_format_ == ProtoOutputSink::JSON_BODY_AS_BYTES || 84 0 : sink_format_ == ProtoOutputSink::JSON_BODY_AS_STRING, 85 0 : "streaming admin output only supports JSON formats"); 86 0 : sink_to_use_ = admin_streamer; 87 0 : break; 88 0 : case ProtoOutputSink::OutputSinkTypeCase::kFilePerTap: 89 0 : sink_ = std::make_unique<FilePerTapSink>(sinks[0].file_per_tap()); 90 0 : sink_to_use_ = sink_.get(); 91 0 : break; 92 0 : case ProtoOutputSink::OutputSinkTypeCase::kCustomSink: { 93 0 : TapSinkFactory& tap_sink_factory = 94 0 : Envoy::Config::Utility::getAndCheckFactory<TapSinkFactory>(sinks[0].custom_sink()); 95 : 96 : // extract message validation visitor from the context and use it to define config 97 0 : ProtobufTypes::MessagePtr config; 98 0 : using TsfContextRef = 99 0 : std::reference_wrapper<Server::Configuration::TransportSocketFactoryContext>; 100 0 : using HttpContextRef = std::reference_wrapper<Server::Configuration::FactoryContext>; 101 0 : if (absl::holds_alternative<TsfContextRef>(context)) { 102 0 : Server::Configuration::TransportSocketFactoryContext& tsf_context = 103 0 : absl::get<TsfContextRef>(context).get(); 104 0 : config = Config::Utility::translateAnyToFactoryConfig(sinks[0].custom_sink().typed_config(), 105 0 : tsf_context.messageValidationVisitor(), 106 0 : tap_sink_factory); 107 0 : } else { 108 0 : Server::Configuration::FactoryContext& http_context = 109 0 : absl::get<HttpContextRef>(context).get(); 110 0 : config = Config::Utility::translateAnyToFactoryConfig( 111 0 : sinks[0].custom_sink().typed_config(), 112 0 : http_context.serverFactoryContext().messageValidationContext().staticValidationVisitor(), 113 0 : tap_sink_factory); 114 0 : } 115 : 116 0 : sink_ = tap_sink_factory.createSinkPtr(*config, context); 117 0 : sink_to_use_ = sink_.get(); 118 0 : break; 119 0 : } 120 0 : case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::kStreamingGrpc: 121 0 : PANIC("not implemented"); 122 0 : case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::OUTPUT_SINK_TYPE_NOT_SET: 123 0 : PANIC_DUE_TO_CORRUPT_ENUM; 124 0 : } 125 : 126 0 : envoy::config::common::matcher::v3::MatchPredicate match; 127 0 : if (proto_config.has_match()) { 128 : // Use the match field whenever it is set. 129 0 : match = proto_config.match(); 130 0 : } else if (proto_config.has_match_config()) { 131 : // Fallback to use the deprecated match_config field and upgrade (wire cast) it to the new 132 : // MatchPredicate which is backward compatible with the old MatchPredicate originally 133 : // introduced in the Tap filter. 134 0 : MessageUtil::wireCast(proto_config.match_config(), match); 135 0 : } else { 136 0 : throw EnvoyException(fmt::format("Neither match nor match_config is set in TapConfig: {}", 137 0 : proto_config.DebugString())); 138 0 : } 139 0 : buildMatcher(match, matchers_); 140 0 : } 141 : 142 0 : const Matcher& TapConfigBaseImpl::rootMatcher() const { 143 0 : ASSERT(!matchers_.empty()); 144 0 : return *matchers_[0]; 145 0 : } 146 : 147 : namespace { 148 0 : void swapBytesToString(envoy::data::tap::v3::Body& body) { 149 0 : body.set_allocated_as_string(body.release_as_bytes()); 150 0 : } 151 : } // namespace 152 : 153 : void Utility::bodyBytesToString(envoy::data::tap::v3::TraceWrapper& trace, 154 0 : envoy::config::tap::v3::OutputSink::Format sink_format) { 155 : // Swap the "bytes" string into the "string" string. This is done purely so that JSON 156 : // serialization will serialize as a string vs. doing base64 encoding. 157 0 : if (sink_format != envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING) { 158 0 : return; 159 0 : } 160 : 161 0 : switch (trace.trace_case()) { 162 0 : case envoy::data::tap::v3::TraceWrapper::TraceCase::kHttpBufferedTrace: { 163 0 : auto* http_trace = trace.mutable_http_buffered_trace(); 164 0 : if (http_trace->has_request() && http_trace->request().has_body()) { 165 0 : swapBytesToString(*http_trace->mutable_request()->mutable_body()); 166 0 : } 167 0 : if (http_trace->has_response() && http_trace->response().has_body()) { 168 0 : swapBytesToString(*http_trace->mutable_response()->mutable_body()); 169 0 : } 170 0 : break; 171 0 : } 172 0 : case envoy::data::tap::v3::TraceWrapper::TraceCase::kHttpStreamedTraceSegment: { 173 0 : auto* http_trace = trace.mutable_http_streamed_trace_segment(); 174 0 : if (http_trace->has_request_body_chunk()) { 175 0 : swapBytesToString(*http_trace->mutable_request_body_chunk()); 176 0 : } 177 0 : if (http_trace->has_response_body_chunk()) { 178 0 : swapBytesToString(*http_trace->mutable_response_body_chunk()); 179 0 : } 180 0 : break; 181 0 : } 182 0 : case envoy::data::tap::v3::TraceWrapper::TraceCase::kSocketBufferedTrace: { 183 0 : auto* socket_trace = trace.mutable_socket_buffered_trace(); 184 0 : for (auto& event : *socket_trace->mutable_events()) { 185 0 : if (event.has_read()) { 186 0 : swapBytesToString(*event.mutable_read()->mutable_data()); 187 0 : } else { 188 0 : ASSERT(event.has_write()); 189 0 : swapBytesToString(*event.mutable_write()->mutable_data()); 190 0 : } 191 0 : } 192 0 : break; 193 0 : } 194 0 : case envoy::data::tap::v3::TraceWrapper::TraceCase::kSocketStreamedTraceSegment: { 195 0 : auto& event = *trace.mutable_socket_streamed_trace_segment()->mutable_event(); 196 0 : if (event.has_read()) { 197 0 : swapBytesToString(*event.mutable_read()->mutable_data()); 198 0 : } else if (event.has_write()) { 199 0 : swapBytesToString(*event.mutable_write()->mutable_data()); 200 0 : } 201 0 : break; 202 0 : } 203 0 : case envoy::data::tap::v3::TraceWrapper::TraceCase::TRACE_NOT_SET: 204 0 : PANIC_DUE_TO_CORRUPT_ENUM; 205 0 : } 206 0 : } 207 : 208 0 : void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(TraceWrapperPtr&& trace) { 209 0 : Utility::bodyBytesToString(*trace, parent_.sink_format_); 210 0 : handle_->submitTrace(std::move(trace), parent_.sink_format_); 211 0 : } 212 : 213 : void FilePerTapSink::FilePerTapSinkHandle::submitTrace( 214 0 : TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format format) { 215 0 : if (!output_file_.is_open()) { 216 0 : std::string path = fmt::format("{}_{}", parent_.config_.path_prefix(), trace_id_); 217 0 : switch (format) { 218 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; 219 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY: 220 0 : path += MessageUtil::FileExtensions::get().ProtoBinary; 221 0 : break; 222 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: 223 0 : path += MessageUtil::FileExtensions::get().ProtoBinaryLengthDelimited; 224 0 : break; 225 0 : case envoy::config::tap::v3::OutputSink::PROTO_TEXT: 226 0 : path += MessageUtil::FileExtensions::get().ProtoText; 227 0 : break; 228 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES: 229 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING: 230 0 : path += MessageUtil::FileExtensions::get().Json; 231 0 : break; 232 0 : } 233 : 234 0 : ENVOY_LOG_MISC(debug, "Opening tap file for [id={}] to {}", trace_id_, path); 235 : // When reading and writing binary files, we need to be sure std::ios_base::binary 236 : // is set, otherwise we will not get the expected results on Windows 237 0 : output_file_.open(path, std::ios_base::binary); 238 0 : } 239 : 240 0 : ENVOY_LOG_MISC(trace, "Tap for [id={}]: {}", trace_id_, trace->DebugString()); 241 : 242 0 : switch (format) { 243 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; 244 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY: 245 0 : trace->SerializeToOstream(&output_file_); 246 0 : break; 247 0 : case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: { 248 0 : Protobuf::io::OstreamOutputStream stream(&output_file_); 249 0 : Protobuf::io::CodedOutputStream coded_stream(&stream); 250 0 : coded_stream.WriteVarint32(trace->ByteSize()); 251 0 : trace->SerializeWithCachedSizes(&coded_stream); 252 0 : break; 253 0 : } 254 0 : case envoy::config::tap::v3::OutputSink::PROTO_TEXT: 255 0 : output_file_ << trace->DebugString(); 256 0 : break; 257 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES: 258 0 : case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING: 259 0 : output_file_ << MessageUtil::getJsonStringFromMessageOrError(*trace, true, true); 260 0 : break; 261 0 : } 262 0 : } 263 : 264 : } // namespace Tap 265 : } // namespace Common 266 : } // namespace Extensions 267 : } // namespace Envoy