1
#pragma once
2

            
3
#include "envoy/config/tap/v3/common.pb.h"
4
#include "envoy/data/tap/v3/transport.pb.h"
5
#include "envoy/event/timer.h"
6
#include "envoy/server/transport_socket_config.h"
7

            
8
#include "source/extensions/common/tap/tap_config_base.h"
9
#include "source/extensions/transport_sockets/tap/tap_config.h"
10

            
11
namespace Envoy {
12
namespace Extensions {
13
namespace TransportSockets {
14
namespace Tap {
15

            
16
class PerSocketTapperImpl : public PerSocketTapper {
17
public:
18
  PerSocketTapperImpl(
19
      SocketTapConfigSharedPtr config,
20
      const envoy::extensions::transport_sockets::tap::v3::SocketTapConfig& tap_config,
21
      const TransportTapStats& stats, const Network::Connection& connection);
22

            
23
  // PerSocketTapper
24
  void closeSocket(Network::ConnectionEvent event) override;
25
  void onRead(const Buffer::Instance& data, uint32_t bytes_read) override;
26
  void onWrite(const Buffer::Instance& data, uint32_t bytes_written, bool end_stream) override;
27

            
28
private:
29
  void initEvent(envoy::data::tap::v3::SocketEvent&);
30
  void initStreamingEvent(envoy::data::tap::v3::SocketEvent&, uint64_t);
31
12
  void makeStreamedTraceIfNeeded() {
32
12
    if (streamed_trace_ == nullptr) {
33
8
      streamed_trace_ = Extensions::Common::Tap::makeTraceWrapper();
34
8
      streamed_trace_->mutable_socket_streamed_trace_segment()->set_trace_id(connection_.id());
35
8
    }
36
12
  }
37
  void fillConnectionInfo(envoy::data::tap::v3::Connection& connection);
38
25
  void makeBufferedTraceIfNeeded() {
39
25
    if (buffered_trace_ == nullptr) {
40
7
      buffered_trace_ = Extensions::Common::Tap::makeTraceWrapper();
41
7
      buffered_trace_->mutable_socket_buffered_trace()->set_trace_id(connection_.id());
42
7
    }
43
25
  }
44
25
  Extensions::Common::Tap::TraceWrapperPtr makeTraceSegment() {
45
25
    Extensions::Common::Tap::TraceWrapperPtr trace = Extensions::Common::Tap::makeTraceWrapper();
46
25
    trace->mutable_socket_streamed_trace_segment()->set_trace_id(connection_.id());
47
25
    return trace;
48
25
  }
49
  void pegSubmitCounter(const bool is_streaming);
50
  bool shouldSendStreamedMsgByConfiguredSize() const;
51
  bool shouldSubmitStreamedDataPerConfiguredSizeByAgedDuration() const;
52
  void submitStreamedDataPerConfiguredSize();
53
  void handleSendingStreamTappedMsgPerConfigSize(const Buffer::Instance& data,
54
                                                 const uint32_t total_bytes, const bool is_read,
55
                                                 const bool is_end_stream);
56
  // This is the default value for min buffered bytes.
57
  // (This means that per transport socket buffer trace, the minimum amount
58
  // which triggering to send the tapped messages size is 9 bytes).
59
  static constexpr uint32_t DefaultMinBufferedBytes = 9;
60
  // It isn't easy to meet data submit threshold when the configured byte size is too large
61
  // and the tapped data volume is low, therefore, set below buffer aged duration (seconds)
62
  // to make sure that the tapped data is submitted in time.
63
  static constexpr uint32_t DefaultBufferedAgedDuration = 15;
64
  // The tapped data from Transport socket may be incomplete
65
  // for some protocols (e.g., HTTP/2 frames may span multiple reads/writes).
66
  // Add sequence number to allow the receiver to reconstruct byte order and
67
  // determine completeness, similar to TCP sequence numbers.
68
  uint64_t seq_num{};
69
  SocketTapConfigSharedPtr config_;
70
  Extensions::Common::Tap::PerTapSinkHandleManagerPtr sink_handle_;
71
  const Network::Connection& connection_;
72
  Extensions::Common::Tap::Matcher::MatchStatusVector statuses_;
73
  // Must be a shared_ptr because of submitTrace().
74
  Extensions::Common::Tap::TraceWrapperPtr buffered_trace_;
75
  uint32_t rx_bytes_buffered_{};
76
  uint32_t tx_bytes_buffered_{};
77
  const bool should_output_conn_info_per_event_{false};
78
  uint32_t current_streamed_rx_tx_bytes_{0};
79
  Extensions::Common::Tap::TraceWrapperPtr streamed_trace_{nullptr};
80
  const TransportTapStats stats_;
81
};
82

            
83
class SocketTapConfigImpl : public Extensions::Common::Tap::TapConfigBaseImpl,
84
                            public SocketTapConfig,
85
                            public std::enable_shared_from_this<SocketTapConfigImpl> {
86
public:
87
  SocketTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config,
88
                      Extensions::Common::Tap::Sink* admin_streamer, TimeSource& time_system,
89
                      Server::Configuration::TransportSocketFactoryContext& context)
90
7
      : Extensions::Common::Tap::TapConfigBaseImpl(std::move(proto_config), admin_streamer,
91
7
                                                   context),
92
7
        time_source_(time_system) {}
93

            
94
  // SocketTapConfig
95
  PerSocketTapperPtr createPerSocketTapper(
96
      const envoy::extensions::transport_sockets::tap::v3::SocketTapConfig& tap_config,
97
8
      const TransportTapStats& stats, const Network::Connection& connection) override {
98
8
    return std::make_unique<PerSocketTapperImpl>(shared_from_this(), tap_config, stats, connection);
99
8
  }
100
23
  TimeSource& timeSource() const override { return time_source_; }
101

            
102
private:
103
  TimeSource& time_source_;
104

            
105
  friend class PerSocketTapperImpl;
106
};
107

            
108
} // namespace Tap
109
} // namespace TransportSockets
110
} // namespace Extensions
111
} // namespace Envoy