1
#include "source/extensions/common/fluentd/fluentd_base.h"
2

            
3
namespace Envoy {
4
namespace Extensions {
5
namespace Common {
6
namespace Fluentd {
7

            
8
FluentdBase::FluentdBase(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client,
9
                         Event::Dispatcher& dispatcher, const std::string& tag,
10
                         absl::optional<uint32_t> max_connect_attempts, Stats::Scope& parent_scope,
11
                         const std::string& stat_prefix, BackOffStrategyPtr backoff_strategy,
12
                         uint64_t buffer_flush_interval, uint64_t max_buffer_size)
13
54
    : tag_(tag), id_(dispatcher.name()), max_connect_attempts_(max_connect_attempts),
14
54
      stats_scope_(parent_scope.createScope(stat_prefix)), cluster_(cluster),
15
54
      backoff_strategy_(std::move(backoff_strategy)), client_(std::move(client)),
16
54
      buffer_flush_interval_msec_(buffer_flush_interval), max_buffer_size_bytes_(max_buffer_size),
17
54
      retry_timer_(dispatcher.createTimer([this]() { onBackoffCallback(); })),
18
54
      flush_timer_(dispatcher.createTimer([this]() {
19
10
        flush();
20
10
        flush_timer_->enableTimer(buffer_flush_interval_msec_);
21
10
      })),
22
54
      fluentd_stats_({FLUENTD_STATS(POOL_COUNTER(*stats_scope_), POOL_GAUGE(*stats_scope_))}) {
23
54
  client_->setAsyncTcpClientCallbacks(*this);
24
54
  flush_timer_->enableTimer(buffer_flush_interval_msec_);
25
54
}
26

            
27
39
void FluentdBase::onEvent(Network::ConnectionEvent event) {
28
39
  connecting_ = false;
29

            
30
39
  if (event == Network::ConnectionEvent::Connected) {
31
15
    backoff_strategy_->reset();
32
15
    retry_timer_->disableTimer();
33
15
    flush();
34
24
  } else if (event == Network::ConnectionEvent::LocalClose ||
35
24
             event == Network::ConnectionEvent::RemoteClose) {
36
24
    ENVOY_LOG(debug, "upstream connection was closed");
37
24
    fluentd_stats_.connections_closed_.inc();
38
24
    maybeReconnect();
39
24
  }
40
39
}
41

            
42
40
void FluentdBase::log(EntryPtr&& entry) {
43
40
  if (disconnected_ || approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
44
4
    fluentd_stats_.entries_lost_.inc();
45
    // We will lose the data deliberately so the buffer doesn't grow infinitely.
46
4
    return;
47
4
  }
48

            
49
36
  approximate_message_size_bytes_ +=
50
36
      sizeof(entry->time_) + entry->record_.size() + entry->map_record_.size();
51
36
  entries_.push_back(std::move(entry));
52
36
  fluentd_stats_.entries_buffered_.inc();
53
36
  if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
54
    // If we exceeded the buffer limit, immediately flush the logs instead of waiting for
55
    // the next flush interval, to allow new logs to be buffered.
56
24
    flush();
57
24
  }
58
36
}
59

            
60
49
void FluentdBase::flush() {
61
49
  ASSERT(!disconnected_);
62

            
63
49
  if (entries_.empty() || connecting_) {
64
7
    return; // Nothing to send or waiting for an upstream connection
65
7
  }
66

            
67
42
  if (!client_->connected()) {
68
28
    connect();
69
28
    return;
70
28
  }
71

            
72
  // Pack the message using the derived class implementation
73
14
  MessagePackBuffer buffer;
74
14
  MessagePackPacker packer(buffer);
75
14
  packMessage(packer);
76
14
  Buffer::OwnedImpl data(buffer.data(), buffer.size());
77

            
78
14
  client_->write(data, false);
79
14
  fluentd_stats_.events_sent_.inc();
80
14
  clearBuffer();
81
14
}
82

            
83
40
void FluentdBase::connect() {
84
40
  connect_attempts_++;
85
40
  if (!client_->connect()) {
86
5
    ENVOY_LOG(debug, "no healthy upstream");
87
5
    maybeReconnect();
88
5
    return;
89
5
  }
90

            
91
35
  connecting_ = true;
92
35
}
93

            
94
29
void FluentdBase::maybeReconnect() {
95
29
  if (max_connect_attempts_.has_value() && connect_attempts_ >= max_connect_attempts_) {
96
14
    ENVOY_LOG(debug, "max connection attempts reached");
97
14
    cluster_.info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
98
14
    setDisconnected();
99
14
    return;
100
14
  }
101

            
102
15
  uint64_t next_backoff_ms = backoff_strategy_->nextBackOffMs();
103
15
  retry_timer_->enableTimer(std::chrono::milliseconds(next_backoff_ms));
104
15
  ENVOY_LOG(debug, "reconnect attempt scheduled for {} ms", next_backoff_ms);
105
15
}
106

            
107
12
void FluentdBase::onBackoffCallback() {
108
12
  fluentd_stats_.reconnect_attempts_.inc();
109
12
  this->connect();
110
12
}
111

            
112
14
void FluentdBase::setDisconnected() {
113
14
  disconnected_ = true;
114
14
  clearBuffer();
115
14
  ASSERT(flush_timer_ != nullptr);
116
14
  flush_timer_->disableTimer();
117
14
}
118

            
119
28
void FluentdBase::clearBuffer() {
120
28
  entries_.clear();
121
28
  approximate_message_size_bytes_ = 0;
122
28
}
123

            
124
} // namespace Fluentd
125
} // namespace Common
126
} // namespace Extensions
127
} // namespace Envoy