1
#pragma once
2

            
3
#include <sys/types.h>
4

            
5
#include <string>
6

            
7
#include "envoy/access_log/access_log.h"
8
#include "envoy/api/api.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/filesystem/filesystem.h"
11
#include "envoy/stats/stats_macros.h"
12
#include "envoy/stats/store.h"
13

            
14
#include "source/common/buffer/buffer_impl.h"
15
#include "source/common/common/logger.h"
16
#include "source/common/common/thread.h"
17

            
18
#include "absl/container/node_hash_map.h"
19

            
20
namespace Envoy {
21

            
22
/**
 * X-macro listing the stats kept for access log files: five counters and one gauge.
 * Each counter name is handed to the caller-supplied COUNTER(name) macro and the gauge to
 * GAUGE(name, mode); the gauge's second argument (Accumulate) is passed through unchanged.
 * The continuation lines must stay contiguous: anything placed between them ends the macro early.
 */
#define ACCESS_LOG_FILE_STATS(COUNTER, GAUGE)                                                      \
  COUNTER(flushed_by_timer)                                                                        \
  COUNTER(reopen_failed)                                                                           \
  COUNTER(write_buffered)                                                                          \
  COUNTER(write_completed)                                                                         \
  COUNTER(write_failed)                                                                            \
  GAUGE(write_total_buffered, Accumulate)
29

            
30
struct AccessLogFileStats {
31
  ACCESS_LOG_FILE_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
32
};
33

            
34
namespace AccessLog {
35

            
36
/**
 * Implementation of AccessLogManager. It keeps the flush settings, the shared lock, and the
 * AccessLogFileStats instance, together with a map of the access log files it knows about.
 */
class AccessLogManagerImpl : public AccessLogManager, Logger::Loggable<Logger::Id::main> {
public:
  /**
   * Stores the supplied settings and references, and builds the file stats from `stats_store`
   * using the "filesystem." prefix for every counter and gauge.
   * @param file_flush_interval_msec value stored in file_flush_interval_msec_.
   * @param min_flush_size_kb value stored in file_min_flush_size_kb_.
   * @param api reference retained in api_.
   * @param dispatcher reference retained in dispatcher_.
   * @param lock reference retained in lock_.
   * @param stats_store store the counters and gauges are created from.
   */
  AccessLogManagerImpl(std::chrono::milliseconds file_flush_interval_msec,
                       uint64_t min_flush_size_kb, Api::Api& api, Event::Dispatcher& dispatcher,
                       Thread::BasicLockable& lock, Stats::Store& stats_store)
      : file_flush_interval_msec_(file_flush_interval_msec),
        file_min_flush_size_kb_(min_flush_size_kb), api_(api), dispatcher_(dispatcher), lock_(lock),
        file_stats_{ACCESS_LOG_FILE_STATS(POOL_COUNTER_PREFIX(stats_store, "filesystem."),
                                          POOL_GAUGE_PREFIX(stats_store, "filesystem."))} {}
  ~AccessLogManagerImpl() override;

