1
#include "source/extensions/transport_sockets/tap/tap_config_impl.h"
2

            
3
#include "envoy/data/tap/v3/transport.pb.h"
4

            
5
#include "source/common/common/assert.h"
6
#include "source/common/network/utility.h"
7

            
8
namespace Envoy {
9
namespace Extensions {
10
namespace TransportSockets {
11
namespace Tap {
12

            
13
namespace TapCommon = Extensions::Common::Tap;
14

            
15
PerSocketTapperImpl::PerSocketTapperImpl(
16
    SocketTapConfigSharedPtr config,
17
    const envoy::extensions::transport_sockets::tap::v3::SocketTapConfig& tap_config,
18
    const TransportTapStats& stats, const Network::Connection& connection)
19
17
    : config_(std::move(config)),
20
17
      sink_handle_(config_->createPerTapSinkHandleManager(connection.id())),
21
17
      connection_(connection), statuses_(config_->createMatchStatusVector()),
22
17
      should_output_conn_info_per_event_(tap_config.set_connection_per_event()), stats_(stats) {
23
17
  config_->rootMatcher().onNewStream(statuses_);
24
17
  if (config_->streaming() && config_->rootMatcher().matchStatus(statuses_).matches_) {
25
    // TODO(mattklein123): For IP client connections, local address will not be populated until
26
    // connection. We should re-emit connection information after connection so the streaming
27
    // trace gets the local address.
28
9
    TapCommon::TraceWrapperPtr trace = makeTraceSegment();
29
9
    fillConnectionInfo(*trace->mutable_socket_streamed_trace_segment()->mutable_connection());
30
9
    sink_handle_->submitTrace(std::move(trace));
31
9
    pegSubmitCounter(true);
32
9
  }
33
17
  seq_num++;
34
17
}
35

            
36
34
void PerSocketTapperImpl::fillConnectionInfo(envoy::data::tap::v3::Connection& connection) {
37
34
  if (connection_.connectionInfoProvider().localAddress() != nullptr) {
38
    // Local address might not be populated before a client connection is connected.
39
33
    Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().localAddress(),
40
33
                                               *connection.mutable_local_address());
41
33
  }
42
34
  Network::Utility::addressToProtobufAddress(*connection_.connectionInfoProvider().remoteAddress(),
43
34
                                             *connection.mutable_remote_address());
44
34
}
45

            
46
17
void PerSocketTapperImpl::closeSocket(Network::ConnectionEvent) {
47
17
  if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
48
1
    return;
49
1
  }
50

            
51
16
  if (config_->streaming()) {
52
9
    seq_num++;
53
9
    if (shouldSendStreamedMsgByConfiguredSize()) {
54
4
      makeStreamedTraceIfNeeded();
55
4
      auto& event =
56
4
          *streamed_trace_->mutable_socket_streamed_trace_segment()->mutable_events()->add_events();
57
4
      initStreamingEvent(event, seq_num);
58
4
      event.mutable_closed();
59
      // submit directly and don't check current_streamed_rx_tx_bytes_ any more
60
4
      submitStreamedDataPerConfiguredSize();
61
6
    } else {
62
5
      TapCommon::TraceWrapperPtr trace = makeTraceSegment();
63
5
      auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
64
5
      initStreamingEvent(event, seq_num);
65
5
      event.mutable_closed();
66
5
      sink_handle_->submitTrace(std::move(trace));
67
5
    }
68
9
    pegSubmitCounter(true);
69
13
  } else {
70
7
    makeBufferedTraceIfNeeded();
71
7
    fillConnectionInfo(*buffered_trace_->mutable_socket_buffered_trace()->mutable_connection());
72
7
    sink_handle_->submitTrace(std::move(buffered_trace_));
73
7
    pegSubmitCounter(false);
74
7
  }
75

            
76
  // Here we explicitly reset the sink_handle_ to release any sink resources and force a flush
77
  // of any data (e.g., files). This is not explicitly needed in production, but is needed in
78
  // tests to avoid race conditions due to deferred deletion. We could also do this with a stat,
79
  // but this seems fine in general and is simpler.
80
16
  sink_handle_.reset();
81
16
}
82

            
83
46
void PerSocketTapperImpl::initEvent(envoy::data::tap::v3::SocketEvent& event) {
84
46
  event.mutable_timestamp()->MergeFrom(Protobuf::util::TimeUtil::NanosecondsToTimestamp(
85
46
      std::chrono::duration_cast<std::chrono::nanoseconds>(
86
46
          config_->timeSource().systemTime().time_since_epoch())
87
46
          .count()));
88
46
}
89

            
90
void PerSocketTapperImpl::initStreamingEvent(envoy::data::tap::v3::SocketEvent& event,
91
28
                                             uint64_t seq_num) {
92
28
  initEvent(event);
93
28
  if (should_output_conn_info_per_event_) {
94
18
    fillConnectionInfo(*event.mutable_connection());
95
18
  }
96
28
  event.set_seq_num(seq_num);
97
28
}
98

            
99
40
void PerSocketTapperImpl::pegSubmitCounter(const bool is_streaming) {
100
40
  if (is_streaming) {
101
33
    stats_.streamed_submit_.inc();
102
33
  } else {
103
7
    stats_.buffered_submit_.inc();
104
7
  }
105
40
}
106

            
107
28
bool PerSocketTapperImpl::shouldSendStreamedMsgByConfiguredSize() const {
108
28
  return config_->minStreamedSentBytes() > 0;
109
28
}
110

            
111
8
void PerSocketTapperImpl::submitStreamedDataPerConfiguredSize() {
112
8
  sink_handle_->submitTrace(std::move(streamed_trace_));
113
8
  streamed_trace_.reset();
114
8
  current_streamed_rx_tx_bytes_ = 0;
115
8
}
116

            
117
6
bool PerSocketTapperImpl::shouldSubmitStreamedDataPerConfiguredSizeByAgedDuration() const {
118
6
  if (streamed_trace_ == nullptr) {
119
    return false;
120
  }
121
6
  const envoy::data::tap::v3::SocketEvents& streamed_events =
122
6
      streamed_trace_->socket_streamed_trace_segment().events();
123
6
  auto& repeated_streamed_events = streamed_events.events();
124
6
  if (repeated_streamed_events.size() < 2) {
125
    // Only one event.
126
3
    return false;
127
3
  }
128

            
129
3
  const Protobuf::Timestamp& first_event_ts = repeated_streamed_events[0].timestamp();
130
3
  const Protobuf::Timestamp& last_event_ts =
131
3
      repeated_streamed_events[repeated_streamed_events.size() - 1].timestamp();
132
3
  return (last_event_ts.seconds() - first_event_ts.seconds()) >=
133
3
         static_cast<int64_t>(DefaultBufferedAgedDuration);
134
6
}
135

            
136
void PerSocketTapperImpl::handleSendingStreamTappedMsgPerConfigSize(const Buffer::Instance& data,
137
                                                                    const uint32_t total_bytes,
138
                                                                    const bool is_read,
139
8
                                                                    const bool is_end_stream) {
140
8
  makeStreamedTraceIfNeeded();
141
8
  auto& event =
142
8
      *streamed_trace_->mutable_socket_streamed_trace_segment()->mutable_events()->add_events();
143
8
  initStreamingEvent(event, seq_num);
144
8
  uint32_t buffer_start_offset = 0;
145
8
  if (is_read) {
146
4
    buffer_start_offset = data.length() - total_bytes;
147
4
    TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(), total_bytes,
148
4
                                              data, buffer_start_offset, total_bytes);
149
4
    current_streamed_rx_tx_bytes_ += event.read().data().as_bytes().size();
150
4
  } else {
151
4
    event.mutable_write()->set_end_stream(is_end_stream);
152
4
    TapCommon::Utility::addBufferToProtoBytes(*event.mutable_write()->mutable_data(), total_bytes,
153
4
                                              data, buffer_start_offset, total_bytes);
154
4
    current_streamed_rx_tx_bytes_ += event.write().data().as_bytes().size();
155
4
  }
156

            
157
8
  if (current_streamed_rx_tx_bytes_ >= config_->minStreamedSentBytes() ||
158
8
      shouldSubmitStreamedDataPerConfiguredSizeByAgedDuration()) {
159
4
    submitStreamedDataPerConfiguredSize();
160
4
    pegSubmitCounter(true);
161
4
  }
162
8
}
163

            
164
19
void PerSocketTapperImpl::onRead(const Buffer::Instance& data, uint32_t bytes_read) {
165
19
  if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
166
    return;
167
  }
168
19
  if (config_->streaming()) {
169
10
    if (shouldSendStreamedMsgByConfiguredSize()) {
170
4
      handleSendingStreamTappedMsgPerConfigSize(data, bytes_read, true, false);
171
7
    } else {
172
6
      TapCommon::TraceWrapperPtr trace = makeTraceSegment();
173
6
      auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
174
6
      initStreamingEvent(event, seq_num);
175
6
      TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(),
176
6
                                                config_->maxBufferedRxBytes(), data,
177
6
                                                data.length() - bytes_read, bytes_read);
178
6
      sink_handle_->submitTrace(std::move(trace));
179
6
      pegSubmitCounter(true);
180
6
    }
181
10
    seq_num = seq_num + bytes_read;
182
15
  } else {
183
9
    if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().read_truncated()) {
184
2
      return;
185
2
    }
