Coverage Report

Created: 2026-03-31 07:51

/src/rocksdb/db/write_thread.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/write_thread.h"
7
8
#include <chrono>
9
#include <thread>
10
11
#include "db/column_family.h"
12
#include "monitoring/perf_context_imp.h"
13
#include "port/port.h"
14
#include "test_util/sync_point.h"
15
#include "util/random.h"
16
17
namespace ROCKSDB_NAMESPACE {
18
19
WriteThread::WriteThread(const ImmutableDBOptions& db_options)
20
80.4k
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
21
80.4k
                          ? db_options.write_thread_max_yield_usec
22
80.4k
                          : 0),
23
80.4k
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
24
      allow_concurrent_memtable_write_(
25
80.4k
          db_options.allow_concurrent_memtable_write),
26
80.4k
      enable_pipelined_write_(db_options.enable_pipelined_write),
27
      max_write_batch_group_size_bytes(
28
80.4k
          db_options.max_write_batch_group_size_bytes),
29
80.4k
      newest_writer_(nullptr),
30
80.4k
      newest_memtable_writer_(nullptr),
31
80.4k
      last_sequence_(0),
32
80.4k
      write_stall_dummy_(),
33
80.4k
      stall_mu_(),
34
80.4k
      stall_cv_(&stall_mu_) {}
35
36
0
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
37
  // We're going to block.  Lazily create the mutex.  We guarantee
38
  // propagation of this construction to the waker via the
39
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
40
  // or the condvar unless it CASes away the STATE_LOCKED_WAITING that
41
  // we install below.
42
0
  w->CreateMutex();
43
44
0
  auto state = w->state.load(std::memory_order_acquire);
45
0
  assert(state != STATE_LOCKED_WAITING);
46
0
  if ((state & goal_mask) == 0 &&
47
0
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
48
    // we have permission (and an obligation) to use StateMutex
49
0
    std::unique_lock<std::mutex> guard(w->StateMutex());
50
0
    w->StateCV().wait(guard, [w] {
51
0
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
52
0
    });
53
0
    state = w->state.load(std::memory_order_relaxed);
54
0
  }
55
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
56
  // must have changed the state, and compare_exchange_strong has updated
57
  // our local variable with the new one.  At the moment WriteThread never
58
  // waits for a transition across intermediate states, so we know that
59
  // since a state change has occurred the goal must have been met.
60
0
  assert((state & goal_mask) != 0);
61
0
  return state;
62
0
}
63
64
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
65
0
                                AdaptationContext* ctx) {
66
0
  uint8_t state = 0;
67
68
  // 1. Busy loop using "pause" for 1 micro sec
69
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
70
  // 3. Else blocking wait
71
72
  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
73
  // is the effect of the pause instruction), so 200 iterations is a bit
74
  // more than a microsecond.  This is long enough that waits longer than
75
  // this can amortize the cost of accessing the clock and yielding.
76
0
  for (uint32_t tries = 0; tries < 200; ++tries) {
77
0
    state = w->state.load(std::memory_order_acquire);
78
0
    if ((state & goal_mask) != 0) {
79
0
      return state;
80
0
    }
81
0
    port::AsmVolatilePause();
82
0
  }
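As a quick check of the arithmetic in the comment above: 200 iterations at roughly 7 ns each is $200 \times 7\,\text{ns} \approx 1.4\,\mu\text{s}$, consistent with the claim that the spin phase lasts a bit more than a microsecond.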
83
84
  // This is below the fast path, so that the stat is zero when all writes are
85
  // from the same thread.
86
0
  PERF_TIMER_FOR_WAIT_GUARD(write_thread_wait_nanos);
87
88
  // If we're only going to end up waiting a short period of time,
89
  // it can be a lot more efficient to call std::this_thread::yield()
90
  // in a loop than to block in StateMutex().  For reference, on my 4.0
91
  // SELinux test server with support for syscall auditing enabled, the
92
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
93
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
94
  // drag on RocksDB's single-writer design.  Of course, spinning is a
95
  // bad idea if other threads are waiting to run or if we're going to
96
  // wait for a long time.  How do we decide?
97
  //
98
  // We break waiting into 3 categories: short-uncontended,
99
  // short-contended, and long.  If we had an oracle, then we would always
100
  // spin for short-uncontended, always block for long, and our choice for
101
  // short-contended might depend on whether we were trying to optimize
102
  // RocksDB throughput or avoid being greedy with system resources.
103
  //
104
  // Bucketing into short or long is easy by measuring elapsed time.
105
  // Differentiating short-uncontended from short-contended is a bit
106
  // trickier, but not too bad.  We could look for involuntary context
107
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
108
  // (portability code and CPU) to just look for yield calls that take
109
  // longer than we expect.  sched_yield() doesn't actually result in any
110
  // context switch overhead if there are no other runnable processes
111
  // on the current core, in which case it usually takes less than
112
  // a microsecond.
113
  //
114
  // There are two primary tunables here: the threshold between "short"
115
  // and "long" waits, and the threshold at which we suspect that a yield
116
  // is slow enough to indicate we should probably block.  If these
117
  // thresholds are chosen well then CPU-bound workloads that don't
118
  // have more threads than cores will experience few context switches
119
  // (voluntary or involuntary), and the total number of context switches
120
  // (voluntary and involuntary) will not be dramatically larger (maybe
121
  // 2x) than the number of voluntary context switches that occur when
122
  // --max_yield_wait_micros=0.
123
  //
124
  // There's another constant, which is the number of slow yields we will
125
  // tolerate before reversing our previous decision.  Solitary slow
126
  // yields are pretty common (low-priority small jobs ready to run),
127
  // so this should be at least 2.  We set this conservatively to 3 so
128
  // that we can also immediately schedule a ctx adaptation, rather than
129
  // waiting for the next update_ctx.
130
131
0
  const size_t kMaxSlowYieldsWhileSpinning = 3;
132
133
  // Whether the yield approach has any credit in this context. The credit is
134
  // added when a yield succeeds before timing out, and decreased otherwise.
135
0
  auto& yield_credit = ctx->value;
136
  // Update the yield_credit based on sample runs or right after a hard failure
137
0
  bool update_ctx = false;
138
  // Should we reinforce the yield credit
139
0
  bool would_spin_again = false;
140
  // The sampling base for updating the yield credit. The sampling rate would be
141
  // 1/sampling_base.
142
0
  const int sampling_base = 256;
143
144
0
  if (max_yield_usec_ > 0) {
145
0
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);
146
147
0
    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
148
      // we're updating the adaptation statistics, or spinning has >
149
      // 50% chance of being shorter than max_yield_usec_ and causing no
150
      // involuntary context switches
151
0
      auto spin_begin = std::chrono::steady_clock::now();
152
153
      // this variable doesn't include the final yield (if any) that
154
      // causes the goal to be met
155
0
      size_t slow_yield_count = 0;
156
157
0
      auto iter_begin = spin_begin;
158
0
      while ((iter_begin - spin_begin) <=
159
0
             std::chrono::microseconds(max_yield_usec_)) {
160
0
        std::this_thread::yield();
161
162
0
        state = w->state.load(std::memory_order_acquire);
163
0
        if ((state & goal_mask) != 0) {
164
          // success
165
0
          would_spin_again = true;
166
0
          break;
167
0
        }
168
169
0
        auto now = std::chrono::steady_clock::now();
170
0
        if (now == iter_begin ||
171
0
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
172
          // conservatively count it as a slow yield if our clock isn't
173
          // accurate enough to measure the yield duration
174
0
          ++slow_yield_count;
175
0
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
176
            // Not just one ivcsw, but several.  Immediately update yield_credit
177
            // and fall back to blocking
178
0
            update_ctx = true;
179
0
            break;
180
0
          }
181
0
        }
182
0
        iter_begin = now;
183
0
      }
184
0
    }
185
0
  }
186
187
0
  if ((state & goal_mask) == 0) {
188
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
189
0
    state = BlockingAwaitState(w, goal_mask);
190
0
  }
191
192
0
  if (update_ctx) {
193
    // Since our update is sample based, it is ok if a thread overwrites the
194
    // updates by other threads. Thus the update does not have to be atomic.
195
0
    auto v = yield_credit.load(std::memory_order_relaxed);
196
    // fixed point exponential decay with decay constant 1/1024, with +1
197
    // and -1 scaled to avoid overflow for int32_t
198
    //
199
    // On each update the positive credit is decayed by a factor of 1/1024 (i.e.,
200
    // 0.1%). If the sampled yield was successful, the credit is also increased
201
    // by X. Setting X=2^17 ensures that the credit never exceeds
202
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t. Same
203
    // logic applies to negative credits.
204
0
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
205
0
    yield_credit.store(v, std::memory_order_relaxed);
206
0
  }
207
208
0
  assert((state & goal_mask) != 0);
209
0
  return state;
210
0
}
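The overflow bound sketched in the comment above can be made explicit. With the update on line 204, $v \leftarrow v - v/1024 + 131072$ (where $131072 = 2^{17}$), a credit that is reinforced on every sample converges to the fixed point

$v^* = v^* - \frac{v^*}{1024} + 2^{17} \;\Rightarrow\; \frac{v^*}{1024} = 2^{17} \;\Rightarrow\; v^* = 2^{27} = 134217728,$

which stays well below the int32_t limit of $2^{31} - 1$; the symmetric argument bounds the negative credit.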
211
212
495k
void WriteThread::SetState(Writer* w, uint8_t new_state) {
213
495k
  assert(w);
214
495k
  auto state = w->state.load(std::memory_order_acquire);
215
495k
  if (state == STATE_LOCKED_WAITING ||
216
495k
      !w->state.compare_exchange_strong(state, new_state)) {
217
0
    assert(state == STATE_LOCKED_WAITING);
218
219
0
    std::lock_guard<std::mutex> guard(w->StateMutex());
220
0
    assert(w->state.load(std::memory_order_relaxed) != new_state);
221
0
    w->state.store(new_state, std::memory_order_relaxed);
222
0
    w->StateCV().notify_one();
223
0
  }
224
495k
}
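The wait/notify protocol implemented by BlockingAwaitState and SetState above can be reduced to a minimal, self-contained sketch. The MiniWriter type, state constants, and function names below are hypothetical stand-ins rather than RocksDB's Writer API; the point is the pattern: the waiter publishes a locked-waiting state with a CAS before sleeping on the condition variable, and the waker only touches the mutex and condvar when it observes that state or when its own CAS fails.

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

constexpr uint8_t kInit = 1;           // stand-in for STATE_INIT
constexpr uint8_t kLockedWaiting = 2;  // stand-in for STATE_LOCKED_WAITING
constexpr uint8_t kCompleted = 4;      // stand-in for STATE_COMPLETED

struct MiniWriter {
  std::atomic<uint8_t> state{kInit};
  std::mutex mu;
  std::condition_variable cv;
};

// Waiter side: if the goal is not yet met, install kLockedWaiting with a CAS,
// then sleep on the condvar until the waker stores some other state.
uint8_t Await(MiniWriter* w, uint8_t goal_mask) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  if ((s & goal_mask) == 0 &&
      w->state.compare_exchange_strong(s, kLockedWaiting)) {
    std::unique_lock<std::mutex> guard(w->mu);
    w->cv.wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != kLockedWaiting;
    });
    s = w->state.load(std::memory_order_relaxed);
  }
  // Either the goal was already met, our CAS lost to the waker, or we were
  // woken up; in every case a goal state is now visible.
  assert((s & goal_mask) != 0);
  return s;
}

// Waker side: try a plain CAS first; fall back to the mutex and condvar only
// when the waiter has already parked itself in kLockedWaiting.
void Notify(MiniWriter* w, uint8_t new_state) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  if (s == kLockedWaiting || !w->state.compare_exchange_strong(s, new_state)) {
    // A failed CAS means the waiter installed kLockedWaiting in between.
    std::lock_guard<std::mutex> guard(w->mu);
    w->state.store(new_state, std::memory_order_relaxed);
    w->cv.notify_one();
  }
}

int main() {
  MiniWriter w;
  std::thread waker([&w] { Notify(&w, kCompleted); });
  Await(&w, kCompleted);
  waker.join();
  return 0;
}

The fast path never touches the mutex, which is what keeps SetState cheap in the common case where the waiter is still spinning in AwaitState rather than blocked.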
225
226
526k
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
227
526k
  assert(newest_writer != nullptr);
228
526k
  assert(w->state == STATE_INIT);
229
526k
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
230
526k
  while (true) {
231
526k
    assert(writers != w);
232
    // If a write stall is in effect and w->no_slowdown is not true,
233
    // block here until the stall is cleared. If it is true, then return
234
    // immediately
235
526k
    if (writers == &write_stall_dummy_) {
236
0
      if (w->no_slowdown) {
237
0
        w->status = Status::Incomplete("Write stall");
238
0
        SetState(w, STATE_COMPLETED);
239
0
        return false;
240
0
      }
241
      // Since no_slowdown is false, wait here to be notified of the write
242
      // stall clearing
243
0
      {
244
0
        MutexLock lock(&stall_mu_);
245
0
        writers = newest_writer->load(std::memory_order_relaxed);
246
0
        if (writers == &write_stall_dummy_) {
247
0
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
248
0
          stall_cv_.Wait();
249
          // Load newest_writers_ again since it may have changed
250
0
          writers = newest_writer->load(std::memory_order_relaxed);
251
0
          continue;
252
0
        }
253
0
      }
254
0
    }
255
526k
    w->link_older = writers;
256
526k
    if (newest_writer->compare_exchange_weak(writers, w)) {
257
526k
      return (writers == nullptr);
258
526k
    }
259
526k
  }
260
526k
}
261
262
bool WriteThread::LinkGroup(WriteGroup& write_group,
263
0
                            std::atomic<Writer*>* newest_writer) {
264
0
  assert(newest_writer != nullptr);
265
0
  Writer* leader = write_group.leader;
266
0
  Writer* last_writer = write_group.last_writer;
267
0
  Writer* w = last_writer;
268
0
  while (true) {
269
    // Unset link_newer pointers to make sure when we call
270
    // CreateMissingNewerLinks later it creates all missing links.
271
0
    w->link_newer = nullptr;
272
0
    w->write_group = nullptr;
273
0
    if (w == leader) {
274
0
      break;
275
0
    }
276
0
    w = w->link_older;
277
0
  }
278
0
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
279
0
  while (true) {
280
0
    leader->link_older = newest;
281
0
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
282
0
      return (newest == nullptr);
283
0
    }
284
0
  }
285
0
}
286
287
495k
void WriteThread::CreateMissingNewerLinks(Writer* head) {
288
495k
  while (true) {
289
495k
    Writer* next = head->link_older;
290
495k
    if (next == nullptr || next->link_newer != nullptr) {
291
495k
      assert(next == nullptr || next->link_newer == head);
292
495k
      break;
293
495k
    }
294
0
    next->link_newer = head;
295
0
    head = next;
296
0
  }
297
495k
}
298
299
0
void WriteThread::CompleteLeader(WriteGroup& write_group) {
300
0
  assert(write_group.size > 0);
301
0
  Writer* leader = write_group.leader;
302
0
  if (write_group.size == 1) {
303
0
    write_group.leader = nullptr;
304
0
    write_group.last_writer = nullptr;
305
0
  } else {
306
0
    assert(leader->link_newer != nullptr);
307
0
    leader->link_newer->link_older = nullptr;
308
0
    write_group.leader = leader->link_newer;
309
0
  }
310
0
  write_group.size -= 1;
311
0
  SetState(leader, STATE_COMPLETED);
312
0
}
313
314
0
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
315
0
  assert(write_group.size > 1);
316
0
  assert(w != write_group.leader);
317
0
  if (w == write_group.last_writer) {
318
0
    w->link_older->link_newer = nullptr;
319
0
    write_group.last_writer = w->link_older;
320
0
  } else {
321
0
    w->link_older->link_newer = w->link_newer;
322
0
    w->link_newer->link_older = w->link_older;
323
0
  }
324
0
  write_group.size -= 1;
325
0
  SetState(w, STATE_COMPLETED);
326
0
}
327
328
0
void WriteThread::BeginWriteStall() {
329
0
  ++stall_begun_count_;
330
0
  LinkOne(&write_stall_dummy_, &newest_writer_);
331
332
  // Walk writer list until w->write_group != nullptr. The current write group
333
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
334
  // point
335
0
  Writer* w = write_stall_dummy_.link_older;
336
0
  Writer* prev = &write_stall_dummy_;
337
0
  while (w != nullptr && w->write_group == nullptr) {
338
0
    if (w->no_slowdown) {
339
0
      prev->link_older = w->link_older;
340
0
      w->status = Status::Incomplete("Write stall");
341
0
      SetState(w, STATE_COMPLETED);
342
      // Only update `link_newer` if it's already set.
343
      // `CreateMissingNewerLinks()` will update the nullptr `link_newer` later,
344
      // which assumes that the writers with a nullptr `link_newer` form a
345
      // contiguous run at the newest end of the writer list.
346
      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
347
      // updating the whole list when it sees the first non-nullptr link.
348
0
      if (prev->link_older && prev->link_older->link_newer) {
349
0
        prev->link_older->link_newer = prev;
350
0
      }
351
0
      w = prev->link_older;
352
0
    } else {
353
0
      prev = w;
354
0
      w = w->link_older;
355
0
    }
356
0
  }
357
0
}
358
359
0
void WriteThread::EndWriteStall() {
360
0
  MutexLock lock(&stall_mu_);
361
362
  // Unlink write_stall_dummy_ from the write queue. This will unblock
363
  // pending write threads so that they can enqueue themselves
364
0
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
365
  // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been
366
  // called.
367
0
  if (write_stall_dummy_.link_older) {
368
0
    write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
369
0
  }
370
0
  newest_writer_.exchange(write_stall_dummy_.link_older);
371
372
0
  ++stall_ended_count_;
373
374
  // Wake up writers
375
0
  stall_cv_.SignalAll();
376
0
}
377
378
0
uint64_t WriteThread::GetBegunCountOfOutstandingStall() {
379
0
  if (stall_begun_count_ > stall_ended_count_) {
380
    // Outstanding stall in queue
381
0
    assert(newest_writer_.load(std::memory_order_relaxed) ==
382
0
           &write_stall_dummy_);
383
0
    return stall_begun_count_;
384
0
  } else {
385
    // No stall in queue
386
0
    assert(newest_writer_.load(std::memory_order_relaxed) !=
387
0
           &write_stall_dummy_);
388
0
    return 0;
389
0
  }
390
0
}
391
392
0
void WriteThread::WaitForStallEndedCount(uint64_t stall_count) {
393
0
  MutexLock lock(&stall_mu_);
394
395
0
  while (stall_ended_count_ < stall_count) {
396
0
    stall_cv_.Wait();
397
0
  }
398
0
}
399
400
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
401
495k
void WriteThread::JoinBatchGroup(Writer* w) {
402
495k
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
403
495k
  assert(w->batch != nullptr);
404
405
495k
  bool linked_as_leader = LinkOne(w, &newest_writer_);
406
407
495k
  w->CheckWriteEnqueuedCallback();
408
409
495k
  if (linked_as_leader) {
410
495k
    SetState(w, STATE_GROUP_LEADER);
411
495k
  }
412
413
495k
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
414
495k
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);
415
416
495k
  if (!linked_as_leader) {
417
    /**
418
     * Wait until:
419
     * 1) An existing leader picks us as the new leader when it finishes
420
     * 2) An existing leader picks us as its follower and
421
     * 2.1) finishes the memtable writes on our behalf
422
     * 2.2) Or tells us to finish the memtable writes in parallel
423
     * 3) (pipelined write) An existing leader picks us as its follower and
424
     *    finishes book-keeping and WAL write for us, enqueues us as pending
425
     *    memtable writer, and
426
     * 3.1) we become memtable writer group leader, or
427
     * 3.2) an existing memtable writer group leader tells us to finish memtable
428
     *      writes in parallel.
429
     */
430
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
431
0
    AwaitState(w,
432
0
               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
433
0
                   STATE_PARALLEL_MEMTABLE_CALLER |
434
0
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
435
0
               &jbg_ctx);
436
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
437
0
  }
438
495k
}
439
440
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
441
495k
                                            WriteGroup* write_group) {
442
495k
  assert(leader->link_older == nullptr);
443
495k
  assert(leader->batch != nullptr);
444
495k
  assert(write_group != nullptr);
445
446
495k
  size_t size = WriteBatchInternal::ByteSize(leader->batch);
447
448
  // Allow the group to grow up to a maximum size, but if the
449
  // original write is small, limit the growth so we do not slow
450
  // down the small write too much.
451
495k
  size_t max_size = max_write_batch_group_size_bytes;
452
495k
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
453
495k
  if (size <= min_batch_size_bytes) {
454
495k
    max_size = size + min_batch_size_bytes;
455
495k
  }
456
457
495k
  leader->write_group = write_group;
458
495k
  write_group->leader = leader;
459
495k
  write_group->last_writer = leader;
460
495k
  write_group->size = 1;
461
495k
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
462
463
  // This is safe regardless of any db mutex status of the caller. Previous
464
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
465
  // (they emptied the list and then we added ourself as leader) or had to
466
  // explicitly wake us up (the list was non-empty when we added ourself,
467
  // so we have already received our MarkJoined).
468
495k
  CreateMissingNewerLinks(newest_writer);
469
470
  // This comment illustrates how the rest of the function works using an
471
  // example. Notation:
472
  //
473
  // - Items are `Writer`s
474
  // - Items prefixed by "@" have been included in `write_group`
475
  // - Items prefixed by "*" have compatible options with `leader`, but have not
476
  //   been included in `write_group` yet
477
  // - Items after several spaces are in `r_list`. These have incompatible
478
  //   options with `leader` and are temporarily separated from the main list.
479
  //
480
  // Each line below depicts the state of the linked lists at the beginning of
481
  // an iteration of the while-loop.
482
  //
483
  // @leader, n1, *n2, n3, *newest_writer
484
  // @leader, *n2, n3, *newest_writer,    n1
485
  // @leader, @n2, n3, *newest_writer,    n1
486
  //
487
  // After the while-loop, the `r_list` is grafted back onto the main list.
488
  //
489
  // case A: no new `Writer`s arrived
490
  // @leader, @n2, @newest_writer,        n1, n3
491
  // @leader, @n2, @newest_writer, n1, n3
492
  //
493
  // case B: a new `Writer` (n4) arrived
494
  // @leader, @n2, @newest_writer, n4     n1, n3
495
  // @leader, @n2, @newest_writer, n1, n3, n4
496
497
  // Tricky. Iteration start (leader) is exclusive and finish
498
  // (newest_writer) is inclusive. Iteration goes from old to new.
499
495k
  Writer* w = leader;
500
  // write_group end
501
495k
  Writer* we = leader;
502
  // declare r_list
503
495k
  Writer* rb = nullptr;
504
495k
  Writer* re = nullptr;
505
506
495k
  while (w != newest_writer) {
507
0
    assert(w->link_newer);
508
0
    w = w->link_newer;
509
510
0
    if ((w->sync && !leader->sync) ||
511
        // Do not include a sync write into a batch handled by a non-sync write.
512
0
        (w->no_slowdown != leader->no_slowdown) ||
513
        // Do not mix writes that are ok with delays with the ones that request to
514
        // fail on delays.
515
0
        (w->disable_wal != leader->disable_wal) ||
516
        // Do not mix writes that enable the WAL with the ones whose WAL is disabled.
517
0
        (w->protection_bytes_per_key != leader->protection_bytes_per_key) ||
518
        // Do not mix writes with different levels of integrity protection.
519
0
        (w->rate_limiter_priority != leader->rate_limiter_priority) ||
520
        // Do not mix writes with different rate limiter priorities.
521
0
        (w->batch == nullptr) ||
522
        // Do not include writes with a nullptr batch. Those are not writes;
523
        // they are something else and want to be alone.
524
0
        (w->callback != nullptr && !w->callback->AllowWriteBatching()) ||
525
        // don't batch writes that don't want to be batched
526
0
        (size + WriteBatchInternal::ByteSize(w->batch) > max_size) ||
527
        // Do not make batch too big
528
0
        (leader->ingest_wbwi || w->ingest_wbwi)
529
        // ingesting WBWI needs to be its own group
530
0
    ) {
531
      // remove from list
532
0
      w->link_older->link_newer = w->link_newer;
533
0
      if (w->link_newer != nullptr) {
534
0
        w->link_newer->link_older = w->link_older;
535
0
      }
536
      // insert into r_list
537
0
      if (re == nullptr) {
538
0
        rb = re = w;
539
0
        w->link_older = nullptr;
540
0
      } else {
541
0
        w->link_older = re;
542
0
        re->link_newer = w;
543
0
        re = w;
544
0
      }
545
0
    } else {
546
      // grow the group
547
0
      we = w;
548
0
      w->write_group = write_group;
549
0
      size += WriteBatchInternal::ByteSize(w->batch);
550
0
      write_group->last_writer = w;
551
0
      write_group->size++;
552
0
    }
553
0
  }
554
  // append r_list after write_group end
555
495k
  if (rb != nullptr) {
556
0
    rb->link_older = we;
557
0
    re->link_newer = nullptr;
558
0
    we->link_newer = rb;
559
0
    if (!newest_writer_.compare_exchange_weak(w, re)) {
560
0
      while (w->link_older != newest_writer) {
561
0
        w = w->link_older;
562
0
      }
563
0
      w->link_older = re;
564
0
    }
565
0
  }
566
567
495k
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
568
495k
  return size;
569
495k
}
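To put numbers on the group-size cap computed at the top of this function, assuming the default max_write_batch_group_size_bytes of 1 MiB (the option is configurable, so a given build may differ): min_batch_size_bytes = 2^20 / 8 = 131072 bytes (128 KiB), so a 4 KiB leader batch caps the group at 4096 + 131072 = 135168 bytes (132 KiB), while a 512 KiB leader batch already exceeds min_batch_size_bytes and keeps the full 1 MiB cap.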
570
571
void WriteThread::EnterAsMemTableWriter(Writer* leader,
572
0
                                        WriteGroup* write_group) {
573
0
  assert(leader != nullptr);
574
0
  assert(leader->link_older == nullptr);
575
0
  assert(leader->batch != nullptr);
576
0
  assert(write_group != nullptr);
577
578
0
  size_t size = WriteBatchInternal::ByteSize(leader->batch);
579
580
  // Allow the group to grow up to a maximum size, but if the
581
  // original write is small, limit the growth so we do not slow
582
  // down the small write too much.
583
0
  size_t max_size = max_write_batch_group_size_bytes;
584
0
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
585
0
  if (size <= min_batch_size_bytes) {
586
0
    max_size = size + min_batch_size_bytes;
587
0
  }
588
589
0
  leader->write_group = write_group;
590
0
  write_group->leader = leader;
591
0
  write_group->size = 1;
592
0
  Writer* last_writer = leader;
593
594
0
  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
595
0
    Writer* newest_writer = newest_memtable_writer_.load();
596
0
    CreateMissingNewerLinks(newest_writer);
597
598
0
    Writer* w = leader;
599
0
    while (w != newest_writer) {
600
0
      assert(w->link_newer);
601
0
      w = w->link_newer;
602
603
0
      if (w->batch == nullptr) {
604
0
        break;
605
0
      }
606
607
0
      if (w->batch->HasMerge()) {
608
0
        break;
609
0
      }
610
611
0
      if (!allow_concurrent_memtable_write_) {
612
0
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
613
0
        if (size + batch_size > max_size) {
614
          // Do not make batch too big
615
0
          break;
616
0
        }
617
0
        size += batch_size;
618
0
      }
619
620
0
      w->write_group = write_group;
621
0
      last_writer = w;
622
0
      write_group->size++;
623
0
    }
624
0
  }
625
626
0
  write_group->last_writer = last_writer;
627
0
  write_group->last_sequence =
628
0
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
629
0
}
630
631
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
632
0
                                       WriteGroup& write_group) {
633
0
  Writer* leader = write_group.leader;
634
0
  Writer* last_writer = write_group.last_writer;
635
636
0
  Writer* newest_writer = last_writer;
637
0
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
638
0
                                                       nullptr)) {
639
0
    CreateMissingNewerLinks(newest_writer);
640
0
    Writer* next_leader = last_writer->link_newer;
641
0
    assert(next_leader != nullptr);
642
0
    next_leader->link_older = nullptr;
643
0
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
644
0
  }
645
0
  Writer* w = leader;
646
0
  while (true) {
647
0
    if (!write_group.status.ok()) {
648
0
      w->status = write_group.status;
649
0
    }
650
0
    Writer* next = w->link_newer;
651
0
    if (w != leader) {
652
0
      SetState(w, STATE_COMPLETED);
653
0
    }
654
0
    if (w == last_writer) {
655
0
      break;
656
0
    }
657
0
    assert(next);
658
0
    w = next;
659
0
  }
660
  // Note that leader has to exit last, since it owns the write group.
661
0
  SetState(leader, STATE_COMPLETED);
662
0
}
663
664
0
void WriteThread::SetMemWritersEachStride(Writer* w) {
665
0
  WriteGroup* write_group = w->write_group;
666
0
  Writer* last_writer = write_group->last_writer;
667
668
  // The stride is the same for each caller in write_group, so starting from w,
669
  // this sets the state of every stride-th writer after w (including w itself).
670
0
  size_t stride = static_cast<size_t>(std::sqrt(write_group->size));
671
0
  size_t count = 0;
672
0
  while (w) {
673
0
    if (count++ % stride == 0) {
674
0
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
675
0
    }
676
0
    w = (w == last_writer) ? nullptr : w->link_newer;
677
0
  }
678
0
}
679
680
0
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
681
0
  assert(write_group != nullptr);
682
0
  size_t group_size = write_group->size;
683
0
  write_group->running.store(group_size);
684
685
  // The minimum group size that allows the group to use parallel caller mode.
686
  // The number must be no lower than 3.
687
0
  const size_t MinParallelSize = 20;
688
689
  // If the group_size is too small, there is no need to have
690
  // the parallel partial callers.
691
0
  if (group_size < MinParallelSize) {
692
0
    for (auto w : *write_group) {
693
0
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
694
0
    }
695
0
    return;
696
0
  }
697
698
  // The stride is equal to std::sqrt(group_size) which can minimize
699
  // the total number of leader SetState calls.
700
  // Set the leader itself STATE_PARALLEL_MEMTABLE_WRITER, and set
701
  // (stride-1) writers to be STATE_PARALLEL_MEMTABLE_CALLER.
702
0
  size_t stride = static_cast<size_t>(std::sqrt(group_size));
703
0
  auto w = write_group->leader;
704
0
  SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
705
706
0
  for (size_t i = 1; i < stride; i++) {
707
0
    w = w->link_newer;
708
0
    SetState(w, STATE_PARALLEL_MEMTABLE_CALLER);
709
0
  }
710
711
  // After setting all STATE_PARALLEL_MEMTABLE_CALLER, the leader also
712
  // does the job as STATE_PARALLEL_MEMTABLE_CALLER.
713
0
  w = w->link_newer;
714
0
  SetMemWritersEachStride(w);
715
0
}
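A back-of-the-envelope justification for the sqrt stride used above, assuming each STATE_PARALLEL_MEMTABLE_CALLER in turn wakes the writers in its own stride via SetMemWritersEachStride: for a group of size $n$ and stride $s$, the leader performs about $(s - 1)$ caller wake-ups plus roughly $n/s$ writer wake-ups in its own slice, so its SetState count is approximately

$f(s) \approx (s - 1) + \frac{n}{s}, \qquad f'(s) = 1 - \frac{n}{s^2} = 0 \;\Rightarrow\; s = \sqrt{n},$

giving a minimum of roughly $2\sqrt{n} - 1$ calls instead of the $n$ calls needed when the leader wakes every writer itself (which is also why parallel caller mode is skipped for groups smaller than MinParallelSize).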
716
717
static WriteThread::AdaptationContext cpmtw_ctx(
718
    "CompleteParallelMemTableWriter");
719
// This method is called by both the leader and parallel followers
720
0
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
721
0
  auto* write_group = w->write_group;
722
0
  if (!w->status.ok()) {
723
0
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
724
0
    write_group->status = w->status;
725
0
  }
726
727
0
  if (write_group->running-- > 1) {
728
    // we're not the last one
729
0
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
730
0
    return false;
731
0
  }
732
  // else we're the last parallel worker and should perform exit duties.
733
0
  w->status = write_group->status;
734
  // Callers of this function must ensure w->status is checked.
735
0
  write_group->status.PermitUncheckedError();
736
0
  return true;
737
0
}
738
739
0
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
740
0
  auto* write_group = w->write_group;
741
742
0
  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
743
0
  assert(write_group->status.ok());
744
0
  ExitAsBatchGroupLeader(*write_group, write_group->status);
745
0
  assert(w->status.ok());
746
0
  assert(w->state == STATE_COMPLETED);
747
0
  SetState(write_group->leader, STATE_COMPLETED);
748
0
}
749
750
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
751
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
752
495k
                                         Status& status) {
753
495k
  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
754
495k
                           &write_group);
755
756
495k
  Writer* leader = write_group.leader;
757
495k
  Writer* last_writer = write_group.last_writer;
758
495k
  assert(leader->link_older == nullptr);
759
760
  // If status is non-ok already, then write_group.status won't have the chance
761
  // of being propagated to caller.
762
495k
  if (!status.ok()) {
763
0
    write_group.status.PermitUncheckedError();
764
0
  }
765
766
  // Propagate memtable write error to the whole group.
767
495k
  if (status.ok() && !write_group.status.ok()) {
768
0
    status = write_group.status;
769
0
  }
770
771
495k
  if (enable_pipelined_write_) {
772
    // We insert a dummy Writer right before our current write_group. This
773
    // allows us to unlink our write_group without the risk that a subsequent
774
    // writer becomes a new leader and might overtake us and add itself to the
775
    // memtable-writer-list before we can do so. This ensures that writers are
776
    // added to the memtable-writer-list in the exact same order in which they
777
    // were in the newest_writer list.
778
    // This must happen before completing the writers from our group to prevent
779
    // a race where the owning thread of one of these writers can start a new
780
    // write operation.
781
0
    Writer dummy;
782
0
    Writer* head = newest_writer_.load(std::memory_order_acquire);
783
0
    if (head != last_writer ||
784
0
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
785
      // Either last_writer wasn't the head during the load(), or it was the
786
      // head during the load() but somebody else pushed onto the list before
787
      // we did the compare_exchange_strong (causing it to fail). In the latter
788
      // case compare_exchange_strong has the effect of re-reading its first
789
      // param (head). No need to retry a failing CAS, because only a departing
790
      // leader (which we are at the moment) can remove nodes from the list.
791
0
      assert(head != last_writer);
792
793
      // After walking link_older starting from head (if not already done) we
794
      // will be able to traverse w->link_newer below.
795
0
      CreateMissingNewerLinks(head);
796
0
      assert(last_writer->link_newer != nullptr);
797
0
      last_writer->link_newer->link_older = &dummy;
798
0
      dummy.link_newer = last_writer->link_newer;
799
0
    }
800
801
    // Complete writers that don't write to memtable
802
0
    for (Writer* w = last_writer; w != leader;) {
803
0
      Writer* next = w->link_older;
804
0
      w->status = status;
805
0
      if (!w->ShouldWriteToMemtable()) {
806
0
        CompleteFollower(w, write_group);
807
0
      }
808
0
      w = next;
809
0
    }
810
0
    if (!leader->ShouldWriteToMemtable()) {
811
0
      CompleteLeader(write_group);
812
0
    }
813
814
0
    TEST_SYNC_POINT_CALLBACK(
815
0
        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
816
0
        &write_group);
817
818
    // Link the remainder of the group to the memtable writer list.
819
    // We have to link our group to the memtable writer queue before waking up the
820
    // next leader or set newest_writer_ to null, otherwise the next leader
821
    // can run ahead of us and link to memtable writer queue before we do.
822
0
    if (write_group.size > 0) {
823
0
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
824
        // The leader can now be different from current writer.
825
0
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
826
0
      }
827
0
    }
828
829
    // Unlink the dummy writer from the list and identify the new leader
830
0
    head = newest_writer_.load(std::memory_order_acquire);
831
0
    if (head != &dummy ||
832
0
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
833
0
      CreateMissingNewerLinks(head);
834
0
      Writer* new_leader = dummy.link_newer;
835
0
      assert(new_leader != nullptr);
836
0
      new_leader->link_older = nullptr;
837
0
      SetState(new_leader, STATE_GROUP_LEADER);
838
0
    }
839
840
0
    AwaitState(leader,
841
0
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER |
842
0
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
843
0
               &eabgl_ctx);
844
495k
  } else {
845
495k
    Writer* head = newest_writer_.load(std::memory_order_acquire);
846
495k
    if (head != last_writer ||
847
495k
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
848
      // Either last_writer wasn't the head during the load(), or it was the
849
      // head during the load() but somebody else pushed onto the list before
850
      // we did the compare_exchange_strong (causing it to fail).  In the
851
      // latter case compare_exchange_strong has the effect of re-reading
852
      // its first param (head).  No need to retry a failing CAS, because
853
      // only a departing leader (which we are at the moment) can remove
854
      // nodes from the list.
855
0
      assert(head != last_writer);
856
857
      // After walking link_older starting from head (if not already done)
858
      // we will be able to traverse w->link_newer below. This function
859
      // can only be called from an active leader, only a leader can
860
      // clear newest_writer_, we didn't, and only a clear newest_writer_
861
      // could cause the next leader to start their work without a call
862
      // to MarkJoined, so we can definitely conclude that no other leader
863
      // work is going on here (with or without db mutex).
864
0
      CreateMissingNewerLinks(head);
865
0
      assert(last_writer->link_newer != nullptr);
866
0
      assert(last_writer->link_newer->link_older == last_writer);
867
0
      last_writer->link_newer->link_older = nullptr;
868
869
      // Next leader didn't self-identify, because newest_writer_ wasn't
870
      // nullptr when they enqueued (we were definitely enqueued before them
871
      // and are still in the list).  That means leader handoff occurs when
872
      // we call MarkJoined
873
0
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
874
0
    }
875
    // else nobody else was waiting, although there might already be a new
876
    // leader now
877
878
495k
    while (last_writer != leader) {
879
0
      assert(last_writer);
880
0
      last_writer->status = status;
881
      // we need to read link_older before calling SetState, because as soon
882
      // as it is marked committed the other thread's Await may return and
883
      // deallocate the Writer.
884
0
      auto next = last_writer->link_older;
885
0
      SetState(last_writer, STATE_COMPLETED);
886
887
0
      last_writer = next;
888
0
    }
889
495k
  }
890
495k
}
891
892
static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
893
30.9k
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
894
30.9k
  assert(w != nullptr && w->batch == nullptr);
895
30.9k
  mu->Unlock();
896
30.9k
  bool linked_as_leader = LinkOne(w, &newest_writer_);
897
30.9k
  if (!linked_as_leader) {
898
0
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
899
    // Last leader will not pick us as a follower since our batch is nullptr
900
0
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
901
0
  }
902
30.9k
  if (enable_pipelined_write_) {
903
0
    WaitForMemTableWriters();
904
0
  }
905
30.9k
  mu->Lock();
906
30.9k
}
907
908
30.9k
void WriteThread::ExitUnbatched(Writer* w) {
909
30.9k
  assert(w != nullptr);
910
30.9k
  Writer* newest_writer = w;
911
30.9k
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
912
0
    CreateMissingNewerLinks(newest_writer);
913
0
    Writer* next_leader = w->link_newer;
914
0
    assert(next_leader != nullptr);
915
0
    next_leader->link_older = nullptr;
916
0
    SetState(next_leader, STATE_GROUP_LEADER);
917
0
  }
918
30.9k
}
919
920
static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
921
0
void WriteThread::WaitForMemTableWriters() {
922
0
  assert(enable_pipelined_write_);
923
0
  if (newest_memtable_writer_.load() == nullptr) {
924
0
    return;
925
0
  }
926
0
  Writer w;
927
0
  if (!LinkOne(&w, &newest_memtable_writer_)) {
928
0
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
929
0
  }
930
0
  newest_memtable_writer_.store(nullptr);
931
0
}
932
933
}  // namespace ROCKSDB_NAMESPACE