  // AccessLog::AccessLogManager
  void reopen() override;
  absl::StatusOr<AccessLogFileSharedPtr>
  createAccessLog(const Filesystem::FilePathAndType& file_info) override;

private:
  const std::chrono::milliseconds file_flush_interval_msec_;
  // The in-class default of 64 is always replaced by the constructor argument, because the only
  // constructor declared here initializes this member explicitly.
  const uint64_t file_min_flush_size_kb_{64};
  Api::Api& api_;
  Event::Dispatcher& dispatcher_;
  Thread::BasicLockable& lock_;
  AccessLogFileStats file_stats_;
  // NOTE(review): presumably keyed by file path; confirm against the out-of-line definitions.
  absl::node_hash_map<std::string, AccessLogFileSharedPtr> access_logs_;
};
61

            
62
/**
63
 * This is a file implementation geared for writing out access logs. It turn out that in certain
64
 * cases even if a standard file is opened with O_NONBLOCK, the kernel can still block when writing.
65
 * This implementation uses a flush thread per file, with the idea there aren't that many
66
 * files. If this turns out to be a good implementation we can potentially have a single flush
67
 * thread that flushes all files, but we will start with this.
68
 */
69
class AccessLogFileImpl : public AccessLogFile {
70
public:
71
  AccessLogFileImpl(Filesystem::FilePtr&& file, Event::Dispatcher& dispatcher,
72
                    Thread::BasicLockable& lock, AccessLogFileStats& stats,
73
                    std::chrono::milliseconds flush_interval_msec, uint64_t min_flush_size_kb,
74
                    Thread::ThreadFactory& thread_factory);
75
  ~AccessLogFileImpl() override;
76

            
77
  // AccessLog::AccessLogFile
78
  void write(absl::string_view data) override;
79

            
80
  /**
81
   * Reopen file asynchronously.
82
   * This only sets reopen flag, actual reopen operation is delayed.
83
   * Reopen happens before the next write operation.
84
   */
85
  void reopen() override;
86
  void flush() override;
87

            
88
private:
89
  void doWrite(Buffer::Instance& buffer);
90
  void flushThreadFunc();
91
  void createFlushStructures();
92

            
93
  Filesystem::FilePtr file_;
94

            
95
  // These locks are always acquired in the following order if multiple locks are held:
96
  //    1) write_lock_
97
  //    2) flush_lock_
98
  //    3) file_lock_
99
  Thread::BasicLockable& file_lock_;      // This lock is used only by the flush thread when writing
100
                                          // to disk. This is used to make sure that file blocks do
101
                                          // not get interleaved by multiple processes writing to
102
                                          // the same file during hot-restart.
103
  Thread::MutexBasicLockable flush_lock_; // This lock is used to prevent simultaneous flushes from
104
                                          // the flush thread and a synchronous flush. This protects
105
                                          // concurrent access to the about_to_write_buffer_, fd_,
106
                                          // and all other data used during flushing and file
107
                                          // re-opening.
108
  Thread::MutexBasicLockable
109
      write_lock_; // The lock is used when filling the flush buffer. It allows
110
                   // multiple threads to write to the same file at relatively
111
                   // high performance. It is always local to the process.
112
  Thread::ThreadPtr flush_thread_;
113
  Thread::CondVar flush_event_;
114
  bool flush_thread_exit_ ABSL_GUARDED_BY(write_lock_){false};
115
  bool reopen_file_ ABSL_GUARDED_BY(write_lock_){false};
116
  Buffer::OwnedImpl
117
      flush_buffer_ ABSL_GUARDED_BY(write_lock_); // This buffer is used by multiple threads. It
118
                                                  // gets filled and then flushed either when max
119
                                                  // size is reached or when a timer fires.
120
  // TODO(jmarantz): this should be ABSL_GUARDED_BY(flush_lock_) but the analysis cannot poke
121
  // through the std::make_unique assignment. I do not believe it's possible to annotate this
122
  // properly now due to limitations in the clang thread annotation analysis.
123
  Buffer::OwnedImpl about_to_write_buffer_; // This buffer is used only by the flush thread. Data
124
                                            // is moved from flush_buffer_ under lock, and then
125
                                            // the lock is released so that flush_buffer_ can
126
                                            // continue to fill. This buffer is then used for the
127
                                            // final write to disk.
128
  Event::TimerPtr flush_timer_;
129
  Thread::ThreadFactory& thread_factory_;
130
  const std::chrono::milliseconds flush_interval_msec_; // Time interval buffer gets flushed no
131
                                                        // matter if it reached the MIN_FLUSH_SIZE
132
                                                        // or not.
133
  const uint64_t min_flush_size_{
134
      64 * 1024}; // Minimum size before the flush thread will be told to flush.
135
  AccessLogFileStats& stats_;
136
};
137

            
138
} // namespace AccessLog
139
} // namespace Envoy