1
#pragma once
2

            
3
#include <chrono>
4

            
5
#include "envoy/extensions/access_loggers/fluentd/v3/fluentd.pb.h"
6
#include "envoy/extensions/access_loggers/fluentd/v3/fluentd.pb.validate.h"
7

            
8
#include "source/common/formatter/substitution_formatter.h"
9
#include "source/extensions/access_loggers/common/access_log_base.h"
10
#include "source/extensions/access_loggers/fluentd/substitution_formatter.h"
11
#include "source/extensions/common/fluentd/fluentd_base.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace AccessLoggers {
16
namespace Fluentd {
17

            
18
using namespace Envoy::Extensions::Common::Fluentd;
19
using FluentdAccessLogConfig =
20
    envoy::extensions::access_loggers::fluentd::v3::FluentdAccessLogConfig;
21
using FluentdAccessLogConfigSharedPtr = std::shared_ptr<FluentdAccessLogConfig>;
22

            
23
class FluentdAccessLoggerImpl : public FluentdBase {
24
public:
25
  FluentdAccessLoggerImpl(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client,
26
                          Event::Dispatcher& dispatcher, const FluentdAccessLogConfig& config,
27
                          BackOffStrategyPtr backoff_strategy, Stats::Scope& parent_scope);
28

            
29
  void packMessage(MessagePackPacker& packer);
30
};
31

            
32
// Handle types used for access loggers. Note that both alias the FluentdService type rather than
// the concrete FluentdAccessLoggerImpl.
// Non-owning handle.
using FluentdAccessLoggerWeakPtr = std::weak_ptr<FluentdService>;
// Shared-ownership handle.
using FluentdAccessLoggerSharedPtr = std::shared_ptr<FluentdService>;
34

            
35
class FluentdAccessLoggerCacheImpl
36
    : public FluentdCacheBase<FluentdAccessLoggerImpl, FluentdAccessLogConfig,
37
                              FluentdAccessLoggerSharedPtr, FluentdAccessLoggerWeakPtr>,
38
      public Singleton::Instance {
39
public:
40
  FluentdAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager,
41
                               Stats::Scope& parent_scope, ThreadLocal::SlotAllocator& tls)
42
12
      : FluentdCacheBase(cluster_manager, parent_scope, tls, "access_logs.fluentd") {};
43

            
44
protected:
45
  FluentdAccessLoggerSharedPtr
46
  createInstance(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client,
47
                 Event::Dispatcher& dispatcher, const FluentdAccessLogConfig& config,
48
19
                 BackOffStrategyPtr backoff_strategy, Random::RandomGenerator&) override {
49
19
    return std::make_shared<FluentdAccessLoggerImpl>(cluster, std::move(client), dispatcher, config,
50
19
                                                     std::move(backoff_strategy), *stats_scope_);
51
19
  }
52
};
53

            
54
using FluentdAccessLoggerCacheSharedPtr =
55
    std::shared_ptr<FluentdCache<FluentdAccessLogConfig, FluentdAccessLoggerSharedPtr>>;
56

            
57
/**
58
 * Access log Instance that writes logs to a Fluentd.
59
 */
60
class FluentdAccessLog : public Common::ImplBase {
61
public:
62
  FluentdAccessLog(AccessLog::FilterPtr&& filter, FluentdFormatterPtr&& formatter,
63
                   const FluentdAccessLogConfigSharedPtr config, ThreadLocal::SlotAllocator& tls,
64
                   Random::RandomGenerator& random,
65
                   FluentdAccessLoggerCacheSharedPtr access_logger_cache);
66

            
67
private:
68
  /**
69
   * Per-thread cached logger.
70
   */
71
  struct ThreadLocalLogger : public ThreadLocal::ThreadLocalObject {
72
15
    ThreadLocalLogger(FluentdAccessLoggerSharedPtr logger) : logger_(std::move(logger)) {}
73

            
74
    const FluentdAccessLoggerSharedPtr logger_;
75
  };
76

            
77
  // Common::ImplBase
78
  void emitLog(const Formatter::Context& context,
79
               const StreamInfo::StreamInfo& stream_info) override;
80

            
81
  FluentdFormatterPtr formatter_;
82
  const ThreadLocal::SlotPtr tls_slot_;
83
  const FluentdAccessLogConfigSharedPtr config_;
84
  const FluentdAccessLoggerCacheSharedPtr access_logger_cache_;
85
};
86

            
87
} // namespace Fluentd
88
} // namespace AccessLoggers
89
} // namespace Extensions
90
} // namespace Envoy