Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/write_thread.h
 Line|  Count|Source (jump to first uncovered line)
    1|       |//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
    2|       |//  This source code is licensed under both the GPLv2 (found in the
    3|       |//  COPYING file in the root directory) and Apache 2.0 License
    4|       |//  (found in the LICENSE.Apache file in the root directory).
    5|       |
    6|       |#pragma once
    7|       |
    8|       |#include <atomic>
    9|       |#include <cassert>
   10|       |#include <chrono>
   11|       |#include <condition_variable>
   12|       |#include <cstdint>
   13|       |#include <mutex>
   14|       |#include <type_traits>
   15|       |#include <vector>
   16|       |
   17|       |#include "db/dbformat.h"
   18|       |#include "db/post_memtable_callback.h"
   19|       |#include "db/pre_release_callback.h"
   20|       |#include "db/write_callback.h"
   21|       |#include "monitoring/instrumented_mutex.h"
   22|       |#include "rocksdb/options.h"
   23|       |#include "rocksdb/status.h"
   24|       |#include "rocksdb/types.h"
   25|       |#include "rocksdb/user_write_callback.h"
   26|       |#include "rocksdb/write_batch.h"
   27|       |#include "util/aligned_storage.h"
   28|       |#include "util/autovector.h"
   29|       |
   30|       |namespace ROCKSDB_NAMESPACE {
   31|       |
   32|       |class WriteThread {
   33|       | public:
   34|       |  enum State : uint8_t {
   35|       |    // The initial state of a writer.  This is a Writer that is
   36|       |    // waiting in JoinBatchGroup.  This state can be left when another
   37|       |    // thread informs the waiter that it has become a group leader
   38|       |    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
   39|       |    // non-parallel informs a follower that its writes have been committed
   40|       |    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
   41|       |    // updates in parallel needs this Writer to apply its batch (->
   42|       |    // STATE_PARALLEL_MEMTABLE_WRITER).
   43|       |    STATE_INIT = 1,
   44|       |
   45|       |    // The state used to inform a waiting Writer that it has become the
   46|       |    // leader, and it should now build a write batch group.  Tricky:
   47|       |    // this state is not used if newest_writer_ is empty when a writer
   48|       |    // enqueues itself, because there is no need to wait (or even to
   49|       |    // create the mutex and condvar used to wait) in that case.  This is
   50|       |    // a terminal state unless the leader chooses to make this a parallel
   51|       |    // batch, in which case the last parallel worker to finish will move
   52|       |    // the leader to STATE_COMPLETED.
   53|       |    STATE_GROUP_LEADER = 2,
   54|       |
   55|       |    // The state used to inform a waiting writer that it has become the
   56|       |    // leader of memtable writer group. The leader will either write
   57|       |    // memtable for the whole group, or launch a parallel group write
   58|       |    // to memtable by calling LaunchParallelMemTableWriters.
   59|       |    STATE_MEMTABLE_WRITER_LEADER = 4,
   60|       |
   61|       |    // The state used to inform a waiting writer that it has become a
   62|       |    // parallel memtable writer. It can be the group leader who launched the
   63|       |    // parallel writer group, or one of the followers. The writer should then
   64|       |    // apply its batch to the memtable concurrently and call
   65|       |    // CompleteParallelMemTableWriter.
   66|       |    STATE_PARALLEL_MEMTABLE_WRITER = 8,
   67|       |
   68|       |    // A follower whose writes have been applied, or a parallel leader
   69|       |    // whose followers have all finished their work.  This is a terminal
   70|       |    // state.
   71|       |    STATE_COMPLETED = 16,
   72|       |
   73|       |    // A state indicating that the thread may be waiting using StateMutex()
   74|       |    // and StateCV()
   75|       |    STATE_LOCKED_WAITING = 32,
   76|       |
   77|       |    // The state used to inform a waiting writer that it has become a
   78|       |    // caller that directs some other waiting writers to write to the memtable
   79|       |    // by calling SetMemWritersEachStride. After doing
   80|       |    // this, it will also write to the memtable itself.
   81|       |    STATE_PARALLEL_MEMTABLE_CALLER = 64,
   82|       |  };
   83|       |
   84|       |  struct Writer;
   85|       |
   86|       |  struct WriteGroup {
   87|       |    Writer* leader = nullptr;
   88|       |    Writer* last_writer = nullptr;
   89|       |    SequenceNumber last_sequence;
   90|       |    // before running goes to zero, status needs leader->StateMutex()
   91|       |    Status status;
   92|       |    std::atomic<size_t> running;
   93|       |    size_t size = 0;
   94|       |
   95|       |    struct Iterator {
   96|       |      Writer* writer;
   97|       |      Writer* last_writer;
   98|       |
   99|       |      explicit Iterator(Writer* w, Writer* last)
  100|  13.8M|          : writer(w), last_writer(last) {}
  101|       |
  102|  6.91M|      Writer* operator*() const { return writer; }
  103|       |
  104|  6.91M|      Iterator& operator++() {
  105|  6.91M|        assert(writer != nullptr);
  106|  6.91M|        if (writer == last_writer) {
  107|  6.91M|          writer = nullptr;
  108|  6.91M|        } else {
  109|      0|          writer = writer->link_newer;
  110|      0|        }
  111|  6.91M|        return *this;
  112|  6.91M|      }
  113|       |
  114|  13.8M|      bool operator!=(const Iterator& other) const {
  115|  13.8M|        return writer != other.writer;
  116|  13.8M|      }
  117|       |    };
  118|       |
  119|  6.91M|    Iterator begin() const { return Iterator(leader, last_writer); }
  120|  6.91M|    Iterator end() const { return Iterator(nullptr, nullptr); }
  121|       |  };
  122|       |
  123|       |  // Information kept for every waiting writer.
  124|       |  struct Writer {
  125|       |    WriteBatch* batch;
  126|       |    bool sync;
  127|       |    bool no_slowdown;
  128|       |    bool disable_wal;
  129|       |    Env::IOPriority rate_limiter_priority;
  130|       |    bool disable_memtable;
  131|       |    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
  132|       |    size_t protection_bytes_per_key;
  133|       |    PreReleaseCallback* pre_release_callback;
  134|       |    PostMemTableCallback* post_memtable_callback;
  135|       |    uint64_t log_used;  // log number that this batch was inserted into
  136|       |    uint64_t log_ref;   // log number that memtable insert should reference
  137|       |    WriteCallback* callback;
  138|       |    UserWriteCallback* user_write_cb;
  139|       |    bool made_waitable;          // records lazy construction of mutex and cv
  140|       |    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
  141|       |    WriteGroup* write_group;
  142|       |    SequenceNumber sequence;  // the sequence number to use for the first key
  143|       |    Status status;
  144|       |    Status callback_status;  // status returned by callback->Callback()
  145|       |
  146|       |    aligned_storage<std::mutex>::type state_mutex_bytes;
  147|       |    aligned_storage<std::condition_variable>::type state_cv_bytes;
  148|       |    Writer* link_older;  // read/write only before linking, or as leader
  149|       |    Writer* link_newer;  // lazy, read/write only before linking, or as leader
  150|       |
  151|       |    Writer()
  152|       |        : batch(nullptr),
  153|       |          sync(false),
  154|       |          no_slowdown(false),
  155|       |          disable_wal(false),
  156|       |          rate_limiter_priority(Env::IOPriority::IO_TOTAL),
  157|       |          disable_memtable(false),
  158|       |          batch_cnt(0),
  159|       |          protection_bytes_per_key(0),
  160|       |          pre_release_callback(nullptr),
  161|       |          post_memtable_callback(nullptr),
  162|       |          log_used(0),
  163|       |          log_ref(0),
  164|       |          callback(nullptr),
  165|       |          user_write_cb(nullptr),
  166|       |          made_waitable(false),
  167|       |          state(STATE_INIT),
  168|       |          write_group(nullptr),
  169|       |          sequence(kMaxSequenceNumber),
  170|       |          link_older(nullptr),
  171|  60.4k|          link_newer(nullptr) {}
  172|       |
  173|       |    Writer(const WriteOptions& write_options, WriteBatch* _batch,
  174|       |           WriteCallback* _callback, UserWriteCallback* _user_write_cb,
  175|       |           uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0,
  176|       |           PreReleaseCallback* _pre_release_callback = nullptr,
  177|       |           PostMemTableCallback* _post_memtable_callback = nullptr)
  178|       |        : batch(_batch),
  179|       |          // TODO: store a copy of WriteOptions instead of its separate data
  180|       |          // members
  181|       |          sync(write_options.sync),
  182|       |          no_slowdown(write_options.no_slowdown),
  183|       |          disable_wal(write_options.disableWAL),
  184|       |          rate_limiter_priority(write_options.rate_limiter_priority),
  185|       |          disable_memtable(_disable_memtable),
  186|       |          batch_cnt(_batch_cnt),
  187|       |          protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),
  188|       |          pre_release_callback(_pre_release_callback),
  189|       |          post_memtable_callback(_post_memtable_callback),
  190|       |          log_used(0),
  191|       |          log_ref(_log_ref),
  192|       |          callback(_callback),
  193|       |          user_write_cb(_user_write_cb),
  194|       |          made_waitable(false),
  195|       |          state(STATE_INIT),
  196|       |          write_group(nullptr),
  197|       |          sequence(kMaxSequenceNumber),
  198|       |          link_older(nullptr),
  199|  1.38M|          link_newer(nullptr) {}
  200|       |
  201|  1.44M|    ~Writer() {
  202|  1.44M|      if (made_waitable) {
  203|      0|        StateMutex().~mutex();
  204|      0|        StateCV().~condition_variable();
  205|      0|      }
  206|  1.44M|      status.PermitUncheckedError();
  207|  1.44M|      callback_status.PermitUncheckedError();
  208|  1.44M|    }
  209|       |
  210|  1.38M|    bool CheckCallback(DB* db) {
  211|  1.38M|      if (callback != nullptr) {
  212|      0|        callback_status = callback->Callback(db);
  213|      0|      }
  214|  1.38M|      return callback_status.ok();
  215|  1.38M|    }
  216|       |
  217|  1.38M|    void CheckWriteEnqueuedCallback() {
  218|  1.38M|      if (user_write_cb != nullptr) {
  219|      0|        user_write_cb->OnWriteEnqueued();
  220|      0|      }
  221|  1.38M|    }
  222|       |
  223|  1.38M|    void CheckPostWalWriteCallback() {
  224|  1.38M|      if (user_write_cb != nullptr) {
  225|      0|        user_write_cb->OnWalWriteFinish();
  226|      0|      }
  227|  1.38M|    }
  228|       |
  229|      0|    void CreateMutex() {
  230|      0|      if (!made_waitable) {
  231|       |        // Note that made_waitable is tracked separately from state
  232|       |        // transitions, because we can't atomically create the mutex and
  233|       |        // link into the list.
  234|      0|        made_waitable = true;
  235|      0|        new (&state_mutex_bytes) std::mutex;
  236|      0|        new (&state_cv_bytes) std::condition_variable;
  237|      0|      }
  238|      0|    }
  239|       |
  240|       |    // returns the aggregate status of this Writer
  241|  1.38M|    Status FinalStatus() {
  242|  1.38M|      if (!status.ok()) {
  243|       |        // a non-ok memtable write status takes precedence
  244|      0|        assert(callback == nullptr || callback_status.ok());
  245|      0|        return status;
  246|  1.38M|      } else if (!callback_status.ok()) {
  247|       |        // if the callback failed then that is the status we want
  248|       |        // because a memtable insert should not have been attempted
  249|      0|        assert(callback != nullptr);
  250|      0|        assert(status.ok());
  251|      0|        return callback_status;
  252|  1.38M|      } else {
  253|       |        // if there is no callback then we only care about
  254|       |        // the memtable insert status
  255|  1.38M|        assert(callback == nullptr || callback_status.ok());
  256|  1.38M|        return status;
  257|  1.38M|      }
  258|  1.38M|    }
  259|       |
  260|  11.0M|    bool CallbackFailed() {
  261|  11.0M|      return (callback != nullptr) && !callback_status.ok();
  262|  11.0M|    }
  263|       |
  264|  4.15M|    bool ShouldWriteToMemtable() {
  265|  4.15M|      return status.ok() && !CallbackFailed() && !disable_memtable;
  266|  4.15M|    }
  267|       |
  268|      0|    bool ShouldWriteToWAL() {
  269|      0|      return status.ok() && !CallbackFailed() && !disable_wal;
  270|      0|    }
  271|       |
  272|       |    // No other mutexes may be acquired while holding StateMutex(); it is
  273|       |    // always last in the lock order
  274|      0|    std::mutex& StateMutex() {
  275|      0|      assert(made_waitable);
  276|      0|      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
  277|      0|    }
  278|       |
  279|      0|    std::condition_variable& StateCV() {
  280|      0|      assert(made_waitable);
  281|      0|      return *static_cast<std::condition_variable*>(
  282|      0|          static_cast<void*>(&state_cv_bytes));
  283|      0|    }
  284|       |  };
  285|       |
  286|       |  struct AdaptationContext {
  287|       |    const char* name;
  288|       |    std::atomic<int32_t> value;
  289|       |
  290|     20|    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  291|       |  };
  292|       |
  293|       |  explicit WriteThread(const ImmutableDBOptions& db_options);
  294|       |
  295|  46.0k|  virtual ~WriteThread() = default;
  296|       |
  297|       |  // IMPORTANT: None of the methods in this class rely on the db mutex
  298|       |  // for correctness. All of the methods except JoinBatchGroup and
  299|       |  // EnterUnbatched may be called either with or without the db mutex held.
  300|       |  // Correctness is maintained by ensuring that only a single thread is
  301|       |  // a leader at a time.
  302|       |
  303|       |  // Registers w as ready to become part of a batch group, waits until the
  304|       |  // caller should perform some work, and returns the current state of the
  305|       |  // writer.  If w has become the leader of a write batch group, returns
  306|       |  // STATE_GROUP_LEADER.  If w has been made part of a sequential batch
  307|       |  // group and the leader has performed the write, returns STATE_COMPLETED.
  308|       |  // If w has been made part of a parallel batch group and is responsible
  309|       |  // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
  310|       |  //
  311|       |  // The db mutex SHOULD NOT be held when calling this function, because
  312|       |  // it will block.
  313|       |  //
  314|       |  // Writer* w:        Writer to be executed as part of a batch group
  315|       |  void JoinBatchGroup(Writer* w);
  316|       |
  317|       |  // Constructs a write batch group led by leader, which should be a
  318|       |  // Writer passed to JoinBatchGroup on the current thread.
  319|       |  //
  320|       |  // Writer* leader:          Writer that is STATE_GROUP_LEADER
  321|       |  // WriteGroup* write_group: Out-param of group members
  322|       |  // returns:                 Total batch group byte size
  323|       |  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
  324|       |
  325|       |  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  326|       |  // and wakes up the next leader (if any).
  327|       |  //
  328|       |  // WriteGroup* write_group: the write group
  329|       |  // Status status:           Status of write operation
  330|       |  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);
  331|       |
  332|       |  // Exit batch group on behalf of batch group leader.
  333|       |  void ExitAsBatchGroupFollower(Writer* w);
  334|       |
  335|       |  // Constructs a write batch group led by leader from newest_memtable_writers_
  336|       |  // list. The leader should either write memtable for the whole group and
  337|       |  // call ExitAsMemTableWriter, or launch parallel memtable write through
  338|       |  // LaunchParallelMemTableWriters.
  339|       |  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group);
  340|       |
  341|       |  // Memtable writer group leader, or the last finished writer in a parallel
  342|       |  // write group, exits from the newest_memtable_writers_ list, and wakes up
  343|       |  // the next leader if needed.
  344|       |  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
  345|       |
  346|       |  // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
  347|       |  // the non-leader members of this write batch group.  Sets Writer::sequence
  348|       |  // before waking them up.
  349|       |  // If the size n of the write_group is not small, the leader picks n^0.5
  350|       |  // members of the write_group to be PARALLEL_MEMTABLE_CALLER, which help
  351|       |  // set the other writers' state in parallel. This ensures that the cost of
  352|       |  // calling SetState sequentially does not exceed 2 * n^0.5.
  353|       |  //
  354|       |  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  355|       |  void LaunchParallelMemTableWriters(WriteGroup* write_group);
  356|       |
  357|       |  // Every stride-th writer in the WriteGroup is set to be a memtable writer,
  358|       |  // where the stride is the square root of the total number of writers in
  359|       |  // this write_group; all of these memtable writers then write to the memtable.
  360|       |  void SetMemWritersEachStride(Writer* w);
  361|       |
  362|       |  // Reports the completion of w's batch to the parallel group leader, and
  363|       |  // waits for the rest of the parallel batch to complete.  Returns true
  364|       |  // if this thread is the last to complete, and hence should advance
  365|       |  // the sequence number and then call EarlyExitParallelGroup, false if
  366|       |  // someone else has already taken responsibility for that.
  367|       |  bool CompleteParallelMemTableWriter(Writer* w);
  368|       |
  369|       |  // Waits for all preceding writers (unlocking mu while waiting), then
  370|       |  // registers w as the currently proceeding writer.
  371|       |  //
  372|       |  // Writer* w:              A Writer not eligible for batching
  373|       |  // InstrumentedMutex* mu:  The db mutex, to unlock while waiting
  374|       |  // REQUIRES: db mutex held
  375|       |  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);
  376|       |
  377|       |  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  378|       |  // writers.
  379|       |  void ExitUnbatched(Writer* w);
  380|       |
  381|       |  // Wait for all parallel memtable writers to finish, in case pipelined
  382|       |  // write is enabled.
  383|       |  void WaitForMemTableWriters();
  384|       |
  385|      0|  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
  386|      0|    if (sequence > last_sequence_) {
  387|      0|      last_sequence_ = sequence;
  388|      0|    }
  389|      0|    return last_sequence_;
  390|      0|  }
  391|       |
  392|       |  // Insert a dummy writer at the tail of the write queue to indicate a write
  393|       |  // stall, and fail any writers in the queue with no_slowdown set to true
  394|       |  // REQUIRES: db mutex held, no other stall on this queue outstanding
  395|       |  void BeginWriteStall();
  396|       |
  397|       |  // Remove the dummy writer and wake up waiting writers
  398|       |  // REQUIRES: db mutex held
  399|       |  void EndWriteStall();
  400|       |
  401|       |  // Number of BeginWriteStall(), or 0 if there is no active stall in the
  402|       |  // write queue.
  403|       |  // REQUIRES: db mutex held
  404|       |  uint64_t GetBegunCountOfOutstandingStall();
  405|       |
  406|       |  // Wait for number of completed EndWriteStall() to reach >= `stall_count`,
  407|       |  // which will generally have come from GetBegunCountOfOutstandingStall().
  408|       |  // (Does not require db mutex held)
  409|       |  void WaitForStallEndedCount(uint64_t stall_count);
  410|       |
  411|       | private:
  412|       |  // See AwaitState.
  413|       |  const uint64_t max_yield_usec_;
  414|       |  const uint64_t slow_yield_usec_;
  415|       |
  416|       |  // Allow multiple writers to write to the memtable concurrently.
  417|       |  const bool allow_concurrent_memtable_write_;
  418|       |
  419|       |  // Enable pipelined write to WAL and memtable.
  420|       |  const bool enable_pipelined_write_;
  421|       |
  422|       |  // The maximum limit on the number of bytes that are written in a single
  423|       |  // batch of WAL or memtable write. It is enforced when the leader's write
  424|       |  // size is larger than 1/8 of this limit.
  425|       |  const uint64_t max_write_batch_group_size_bytes;
  426|       |
  427|       |  // Points to the newest pending writer. Only the leader can remove
  428|       |  // elements; adding can be done lock-free by anybody.
  429|       |  std::atomic<Writer*> newest_writer_;
  430|       |
  431|       |  // Points to the newest pending memtable writer. Used only when pipelined
  432|       |  // write is enabled.
  433|       |  std::atomic<Writer*> newest_memtable_writer_;
  434|       |
  435|       |  // The last sequence that has been consumed by a writer. The sequence
  436|       |  // is not necessarily visible to reads because the write can still be ongoing.
  437|       |  SequenceNumber last_sequence_;
  438|       |
  439|       |  // A dummy writer to indicate a write stall condition. This will be inserted
  440|       |  // at the tail of the writer queue by the leader, so newer writers can just
  441|       |  // check for this and bail.
  442|       |  Writer write_stall_dummy_;
  443|       |
  444|       |  // Mutex and condvar for writers to block on during a write stall. During a
  445|       |  // write stall, writers with no_slowdown set to false will wait on these
  446|       |  // rather than on the writer queue.
  447|       |  port::Mutex stall_mu_;
  448|       |  port::CondVar stall_cv_;
  449|       |
  450|       |  // Count the number of stalls begun, so that we can check whether
  451|       |  // a particular stall has cleared (even if caught in another stall).
  452|       |  // Controlled by DB mutex.
  453|       |  // Because of the contract on BeginWriteStall() / EndWriteStall(),
  454|       |  // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1.
  455|       |  uint64_t stall_begun_count_ = 0;
  456|       |  // Count the number of stalls ended, so that we can check whether
  457|       |  // a particular stall has cleared (even if caught in another stall).
  458|       |  // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_.
  459|       |  // Read with stall_mu or DB mutex.
  460|       |  uint64_t stall_ended_count_ = 0;
  461|       |
  462|       |  // Waits for w->state & goal_mask using w->StateMutex().  Returns
  463|       |  // the state that satisfies goal_mask.
  464|       |  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
  465|       |
  466|       |  // Blocks until w->state & goal_mask, returning the state value
  467|       |  // that satisfied the predicate.  Uses ctx to adaptively use
  468|       |  // std::this_thread::yield() to avoid mutex overheads.  ctx should be
  469|       |  // a context-dependent static.
  470|       |  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
  471|       |
  472|       |  // Set writer state and wake the writer up if it is waiting.
  473|       |  void SetState(Writer* w, uint8_t new_state);
  474|       |
  475|       |  // Links w into the newest_writer list. Returns true if w was linked directly
  476|       |  // into the leader position.  Safe to call from multiple threads without
  477|       |  // external locking.
  478|       |  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);
  479|       |
  480|       |  // Links a write group into the newest_writer list as a whole, while keeping
  481|       |  // the order of the writers unchanged. Returns true if the group was linked
  482|       |  // directly into the leader position.
  483|       |  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);
  484|       |
  485|       |  // Computes any missing link_newer links.  Should not be called
  486|       |  // concurrently with itself.
  487|       |  void CreateMissingNewerLinks(Writer* head);
  488|       |
  489|       |  // Set the leader in write_group to completed state and remove it from the
  490|       |  // write group.
  491|       |  void CompleteLeader(WriteGroup& write_group);
  492|       |
  493|       |  // Set a follower in write_group to completed state and remove it from the
  494|       |  // write group.
  495|       |  void CompleteFollower(Writer* w, WriteGroup& write_group);
  496|       |};
  497|       |
  498|       |}  // namespace ROCKSDB_NAMESPACE
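
Usage note (added for context, not part of the measured source): a minimal sketch of how a writer thread might drive the batch-group protocol documented above, assuming it is compiled inside the RocksDB source tree where db/write_thread.h is available. The function name ExampleGroupCommit is hypothetical, and the actual WAL/memtable work performed by the leader is elided.

#include "db/write_thread.h"

namespace ROCKSDB_NAMESPACE {

// Illustrative sketch only: enqueue one write and, if chosen as leader,
// build and release a write group. Error handling is omitted.
void ExampleGroupCommit(WriteThread& write_thread, const WriteOptions& opts,
                        WriteBatch* batch) {
  WriteThread::Writer w(opts, batch, /*_callback=*/nullptr,
                        /*_user_write_cb=*/nullptr, /*_log_ref=*/0,
                        /*_disable_memtable=*/false);

  // Enqueue w and block until this writer is told what role to play.
  write_thread.JoinBatchGroup(&w);

  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    // Leader: gather followers into a group, perform the WAL/memtable
    // writes for the whole group, then release the group and hand off
    // leadership to the next waiting writer.
    WriteThread::WriteGroup write_group;
    write_thread.EnterAsBatchGroupLeader(&w, &write_group);

    Status status;  // ... write WAL / memtables for every writer in the group ...

    write_thread.ExitAsBatchGroupLeader(write_group, status);
  }
  // Followers typically return from JoinBatchGroup already completed (or in a
  // parallel memtable-writer state) and read their outcome via FinalStatus().
  Status final_status = w.FinalStatus();
  final_status.PermitUncheckedError();
}

}  // namespace ROCKSDB_NAMESPACE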