186

            
187
7
    makeBufferedTraceIfNeeded();
188
7
    auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events();
189
7
    initEvent(event);
190
7
    ASSERT(rx_bytes_buffered_ <= config_->maxBufferedRxBytes());
191
7
    buffered_trace_->mutable_socket_buffered_trace()->set_read_truncated(
192
7
        TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(),
193
7
                                                  config_->maxBufferedRxBytes() -
194
7
                                                      rx_bytes_buffered_,
195
7
                                                  data, data.length() - bytes_read, bytes_read));
196
7
    rx_bytes_buffered_ += event.read().data().as_bytes().size();
197
7
  }
198
19
}
199

            
200
void PerSocketTapperImpl::onWrite(const Buffer::Instance& data, uint32_t bytes_written,
201
22
                                  bool end_stream) {
202
22
  if (!config_->rootMatcher().matchStatus(statuses_).matches_) {
203
    return;
204
  }
205

            
206
22
  if (config_->streaming()) {
207
9
    if (shouldSendStreamedMsgByConfiguredSize()) {
208
4
      handleSendingStreamTappedMsgPerConfigSize(data, bytes_written, false, end_stream);
209
6
    } else {
210
5
      TapCommon::TraceWrapperPtr trace = makeTraceSegment();
211
5
      auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event();
212
5
      initStreamingEvent(event, seq_num);
213
5
      TapCommon::Utility::addBufferToProtoBytes(*event.mutable_write()->mutable_data(),
214
5
                                                config_->maxBufferedTxBytes(), data, 0,
215
5
                                                bytes_written);
216
5
      event.mutable_write()->set_end_stream(end_stream);
217
5
      sink_handle_->submitTrace(std::move(trace));
218
5
      pegSubmitCounter(true);
219
5
    }
220
9
    seq_num = seq_num + bytes_written;
221
19
  } else {
222
13
    if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().write_truncated()) {
223
2
      return;
224
2
    }
225

            
226
11
    makeBufferedTraceIfNeeded();
227
11
    auto& event = *buffered_trace_->mutable_socket_buffered_trace()->add_events();
228
11
    initEvent(event);
229
11
    ASSERT(tx_bytes_buffered_ <= config_->maxBufferedTxBytes());
230
11
    buffered_trace_->mutable_socket_buffered_trace()->set_write_truncated(
231
11
        TapCommon::Utility::addBufferToProtoBytes(
232
11
            *event.mutable_write()->mutable_data(),
233
11
            config_->maxBufferedTxBytes() - tx_bytes_buffered_, data, 0, bytes_written));
234
11
    tx_bytes_buffered_ += event.write().data().as_bytes().size();
235
11
    event.mutable_write()->set_end_stream(end_stream);
236
11
  }
237
22
}
238

            
239
} // namespace Tap
240
} // namespace TransportSockets
241
} // namespace Extensions
242
} // namespace Envoy