1
#pragma once
2

            
3
#include <sys/types.h>
4

            
5
#include "envoy/buffer/buffer.h"
6
#include "envoy/common/backoff_strategy.h"
7
#include "envoy/event/dispatcher.h"
8
#include "envoy/network/connection.h"
9
#include "envoy/stats/scope.h"
10
#include "envoy/upstream/cluster_manager.h"
11

            
12
#include "source/common/buffer/buffer_impl.h"
13
#include "source/common/common/backoff_strategy.h"
14
#include "source/common/common/logger.h"
15

            
16
#include "msgpack.hpp"
17

            
18
namespace Envoy {
19
namespace Extensions {
20
namespace Common {
21
namespace Fluentd {
22

            
23
// Default tuning values for Fluentd clients. Units follow the name suffixes (Ms = milliseconds);
// how each value is applied is decided by the code that consumes these constants.
static constexpr uint64_t DefaultBaseBackoffIntervalMs{500};
static constexpr uint64_t DefaultMaxBackoffIntervalFactor{10};
static constexpr uint64_t DefaultBufferFlushIntervalMs{1000};
// NOTE(review): presumably a size in bytes -- confirm against the users of this constant.
static constexpr uint64_t DefaultMaxBufferSize{16384};
27

            
28
// Buffer type from the msgpack library that MessagePackPacker is instantiated with.
using MessagePackBuffer = msgpack::sbuffer;
29

            
30
// Entry represents a single Fluentd message, msgpack format based, as specified in:
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#entry
//
// An entry is immutable and non-copyable. Each constructor fills exactly one of the two payload
// members (record_ or map_record_); the other one is left default-constructed, i.e. empty.
class Entry {
public:
  Entry(const Entry&) = delete;
  Entry& operator=(const Entry&) = delete;

  /**
   * Creates an entry carrying a raw byte record.
   * @param time the timestamp stored in time_ (the unit is chosen by the caller).
   * @param record the payload bytes. A named rvalue-reference parameter is itself an lvalue, so
   *        it has to be moved explicitly; otherwise the whole vector would be copied.
   */
  Entry(uint64_t time, std::vector<uint8_t>&& record) : time_(time), record_(std::move(record)) {}

  /**
   * Creates an entry carrying a string key/value record.
   * @param time the timestamp stored in time_ (the unit is chosen by the caller).
   * @param map_record the key/value payload, moved into the entry for the same reason as above.
   */
  Entry(uint64_t time, std::map<std::string, std::string>&& map_record)
      : time_(time), map_record_(std::move(map_record)) {}

