LCOV - code coverage report
Current view: top level - source/extensions/transport_sockets/tap - tap_config_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 96 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 6 0.0 %

          Line data    Source code
       1             : #include "source/extensions/transport_sockets/tap/tap_config_impl.h"
       2             : 
       3             : #include "envoy/data/tap/v3/transport.pb.h"
       4             : 
       5             : #include "source/common/common/assert.h"
       6             : #include "source/common/network/utility.h"
       7             : 
       8             : namespace Envoy {
       9             : namespace Extensions {
      10             : namespace TransportSockets {
      11             : namespace Tap {
      12             : 
      13             : namespace TapCommon = Extensions::Common::Tap;
      14             : 
      15             : PerSocketTapperImpl::PerSocketTapperImpl(SocketTapConfigSharedPtr config,
      16             :                                          const Network::Connection& connection)
      17             :     : config_(std::move(config)),
      18             :       sink_handle_(config_->createPerTapSinkHandleManager(connection.id())),
      19           0 :       connection_(connection), statuses_(config_->createMatchStatusVector()) {
      20           0 :   config_->rootMatcher().onNewStream(statuses_);
      21           0 :   if (config_->streaming() && config_->rootMatcher().matchStatus(statuses_).matches_) {
      22             :     // TODO(mattklein123): For IP client connections, local address will not be populated until
      23             :     // connection. We should re-emit connection information after connection so the streaming
      24             :     // trace gets the local address.
      25           0 :     TapCommon::TraceWrapperPtr trace = makeTraceSegment();
      26           0 :     fillConnectionInfo(*trace->mutable_socket_streamed_trace_segment()->mutable_connection());
      27           0 :     sink_handle_->submitTrace(std::move(trace));
      28           0 :   }
      29           0 : }
      30             : 
      31           0 : void PerSocketTapperImpl::fillConnectionInfo(envoy::data::tap::v3::Connection& connection) {
      32           0 :   if (connection_.connectionInfoProvider().localAddress() != nullptr) {
      33             :     // Local address might not be populated before a client connection is connected.
      34           0 :     Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().localAddress(),
      35           0 :                                                *connection.mutable_local_address());
      36           0 :   }
      37           0 :   Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().remoteAddress(),
      38           0 :                                              *connection.mutable_remote_address());
      39           0 : }
      40             : 
      41           0 : void PerSocketTapperImpl::closeSocket(Network::ConnectionEvent) {
      42           0 :   if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
      43           0 :     return;
      44           0 :   }
      45             : 
      46           0 :   if (config_->streaming()) {
      47           0 :     TapCommon::TraceWrapperPtr trace = makeTraceSegment();
      48           0 :     auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
      49           0 :     initEvent(event);
      50           0 :     event.mutable_closed();
      51           0 :     sink_handle_->submitTrace(std::move(trace));
      52           0 :   } else {
      53           0 :     makeBufferedTraceIfNeeded();
      54           0 :     fillConnectionInfo(*buffered_trace_->mutable_socket_buffered_trace()->mutable_connection());
      55           0 :     sink_handle_->submitTrace(std::move(buffered_trace_));
      56           0 :   }
      57             : 
      58             :   // Here we explicitly reset the sink_handle_ to release any sink resources and force a flush
      59             :   // of any data (e.g., files). This is not explicitly needed in production, but is needed in
      60             :   // tests to avoid race conditions due to deferred deletion. We could also do this with a stat,
      61             :   // but this seems fine in general and is simpler.
      62           0 :   sink_handle_.reset();
      63           0 : }
      64             : 
      65           0 : void PerSocketTapperImpl::initEvent(envoy::data::tap::v3::SocketEvent& event) {
      66           0 :   event.mutable_timestamp()->MergeFrom(Protobuf::util::TimeUtil::NanosecondsToTimestamp(
      67           0 :       std::chrono::duration_cast<std::chrono::nanoseconds>(
      68           0 :           config_->timeSource().systemTime().time_since_epoch())
      69           0 :           .count()));
      70           0 : }
      71             : 
      72           0 : void PerSocketTapperImpl::onRead(const Buffer::Instance& data, uint32_t bytes_read) {
      73           0 :   if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
      74           0 :     return;
      75           0 :   }
      76             : 
      77           0 :   if (config_->streaming()) {
      78           0 :     TapCommon::TraceWrapperPtr trace = makeTraceSegment();
      79           0 :     auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
      80           0 :     initEvent(event);
      81           0 :     TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(),
      82           0 :                                               config_->maxBufferedRxBytes(), data,
      83           0 :                                               data.length() - bytes_read, bytes_read);
      84           0 :     sink_handle_->submitTrace(std::move(trace));
      85           0 :   } else {
      86           0 :     if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().read_truncated()) {
      87           0 :       return;
      88           0 :     }
      89             : 
      90           0 :     makeBufferedTraceIfNeeded();
      91           0 :     auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events();
      92           0 :     initEvent(event);
      93           0 :     ASSERT(rx_bytes_buffered_ <= config_->maxBufferedRxBytes());
      94           0 :     buffered_trace_->mutable_socket_buffered_trace()->set_read_truncated(
      95           0 :         TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(),
      96           0 :                                                   config_->maxBufferedRxBytes() -
      97           0 :                                                       rx_bytes_buffered_,
      98           0 :                                                   data, data.length() - bytes_read, bytes_read));
      99           0 :     rx_bytes_buffered_ += event.read().data().as_bytes().size();
     100           0 :   }
     101           0 : }
     102             : 
     103             : void PerSocketTapperImpl::onWrite(const Buffer::Instance& data, uint32_t bytes_written,
     104           0 :                                   bool end_stream) {
     105           0 :   if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
     106           0 :     return;
     107           0 :   }
     108             : 
     109           0 :   if (config_->streaming()) {
     110           0 :     TapCommon::TraceWrapperPtr trace = makeTraceSegment();
     111           0 :     auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
     112           0 :     initEvent(event);
     113           0 :     TapCommon::Utility::addBufferToProtoBytes(*event.mutable_write()->mutable_data(),
     114           0 :                                               config_->maxBufferedTxBytes(), data, 0,
     115           0 :                                               bytes_written);
     116           0 :     event.mutable_write()->set_end_stream(end_stream);
     117           0 :     sink_handle_->submitTrace(std::move(trace));
     118           0 :   } else {
     119           0 :     if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().write_truncated()) {
     120           0 :       return;
     121           0 :     }
     122             : 
     123           0 :     makeBufferedTraceIfNeeded();
     124           0 :     auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events();
     125           0 :     initEvent(event);
     126           0 :     ASSERT(tx_bytes_buffered_ <= config_->maxBufferedTxBytes());
     127           0 :     buffered_trace_->mutable_socket_buffered_trace()->set_write_truncated(
     128           0 :         TapCommon::Utility::addBufferToProtoBytes(
     129           0 :             *event.mutable_write()->mutable_data(),
     130           0 :             config_->maxBufferedTxBytes() - tx_bytes_buffered_, data, 0, bytes_written));
     131           0 :     tx_bytes_buffered_ += event.write().data().as_bytes().size();
     132           0 :     event.mutable_write()->set_end_stream(end_stream);
     133           0 :   }
     134           0 : }
     135             : 
     136             : } // namespace Tap
     137             : } // namespace TransportSockets
     138             : } // namespace Extensions
     139             : } // namespace Envoy

Generated by: LCOV version 1.15