Line data Source code
1 : #include "source/extensions/transport_sockets/tap/tap_config_impl.h" 2 : 3 : #include "envoy/data/tap/v3/transport.pb.h" 4 : 5 : #include "source/common/common/assert.h" 6 : #include "source/common/network/utility.h" 7 : 8 : namespace Envoy { 9 : namespace Extensions { 10 : namespace TransportSockets { 11 : namespace Tap { 12 : 13 : namespace TapCommon = Extensions::Common::Tap; 14 : 15 : PerSocketTapperImpl::PerSocketTapperImpl(SocketTapConfigSharedPtr config, 16 : const Network::Connection& connection) 17 : : config_(std::move(config)), 18 : sink_handle_(config_->createPerTapSinkHandleManager(connection.id())), 19 0 : connection_(connection), statuses_(config_->createMatchStatusVector()) { 20 0 : config_->rootMatcher().onNewStream(statuses_); 21 0 : if (config_->streaming() && config_->rootMatcher().matchStatus(statuses_).matches_) { 22 : // TODO(mattklein123): For IP client connections, local address will not be populated until 23 : // connection. We should re-emit connection information after connection so the streaming 24 : // trace gets the local address. 25 0 : TapCommon::TraceWrapperPtr trace = makeTraceSegment(); 26 0 : fillConnectionInfo(*trace->mutable_socket_streamed_trace_segment()->mutable_connection()); 27 0 : sink_handle_->submitTrace(std::move(trace)); 28 0 : } 29 0 : } 30 : 31 0 : void PerSocketTapperImpl::fillConnectionInfo(envoy::data::tap::v3::Connection& connection) { 32 0 : if (connection_.connectionInfoProvider().localAddress() != nullptr) { 33 : // Local address might not be populated before a client connection is connected. 34 0 : Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().localAddress(), 35 0 : *connection.mutable_local_address()); 36 0 : } 37 0 : Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().remoteAddress(), 38 0 : *connection.mutable_remote_address()); 39 0 : } 40 : 41 0 : void PerSocketTapperImpl::closeSocket(Network::ConnectionEvent) { 42 0 : if (!config_->rootMatcher().matchStatus(statuses_).matches_) { 43 0 : return; 44 0 : } 45 : 46 0 : if (config_->streaming()) { 47 0 : TapCommon::TraceWrapperPtr trace = makeTraceSegment(); 48 0 : auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); 49 0 : initEvent(event); 50 0 : event.mutable_closed(); 51 0 : sink_handle_->submitTrace(std::move(trace)); 52 0 : } else { 53 0 : makeBufferedTraceIfNeeded(); 54 0 : fillConnectionInfo(*buffered_trace_->mutable_socket_buffered_trace()->mutable_connection()); 55 0 : sink_handle_->submitTrace(std::move(buffered_trace_)); 56 0 : } 57 : 58 : // Here we explicitly reset the sink_handle_ to release any sink resources and force a flush 59 : // of any data (e.g., files). This is not explicitly needed in production, but is needed in 60 : // tests to avoid race conditions due to deferred deletion. We could also do this with a stat, 61 : // but this seems fine in general and is simpler. 62 0 : sink_handle_.reset(); 63 0 : } 64 : 65 0 : void PerSocketTapperImpl::initEvent(envoy::data::tap::v3::SocketEvent& event) { 66 0 : event.mutable_timestamp()->MergeFrom(Protobuf::util::TimeUtil::NanosecondsToTimestamp( 67 0 : std::chrono::duration_cast<std::chrono::nanoseconds>( 68 0 : config_->timeSource().systemTime().time_since_epoch()) 69 0 : .count())); 70 0 : } 71 : 72 0 : void PerSocketTapperImpl::onRead(const Buffer::Instance& data, uint32_t bytes_read) { 73 0 : if (!config_->rootMatcher().matchStatus(statuses_).matches_) { 74 0 : return; 75 0 : } 76 : 77 0 : if (config_->streaming()) { 78 0 : TapCommon::TraceWrapperPtr trace = makeTraceSegment(); 79 0 : auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); 80 0 : initEvent(event); 81 0 : TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(), 82 0 : config_->maxBufferedRxBytes(), data, 83 0 : data.length() - bytes_read, bytes_read); 84 0 : sink_handle_->submitTrace(std::move(trace)); 85 0 : } else { 86 0 : if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().read_truncated()) { 87 0 : return; 88 0 : } 89 : 90 0 : makeBufferedTraceIfNeeded(); 91 0 : auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events(); 92 0 : initEvent(event); 93 0 : ASSERT(rx_bytes_buffered_ <= config_->maxBufferedRxBytes()); 94 0 : buffered_trace_->mutable_socket_buffered_trace()->set_read_truncated( 95 0 : TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(), 96 0 : config_->maxBufferedRxBytes() - 97 0 : rx_bytes_buffered_, 98 0 : data, data.length() - bytes_read, bytes_read)); 99 0 : rx_bytes_buffered_ += event.read().data().as_bytes().size(); 100 0 : } 101 0 : } 102 : 103 : void PerSocketTapperImpl::onWrite(const Buffer::Instance& data, uint32_t bytes_written, 104 0 : bool end_stream) { 105 0 : if (!config_->rootMatcher().matchStatus(statuses_).matches_) { 106 0 : return; 107 0 : } 108 : 109 0 : if (config_->streaming()) { 110 0 : TapCommon::TraceWrapperPtr trace = makeTraceSegment(); 111 0 : auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); 112 0 : initEvent(event); 113 0 : TapCommon::Utility::addBufferToProtoBytes(*event.mutable_write()->mutable_data(), 114 0 : config_->maxBufferedTxBytes(), data, 0, 115 0 : bytes_written); 116 0 : event.mutable_write()->set_end_stream(end_stream); 117 0 : sink_handle_->submitTrace(std::move(trace)); 118 0 : } else { 119 0 : if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().write_truncated()) { 120 0 : return; 121 0 : } 122 : 123 0 : makeBufferedTraceIfNeeded(); 124 0 : auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events(); 125 0 : initEvent(event); 126 0 : ASSERT(tx_bytes_buffered_ <= config_->maxBufferedTxBytes()); 127 0 : buffered_trace_->mutable_socket_buffered_trace()->set_write_truncated( 128 0 : TapCommon::Utility::addBufferToProtoBytes( 129 0 : *event.mutable_write()->mutable_data(), 130 0 : config_->maxBufferedTxBytes() - tx_bytes_buffered_, data, 0, bytes_written)); 131 0 : tx_bytes_buffered_ += event.write().data().as_bytes().size(); 132 0 : event.mutable_write()->set_end_stream(end_stream); 133 0 : } 134 0 : } 135 : 136 : } // namespace Tap 137 : } // namespace TransportSockets 138 : } // namespace Extensions 139 : } // namespace Envoy