1
#include "source/extensions/common/tap/tap_config_base.h"
2

            
3
#include "envoy/config/tap/v3/common.pb.h"
4
#include "envoy/data/tap/v3/common.pb.h"
5
#include "envoy/data/tap/v3/wrapper.pb.h"
6
#include "envoy/server/transport_socket_config.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/fmt.h"
10
#include "source/common/config/utility.h"
11
#include "source/common/protobuf/utility.h"
12
#include "source/extensions/common/matcher/matcher.h"
13

            
14
#include "absl/container/fixed_array.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace Common {
19
namespace Tap {
20

            
21
using namespace Matcher;
22

            
23
bool Utility::addBufferToProtoBytes(envoy::data::tap::v3::Body& output_body,
24
                                    uint32_t max_buffered_bytes, const Buffer::Instance& data,
25
122
                                    uint32_t buffer_start_offset, uint32_t buffer_length_to_copy) {
26
  // TODO(mattklein123): Figure out if we can use the buffer API here directly in some way. This is
27
  // is not trivial if we want to avoid extra copies since we end up appending to the existing
28
  // protobuf string.
29

            
30
  // Note that max_buffered_bytes is assumed to include any data already contained in output_bytes.
31
  // This is to account for callers that may be tracking this over multiple body objects.
32
122
  ASSERT(buffer_start_offset + buffer_length_to_copy <= data.length());
33
122
  const uint32_t final_bytes_to_copy = std::min(max_buffered_bytes, buffer_length_to_copy);
34

            
35
122
  Buffer::RawSliceVector slices = data.getRawSlices();
36
122
  trimSlices(slices, buffer_start_offset, final_bytes_to_copy);
37
122
  for (const Buffer::RawSlice& slice : slices) {
38
122
    output_body.mutable_as_bytes()->append(static_cast<const char*>(slice.mem_), slice.len_);
39
122
  }
40

            
41
122
  if (final_bytes_to_copy < buffer_length_to_copy) {
42
13
    output_body.set_truncated(true);
43
13
    return true;
44
109
  } else {
45
109
    return false;
46
109
  }
47
122
}
48

            
49
// Builds the tap configuration from `proto_config`:
//  - reads the buffering limits, streaming flag and optional minimum streamed-sent bytes from
//    output_config;
//  - selects the single configured output sink (the admin streamer for admin sinks, or a sink
//    owned by this object for file-per-tap and custom sinks);
//  - builds the matcher tree from `match`, falling back to the deprecated `match_config`.
// Throws EnvoyException when an admin sink is requested without an admin streamer, when the
// deprecated match_config cannot be converted, or when neither match field is set.
TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config,
                                     Common::Tap::Sink* admin_streamer,
                                     Server::Configuration::GenericFactoryContext& context)
    : max_buffered_rx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          proto_config.output_config(), max_buffered_rx_bytes, DefaultMaxBufferedBytes)),
      max_buffered_tx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          proto_config.output_config(), max_buffered_tx_bytes, DefaultMaxBufferedBytes)),
      streaming_(proto_config.output_config().streaming()) {
  using ProtoOutputSink = envoy::config::tap::v3::OutputSink;
  const auto& output_config = proto_config.output_config();
  const auto& sinks = output_config.sinks();
  ASSERT(sinks.size() == 1);
  // TODO(mattklein123): Add per-sink checks to make sure format makes sense. I.e., when using
  // streaming, we should require the length delimited version of binary proto, etc.
  const ProtoOutputSink& sink = sinks[0];
  sink_format_ = sink.format();
  sink_type_ = sink.output_sink_type_case();

  // When configured, the minimum streamed-sent byte count is never allowed below the default.
  if (output_config.has_min_streamed_sent_bytes()) {
    min_streamed_sent_bytes_ =
        std::max(output_config.min_streamed_sent_bytes().value(), DefaultMinStreamedSentBytes);
  }

  switch (sink_type_) {
  case ProtoOutputSink::OutputSinkTypeCase::kBufferedAdmin:
    if (admin_streamer == nullptr) {
      throw EnvoyException("Output sink type BufferedAdmin requires that the admin "
                           "output will be configured via admin");
    }
    // TODO(mattklein123): Graceful failure, error message, and test if someone specifies an
    // admin stream output with the wrong format.
    RELEASE_ASSERT(
        sink_format_ == ProtoOutputSink::JSON_BODY_AS_BYTES ||
            sink_format_ == ProtoOutputSink::JSON_BODY_AS_STRING ||
            sink_format_ == ProtoOutputSink::PROTO_BINARY_LENGTH_DELIMITED,
        "buffered admin output only supports JSON or length delimited proto binary formats");
    sink_to_use_ = admin_streamer;
    break;
  case ProtoOutputSink::OutputSinkTypeCase::kStreamingAdmin:
    if (admin_streamer == nullptr) {
      throw EnvoyException("Output sink type StreamingAdmin requires that the admin "
                           "output will be configured via admin");
    }
    // TODO(mattklein123): Graceful failure, error message, and test if someone specifies an
    // admin stream output with the wrong format.
    // TODO(davidpeet8): Simple change to enable PROTO_BINARY_LENGTH_DELIMITED format -
    // functionality already implemented for kBufferedAdmin
    RELEASE_ASSERT(sink_format_ == ProtoOutputSink::JSON_BODY_AS_BYTES ||
                       sink_format_ == ProtoOutputSink::JSON_BODY_AS_STRING,
                   "streaming admin output only supports JSON formats");
    sink_to_use_ = admin_streamer;
    break;
  case ProtoOutputSink::OutputSinkTypeCase::kFilePerTap:
    sink_ = std::make_unique<FilePerTapSink>(sink.file_per_tap());
    sink_to_use_ = sink_.get();
    break;
  case ProtoOutputSink::OutputSinkTypeCase::kCustomSink: {
    TapSinkFactory& tap_sink_factory =
        Envoy::Config::Utility::getAndCheckFactory<TapSinkFactory>(sink.custom_sink());

    // Translate the typed config using the context's message validation visitor.
    ProtobufTypes::MessagePtr config = Config::Utility::translateAnyToFactoryConfig(
        sink.custom_sink().typed_config(), context.messageValidationVisitor(), tap_sink_factory);
    sink_ = tap_sink_factory.createSinkPtr(*config, context);
    sink_to_use_ = sink_.get();
    break;
  }
  case ProtoOutputSink::OutputSinkTypeCase::kStreamingGrpc:
    PANIC("not implemented");
  case ProtoOutputSink::OutputSinkTypeCase::OUTPUT_SINK_TYPE_NOT_SET:
    PANIC_DUE_TO_CORRUPT_ENUM;
  }

  envoy::config::common::matcher::v3::MatchPredicate match;
  if (proto_config.has_match()) {
    // Use the match field whenever it is set.
    match = proto_config.match();
  } else if (proto_config.has_match_config()) {
    // Fall back to the deprecated match_config field and upgrade (wire cast) it to the new
    // MatchPredicate, which is backward compatible with the old MatchPredicate originally
    // introduced in the Tap filter.
    if (!match.ParseFromString(proto_config.match_config().SerializeAsString())) {
      // This should generally succeed, but if there are malformed UTF-8 strings in a
      // message, this can fail.
      throw EnvoyException("Unable to deserialize proto.");
    }
  } else {
    throw EnvoyException(fmt::format("Neither match nor match_config is set in TapConfig: {}",
                                     proto_config.DebugString()));
  }

  buildMatcher(match, matchers_, context.serverFactoryContext());
}
144

            
145
330
// Returns the matcher stored first in matchers_, i.e. the root of the tree that the constructor
// populates through buildMatcher(). matchers_ must not be empty.
const Matcher& TapConfigBaseImpl::rootMatcher() const {
  ASSERT(!matchers_.empty());
  const auto& root = matchers_.front();
  return *root;
}
149

            
150
namespace {
151
14
void swapBytesToString(envoy::data::tap::v3::Body& body) {
152
14
  body.set_allocated_as_string(body.release_as_bytes());
153
14
}
154
} // namespace
155

            
156
// When `sink_format` is JSON_BODY_AS_STRING, moves every body payload in `trace` from its
// "bytes" field into its "string" field; for any other format the trace is left untouched.
// This is done purely so that JSON serialization will serialize as a string vs. doing base64
// encoding.
void Utility::bodyBytesToString(envoy::data::tap::v3::TraceWrapper& trace,
                                envoy::config::tap::v3::OutputSink::Format sink_format) {
  if (sink_format != envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING) {
    return;
  }

  // Converts the payload of a single socket event. Only read and write events carry data; any
  // other kind of event is skipped, rather than having a write sub-message created for it
  // through mutable_write(). Shared by the buffered and streamed socket trace paths.
  const auto swap_socket_event = [](auto& event) {
    if (event.has_read()) {
      swapBytesToString(*event.mutable_read()->mutable_data());
    } else if (event.has_write()) {
      swapBytesToString(*event.mutable_write()->mutable_data());
    }
  };

  switch (trace.trace_case()) {
  case envoy::data::tap::v3::TraceWrapper::TraceCase::kHttpBufferedTrace: {
    auto* http_trace = trace.mutable_http_buffered_trace();
    if (http_trace->has_request() && http_trace->request().has_body()) {
      swapBytesToString(*http_trace->mutable_request()->mutable_body());
    }
    if (http_trace->has_response() && http_trace->response().has_body()) {
      swapBytesToString(*http_trace->mutable_response()->mutable_body());
    }
    break;
  }
  case envoy::data::tap::v3::TraceWrapper::TraceCase::kHttpStreamedTraceSegment: {
    auto* http_trace = trace.mutable_http_streamed_trace_segment();
    if (http_trace->has_request_body_chunk()) {
      swapBytesToString(*http_trace->mutable_request_body_chunk());
    }
    if (http_trace->has_response_body_chunk()) {
      swapBytesToString(*http_trace->mutable_response_body_chunk());
    }
    break;
  }
  case envoy::data::tap::v3::TraceWrapper::TraceCase::kSocketBufferedTrace: {
    for (auto& event : *trace.mutable_socket_buffered_trace()->mutable_events()) {
      swap_socket_event(event);
    }
    break;
  }
  case envoy::data::tap::v3::TraceWrapper::TraceCase::kSocketStreamedTraceSegment: {
    auto* segment = trace.mutable_socket_streamed_trace_segment();
    if (segment->has_events()) {
      // Multiple events in each streamed trace.
      for (auto& event : *segment->mutable_events()->mutable_events()) {
        swap_socket_event(event);
      }
    } else {
      // Single event in each streamed trace.
      swap_socket_event(*segment->mutable_event());
    }
    break;
  }
  case envoy::data::tap::v3::TraceWrapper::TraceCase::TRACE_NOT_SET:
    PANIC_DUE_TO_CORRUPT_ENUM;
  }
}
225

            
226
67
// Prepares `trace` for the configured sink format (Utility::bodyBytesToString rewrites body
// bytes as strings only for JSON_BODY_AS_STRING), then forwards it, together with that same
// format, to this tap's sink handle.
void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(TraceWrapperPtr&& trace) {
  const auto format = parent_.sink_format_;
  Utility::bodyBytesToString(*trace, format);
  handle_->submitTrace(std::move(trace), format);
}
230

            
231
void FilePerTapSink::FilePerTapSinkHandle::submitTrace(
232
34
    TraceWrapperPtr&& trace, envoy::config::tap::v3::OutputSink::Format format) {
233
34
  if (!output_file_.is_open()) {
234
16
    std::string path = fmt::format("{}_{}", parent_.config_.path_prefix(), trace_id_);
235
16
    switch (format) {
236
      PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
237
6
    case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
238
6
      path += MessageUtil::FileExtensions::get().ProtoBinary;
239
6
      break;
240
8
    case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED:
241
8
      path += MessageUtil::FileExtensions::get().ProtoBinaryLengthDelimited;
242
8
      break;
243
1
    case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
244
1
      path += MessageUtil::FileExtensions::get().ProtoText;
245
1
      break;
246
1
    case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
247
1
    case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
248
1
      path += MessageUtil::FileExtensions::get().Json;
249
1
      break;
250
16
    }
251

            
252
16
    ENVOY_LOG_MISC(debug, "Opening tap file for [id={}] to {}", trace_id_, path);
253
    // When reading and writing binary files, we need to be sure std::ios_base::binary
254
    // is set, otherwise we will not get the expected results on Windows
255
16
    output_file_.open(path, std::ios_base::binary);
256
16
  }
257

            
258
34
  ENVOY_LOG_MISC(trace, "Tap for [id={}]: {}", trace_id_, trace->DebugString());
259

            
260
34
  switch (format) {
261
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
262
6
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY:
263
6
    trace->SerializeToOstream(&output_file_);
264
6
    break;
265
26
  case envoy::config::tap::v3::OutputSink::PROTO_BINARY_LENGTH_DELIMITED: {
266
26
    Protobuf::io::OstreamOutputStream stream(&output_file_);
267
26
    Protobuf::io::CodedOutputStream coded_stream(&stream);
268
26
    coded_stream.WriteVarint32(trace->ByteSize());
269
26
    trace->SerializeWithCachedSizes(&coded_stream);
270
26
    break;
271
  }
272
1
  case envoy::config::tap::v3::OutputSink::PROTO_TEXT:
273
1
    output_file_ << MessageUtil::toTextProto(*trace);
274
1
    break;
275
1
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_STRING:
276
1
  case envoy::config::tap::v3::OutputSink::JSON_BODY_AS_BYTES:
277
1
    output_file_ << MessageUtil::getJsonStringFromMessageOrError(*trace, true, true);
278
1
    break;
279
34
  }
280
34
}
281

            
282
} // namespace Tap
283
} // namespace Common
284
} // namespace Extensions
285
} // namespace Envoy