1
#include "source/extensions/access_loggers/fluentd/fluentd_access_log_impl.h"
2

            
3
#include "source/common/buffer/buffer_impl.h"
4

            
5
#include "msgpack.hpp"
6

            
7
namespace Envoy {
8
namespace Extensions {
9
namespace AccessLoggers {
10
namespace Fluentd {
11

            
12
FluentdAccessLoggerImpl::FluentdAccessLoggerImpl(Upstream::ThreadLocalCluster& cluster,
13
                                                 Tcp::AsyncTcpClientPtr client,
14
                                                 Event::Dispatcher& dispatcher,
15
                                                 const FluentdAccessLogConfig& config,
16
                                                 BackOffStrategyPtr backoff_strategy,
17
                                                 Stats::Scope& parent_scope)
18
31
    : FluentdBase(
19
31
          cluster, std::move(client), dispatcher, config.tag(),
20
31
          config.has_retry_options() && config.retry_options().has_max_connect_attempts()
21
31
              ? absl::optional<uint32_t>(config.retry_options().max_connect_attempts().value())
22
31
              : absl::nullopt,
23
31
          parent_scope, config.stat_prefix(), std::move(backoff_strategy),
24
31
          PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, DefaultBufferFlushIntervalMs),
25
31
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, DefaultMaxBufferSize)) {}
26

            
27
10
// Serializes the tag and every entry currently held in entries_ into `packer` as one Fluentd
// Forward Protocol (v1) forward-mode event:
//
//   [tag, [[time, record], [time, record], ...]]
//
// See https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode
//
// @param packer the msgpack packer that receives the serialized message. entries_ is only read
//               here; it is not cleared.
void FluentdAccessLoggerImpl::packMessage(MessagePackPacker& packer) {
  packer.pack_array(2); // 1 - tag field, 2 - entries array.
  packer.pack(tag_);
  packer.pack_array(entries_.size());

  for (const auto& entry : entries_) {
    packer.pack_array(2); // 1 - time, 2 - record.
    packer.pack(entry->time_);

    // Use data() rather than &record_[0]: indexing element 0 of an empty record is undefined
    // behaviour, whereas data() is always valid to call (and is never dereferenced when size is 0).
    const char* record_bytes = reinterpret_cast<const char*>(entry->record_.data());

    // Only the raw bytes are appended (no bin header is written first).
    // NOTE(review): presumably record_ already holds a serialized msgpack object produced by the
    // formatter, so it is embedded verbatim as the record element - confirm against the formatter.
    packer.pack_bin_body(record_bytes, entry->record_.size());
  }
}
41

            
42
// Stores the filter, formatter, config and logger cache, resolves the reconnect backoff bounds
// from the config, and registers a thread-local slot initializer that creates the logger.
//
// Backoff resolution:
//  - base interval: retry_options.backoff_options.base_interval if set, otherwise
//    DefaultBaseBackoffIntervalMs.
//  - max interval: retry_options.backoff_options.max_interval if set, otherwise the resolved base
//    interval multiplied by DefaultMaxBackoffIntervalFactor.
FluentdAccessLog::FluentdAccessLog(AccessLog::FilterPtr&& filter, FluentdFormatterPtr&& formatter,
                                   const FluentdAccessLogConfigSharedPtr config,
                                   ThreadLocal::SlotAllocator& tls, Random::RandomGenerator& random,
                                   FluentdAccessLoggerCacheSharedPtr access_logger_cache)
    : ImplBase(std::move(filter)), formatter_(std::move(formatter)), tls_slot_(tls.allocateSlot()),
      config_(config), access_logger_cache_(access_logger_cache) {

  // Start from the defaults; they are kept as-is when no backoff options are configured.
  uint64_t backoff_base_ms = DefaultBaseBackoffIntervalMs;
  uint64_t backoff_max_ms = backoff_base_ms * DefaultMaxBackoffIntervalFactor;

  const bool has_backoff_options =
      config->has_retry_options() && config->retry_options().has_backoff_options();
  if (has_backoff_options) {
    const auto& backoff_options = config->retry_options().backoff_options();
    backoff_base_ms =
        PROTOBUF_GET_MS_OR_DEFAULT(backoff_options, base_interval, DefaultBaseBackoffIntervalMs);
    // The fallback for the max interval is derived from the base interval resolved just above.
    backoff_max_ms = PROTOBUF_GET_MS_OR_DEFAULT(backoff_options, max_interval,
                                                backoff_base_ms * DefaultMaxBackoffIntervalFactor);
  }

  // The initializer owns copies of the config and cache pointers and of both intervals; `random`
  // is captured by reference.
  // NOTE(review): `random` must outlive every invocation of this initializer - confirm with the
  // caller.
  tls_slot_->set([logger_config = config_, &random, logger_cache = access_logger_cache_,
                  backoff_base_ms, backoff_max_ms](Event::Dispatcher&) {
    // A fresh backoff strategy is built on every invocation and handed over to the cache.
    BackOffStrategyPtr strategy = std::make_unique<JitteredExponentialBackOffStrategy>(
        backoff_base_ms, backoff_max_ms, random);
    auto logger = logger_cache->getOrCreate(logger_config, random, std::move(strategy));
    return std::make_shared<ThreadLocalLogger>(std::move(logger));
  });
}
68

            
69
// Formats one access log record and hands it to this thread's logger.
//
// @param context     formatter context passed through to the formatter.
// @param stream_info stream info used both for formatting and as the source of the timestamp.
//
// The entry's time is the stream's system time truncated to whole seconds since the epoch.
void FluentdAccessLog::emitLog(const Formatter::Context& context,
                               const StreamInfo::StreamInfo& stream_info) {
  // Format first, then sample the clock, in that order.
  auto record = formatter_->format(context, stream_info);

  const auto since_epoch = stream_info.timeSource().systemTime().time_since_epoch();
  uint64_t epoch_seconds = std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();

  auto& thread_local_logger = tls_slot_->getTyped<ThreadLocalLogger>();
  // Ownership of the formatted bytes moves into the entry.
  thread_local_logger.logger_->log(std::make_unique<Entry>(epoch_seconds, std::move(record)));
}
78

            
79
} // namespace Fluentd
80
} // namespace AccessLoggers
81
} // namespace Extensions
82
} // namespace Envoy