  const uint64_t time_;
  const std::vector<uint8_t> record_;
  const std::map<std::string, std::string> map_record_;
};
44

            
45
// Uniquely-owning handle to an Entry; this is the type FluentdService::log() accepts.
using EntryPtr = std::unique_ptr<Entry>;
46

            
47
// X-macro listing the Fluentd stat names. Every name is passed to COUNTER, in the order written
// below; GAUGE is accepted but not applied to any name because no gauges are listed.
#define FLUENTD_STATS(COUNTER, GAUGE)                                                              \
  COUNTER(entries_lost)                                                                            \
  COUNTER(entries_buffered)                                                                        \
  COUNTER(events_sent)                                                                             \
  COUNTER(reconnect_attempts)                                                                      \
  COUNTER(connections_closed)
53

            
54
struct FluentdStats {
55
  FLUENTD_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
56
};
57

            
58
class FluentdService {
59
public:
60
55
  virtual ~FluentdService() = default;
61

            
62
  /**
63
   * Send the Fluentd formatted message over the upstream TCP connection.
64
   */
65
  virtual void log(EntryPtr&& entry) PURE;
66
};
67

            
68
// Packer type that FluentdBase::packMessage() receives; it is instantiated over
// msgpack::sbuffer, the same type MessagePackBuffer names.
using MessagePackPacker = msgpack::packer<msgpack::sbuffer>;
70

            
71
class FluentdBase : public Tcp::AsyncTcpClientCallbacks,
72
                    public FluentdService,
73
                    public Logger::Loggable<Logger::Id::client> {
74
public:
75
  FluentdBase(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client,
76
              Event::Dispatcher& dispatcher, const std::string& tag,
77
              absl::optional<uint32_t> max_connect_attempts, Stats::Scope& parent_scope,
78
              const std::string& stat_prefix, BackOffStrategyPtr backoff_strategy,
79
              uint64_t buffer_flush_interval, uint64_t max_buffer_size);
80

            
81
54
  virtual ~FluentdBase() = default;
82

            
83
  // Tcp::AsyncTcpClientCallbacks
84
  void onEvent(Network::ConnectionEvent event) override;
85
2
  void onAboveWriteBufferHighWatermark() override {}
86
2
  void onBelowWriteBufferLowWatermark() override {}
87
2
  void onData(Buffer::Instance&, bool) override {}
88

            
89
  // FluentdService
90
  void log(EntryPtr&& entry) override;
91

            
92
protected:
93
  void flush();
94
  void connect();
95
  void maybeReconnect();
96
  void onBackoffCallback();
97
  void setDisconnected();
98
  void clearBuffer();
99
  virtual void packMessage(MessagePackPacker& packer) = 0;
100

            
101
  bool disconnected_ = false;
102
  bool connecting_ = false;
103
  std::string tag_;
104
  std::string id_;
105
  uint32_t connect_attempts_{0};
106
  absl::optional<uint32_t> max_connect_attempts_;
107
  const Stats::ScopeSharedPtr stats_scope_;
108
  Upstream::ThreadLocalCluster& cluster_;
109
  const BackOffStrategyPtr backoff_strategy_;
110
  const Tcp::AsyncTcpClientPtr client_;
111
  const std::chrono::milliseconds buffer_flush_interval_msec_;
112
  const uint64_t max_buffer_size_bytes_;
113
  const Event::TimerPtr retry_timer_;
114
  const Event::TimerPtr flush_timer_;
115
  FluentdStats fluentd_stats_;
116
  std::vector<EntryPtr> entries_;
117
  uint64_t approximate_message_size_bytes_ = 0;
118
};
119

            
120
template <typename ConfigType, typename SharedPtrType> class FluentdCache {
121
public:
122
25
  virtual ~FluentdCache() = default;
123

            
124
  virtual SharedPtrType getOrCreate(const std::shared_ptr<ConfigType>& config,
125
                                    Random::RandomGenerator& random,
126
                                    BackOffStrategyPtr backoff_strategy) = 0;
127
};
128

            
129
template <typename T, typename ConfigType, typename SharedPtrType, typename WeakPtrType>
130
class FluentdCacheBase : public FluentdCache<ConfigType, SharedPtrType> {
131
public:
132
  FluentdCacheBase(Upstream::ClusterManager& cluster_manager, Stats::Scope& parent_scope,
133
                   ThreadLocal::SlotAllocator& tls, const std::string& scope_prefix)
134
24
      : cluster_manager_(cluster_manager), stats_scope_(parent_scope.createScope(scope_prefix)),
135
24
        tls_slot_(tls.allocateSlot()) {
136
31
    tls_slot_->set([](Event::Dispatcher& dispatcher) {
137
31
      return std::make_shared<ThreadLocalCache>(dispatcher);
138
31
    });
139
24
  }
140

            
141
24
  virtual ~FluentdCacheBase() = default;
142

            
143
  SharedPtrType getOrCreate(const std::shared_ptr<ConfigType>& config,
144
                            Random::RandomGenerator& random,
145
33
                            BackOffStrategyPtr backoff_strategy) override {
146
33
    auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
147
33
    const auto cache_key = MessageUtil::hash(*config);
148
33
    auto it = cache.instances_.find(cache_key);
149
33
    if (it != cache.instances_.end() && !it->second.expired()) {
150
2
      return it->second.lock();
151
2
    }
152

            
153
31
    auto* cluster = cluster_manager_.getThreadLocalCluster(config->cluster());
154
31
    if (!cluster) {
155
2
      return nullptr;
156
2
    }
157

            
158
29
    auto client =
159
29
        cluster->tcpAsyncClient(nullptr, std::make_shared<const Tcp::AsyncTcpClientOptions>(false));
160

            
161
29
    auto instance = createInstance(*cluster, std::move(client), cache.dispatcher_, *config,
162
29
                                   std::move(backoff_strategy), random);
163
29
    cache.instances_.emplace(cache_key, instance);
164
29
    return instance;
165
31
  }
166

            
167
protected:
168
  virtual SharedPtrType createInstance(Upstream::ThreadLocalCluster& cluster,
169
                                       Tcp::AsyncTcpClientPtr client, Event::Dispatcher& dispatcher,
170
                                       const ConfigType& config,
171
                                       BackOffStrategyPtr backoff_strategy,
172
                                       Random::RandomGenerator& random) = 0;
173

            
174
  struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
175
31
    ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
176

            
177
    Event::Dispatcher& dispatcher_;
178
    // Tracers indexed by the hash of tracer's configuration.
179
    absl::flat_hash_map<std::size_t, WeakPtrType> instances_;
180
  };
181

            
182
  Upstream::ClusterManager& cluster_manager_;
183
  Stats::ScopeSharedPtr stats_scope_;
184
  ThreadLocal::SlotPtr tls_slot_;
185
};
186

            
187
} // namespace Fluentd
188
} // namespace Common
189
} // namespace Extensions
190
} // namespace Envoy