1
#include "source/common/access_log/access_log_manager_impl.h"
2

            
3
#include <cstdint>
4
#include <string>
5

            
6
#include "envoy/common/exception.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/fmt.h"
10
#include "source/common/common/lock_guard.h"
11

            
12
#include "absl/container/fixed_array.h"
13

            
14
namespace Envoy {
15
namespace AccessLog {
16
namespace {
17
static constexpr Filesystem::FlagSet default_flags{1 << Filesystem::File::Operation::Write |
18
                                                   1 << Filesystem::File::Operation::Create |
19
                                                   1 << Filesystem::File::Operation::Append};
20
} // namespace
21

            
22
10776
// Releases every access log held by the manager, emitting a debug line per log and one more once
// all of them have been released.
AccessLogManagerImpl::~AccessLogManagerImpl() {
  for (auto& entry : access_logs_) {
    ENVOY_LOG(debug, "destroying access logger {}", entry.first);
    // Drop the manager's reference now, so that any teardown happens between the two log lines
    // rather than when the map itself is destroyed.
    entry.second.reset();
  }
  ENVOY_LOG(debug, "destroyed access loggers");
}
29

            
30
1
void AccessLogManagerImpl::reopen() {
31
2
  for (auto& iter : access_logs_) {
32
2
    iter.second->reopen();
33
2
  }
34
1
}
35

            
36
// Returns the access log registered for the path described by `file_info`, creating, opening and
// registering a new one when none exists yet.
//
// @param file_info path and type of the file backing the access log.
// @return the shared access log, or an InvalidArgument status when the file cannot be opened.
absl::StatusOr<AccessLogFileSharedPtr>
AccessLogManagerImpl::createAccessLog(const Filesystem::FilePathAndType& file_info) {
  auto file = api_.fileSystem().createFile(file_info);
  // NOTE(review): `file_name` is a non-owning view; this assumes path() returns storage that
  // lives as long as the file object itself -- confirm against the Filesystem::File interface.
  absl::string_view file_name = file->path();

  // A log for this path is already registered: share it instead of opening the file again.
  const auto existing = access_logs_.find(file_name);
  if (existing != access_logs_.end()) {
    return existing->second;
  }

  Api::IoCallBoolResult open_result = file->open(default_flags);
  if (!open_result.ok()) {
    return absl::InvalidArgumentError(fmt::format("unable to open file '{}': {}", file_name,
                                                  open_result.err_->getErrorDetails()));
  }

  auto new_log = std::make_shared<AccessLogFileImpl>(
      std::move(file), dispatcher_, lock_, file_stats_, file_flush_interval_msec_,
      file_min_flush_size_kb_, api_.threadFactory());
  auto [inserted_at, was_inserted] = access_logs_.emplace(file_name, std::move(new_log));
  // The lookup above found nothing for this key, so the emplace cannot have hit an existing
  // entry.
  ASSERT(was_inserted);
  return inserted_at->second;
}
59

            
60
// Builds an access log on top of an already created file and arms the periodic flush timer.
//
// @param file file that receives the log data; ownership moves into this object.
// @param dispatcher dispatcher used to create the flush timer.
// @param lock lock taken around the actual writes to the file (see doWrite()).
// @param stats counters updated by the timer callback and the write path.
// @param flush_interval_msec period of the flush timer.
// @param min_flush_size_kb buffered size threshold, in KiB; stored internally in bytes.
// @param thread_factory factory used later to create the flush thread.
AccessLogFileImpl::AccessLogFileImpl(Filesystem::FilePtr&& file, Event::Dispatcher& dispatcher,
                                     Thread::BasicLockable& lock, AccessLogFileStats& stats,
                                     std::chrono::milliseconds flush_interval_msec,
                                     uint64_t min_flush_size_kb,
                                     Thread::ThreadFactory& thread_factory)
    : file_(std::move(file)), file_lock_(lock),
      // On every tick: count the timer-driven flush, signal the flush event and re-arm the timer
      // for the next interval.
      flush_timer_(dispatcher.createTimer([this] {
        stats_.flushed_by_timer_.inc();
        flush_event_.notifyOne();
        flush_timer_->enableTimer(flush_interval_msec_);
      })),
      thread_factory_(thread_factory), flush_interval_msec_(flush_interval_msec),
      min_flush_size_(min_flush_size_kb * 1024), stats_(stats) {
  // Arm the first tick; the callback above keeps re-arming from then on.
  flush_timer_->enableTimer(flush_interval_msec_);
}
75

            
76
6
void AccessLogFileImpl::reopen() {
77
6
  Thread::LockGuard lock(write_lock_);
78
6
  reopen_file_ = true;
79
6
  flush_event_.notifyOne();
80
6
}
81

            
82
12287
// Stops the flush thread (if one was started), writes out whatever is still buffered and closes
// the file.
AccessLogFileImpl::~AccessLogFileImpl() {
  {
    // Raise the exit flag under the write lock and signal the flush event so a waiting flush
    // thread observes it.
    Thread::LockGuard guard(write_lock_);
    flush_thread_exit_ = true;
    flush_event_.notifyOne();
  }

  // The flush thread only exists once write() has been called at least once.
  if (flush_thread_ != nullptr) {
    flush_thread_->join();
  }

  // Nothing to flush or close when the file is not open.
  if (!file_->isOpen()) {
    return;
  }

  // Write out any data that was buffered but never picked up by the flush thread.
  if (flush_buffer_.length() > 0) {
    doWrite(flush_buffer_);
  }

  const Api::IoCallBoolResult close_result = file_->close();
  ASSERT(close_result.return_value_,
         fmt::format("unable to close file '{}': {}", file_->path(),
                     close_result.err_->getErrorDetails()));
}
103

            
104
13995
// Writes every slice of `buffer` to the file, records a success or failure per slice, then
// drains the buffer regardless of the outcome.
//
// @param buffer data to write; empty on return.
void AccessLogFileImpl::doWrite(Buffer::Instance& buffer) {
  Buffer::RawSliceVector raw_slices = buffer.getRawSlices();

  // The disk writes happen with the file lock held so that chunks coming from different
  // AccessLogFileImpl instances that point at the same underlying file are not interleaved. That
  // situation arises via hot restart, or when calling code opens the same underlying file into a
  // different AccessLogFileImpl within the same process.
  // TODO PERF: A single cross process lock currently serializes all disk writes. Network workers
  //            are never blocked by it, but only one flush thread at a time can actually flush
  //            to disk. It would be nice to eventually drop the cross process lock or to have
  //            multiple locks.
  {
    Thread::LockGuard guard(file_lock_);
    for (const auto& raw_slice : raw_slices) {
      const absl::string_view chunk(static_cast<char*>(raw_slice.mem_), raw_slice.len_);
      const Api::IoCallSizeResult write_result = file_->write(chunk);
      const bool fully_written =
          write_result.ok() && write_result.return_value_ == static_cast<ssize_t>(raw_slice.len_);
      if (!fully_written) {
        // Probably disk full.
        stats_.write_failed_.inc();
      } else {
        stats_.write_completed_.inc();
      }
    }
  }

  // The whole buffer leaves the "buffered" accounting, whether or not every slice was written.
  const auto drained_bytes = buffer.length();
  stats_.write_total_buffered_.sub(drained_bytes);
  buffer.drain(drained_bytes);
}
132

            
133
8487
void AccessLogFileImpl::flushThreadFunc() {
134

            
135
  // Transfer the action from `reopen_file_` to this variable so that `reopen_file_` is only
136
  // accessed while holding the mutex while the actual operation is performed while not holding the
137
  // mutex.
138
8487
  bool do_reopen = false;
139

            
140
22362
  while (true) {
141
22362
    std::unique_lock<Thread::BasicLockable> flush_lock;
142

            
143
22362
    {
144
22362
      Thread::LockGuard write_lock(write_lock_);
145

            
146
      // flush_event_ can be woken up either by large enough flush_buffer or by timer.
147
      // In case it was timer, flush_buffer_ can be empty.
148
      //
149
      // Note: do not stop waiting when only `do_reopen` is true. In this case, we tried to
150
      // reopen and failed. We don't want to retry this in a tight loop, so wait for the next
151
      // event (timer or flush).
152
77981
      while (flush_buffer_.length() == 0 && !flush_thread_exit_ && !reopen_file_) {
153
        // CondVar::wait() does not throw, so it's safe to pass the mutex rather than the guard.
154
55619
        flush_event_.wait(write_lock_);
155
55619
      }
156

            
157
22362
      if (flush_thread_exit_) {
158
8487
        return;
159
8487
      }
160

            
161
13875
      flush_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_);
162
13875
      about_to_write_buffer_.move(flush_buffer_);
163
13875
      ASSERT(flush_buffer_.length() == 0);
164

            
165
13875
      if (reopen_file_) {
166
6
        do_reopen = true;
167
6
        reopen_file_ = false;
168
6
      }
169
13875
    }
170

            
171
13875
    if (do_reopen) {
172
7
      if (file_->isOpen()) {
173
5
        const Api::IoCallBoolResult result = file_->close();
174
5
        ASSERT(result.return_value_, fmt::format("unable to close file '{}': {}", file_->path(),
175
5
                                                 result.err_->getErrorDetails()));
176
5
      }
177
7
      const Api::IoCallBoolResult open_result = file_->open(default_flags);
178
7
      if (!open_result.return_value_) {
179
2
        stats_.reopen_failed_.inc();
180
5
      } else {
181
5
        do_reopen = false;
182
5
      }
183
7
    }
184
    // doWrite no matter file isOpen, if not, we can drain buffer
185
13875
    doWrite(about_to_write_buffer_);
186
13875
  }
187
8487
}
188

            
189
4
void AccessLogFileImpl::flush() {
190
4
  std::unique_lock<Thread::BasicLockable> flush_buffer_lock;
191

            
192
4
  {
193
4
    Thread::LockGuard write_lock(write_lock_);
194

            
195
    // flush_lock_ must be held while checking this or else it is
196
    // possible that flushThreadFunc() has already moved data from
197
    // flush_buffer_ to about_to_write_buffer_, has unlocked write_lock_,
198
    // but has not yet completed doWrite(). This would allow flush() to
199
    // return before the pending data has actually been written to disk.
200
4
    flush_buffer_lock = std::unique_lock<Thread::BasicLockable>(flush_lock_);
201

            
202
4
    if (flush_buffer_.length() == 0) {
203
1
      return;
204
1
    }
205

            
206
3
    about_to_write_buffer_.move(flush_buffer_);
207
3
    ASSERT(flush_buffer_.length() == 0);
208
3
  }
209

            
210
  doWrite(about_to_write_buffer_);
211
3
}
212

            
213
74069
// Appends `data` to the in-memory flush buffer and signals the flush event once the buffer has
// grown past the minimum flush size. The flush thread is created on the first call.
//
// @param data bytes to append to the log.
void AccessLogFileImpl::write(absl::string_view data) {
  Thread::LockGuard guard(write_lock_);

  // Lazily start the flush thread: no thread exists until something is written.
  if (flush_thread_ == nullptr) {
    createFlushStructures();
  }

  const auto num_bytes = data.size();
  stats_.write_buffered_.inc();
  stats_.write_total_buffered_.add(num_bytes);
  flush_buffer_.add(data.data(), num_bytes);

  const bool over_threshold = flush_buffer_.length() > min_flush_size_;
  if (over_threshold) {
    flush_event_.notifyOne();
  }
}
227

            
228
8487
void AccessLogFileImpl::createFlushStructures() {
229
8487
  flush_thread_ = thread_factory_.createThread([this]() -> void { flushThreadFunc(); },
230
8487
                                               Thread::Options{"AccessLogFlush"});
231
8487
}
232

            
233
} // namespace AccessLog
234
} // namespace Envoy