Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/write_thread.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/write_thread.h"
7
8
#include <chrono>
9
#include <thread>
10
11
#include "db/column_family.h"
12
#include "monitoring/perf_context_imp.h"
13
#include "port/port.h"
14
#include "test_util/sync_point.h"
15
#include "util/random.h"
16
17
namespace ROCKSDB_NAMESPACE {
18
19
WriteThread::WriteThread(const ImmutableDBOptions& db_options)
20
    : max_yield_usec_(db_options.enable_write_thread_adaptive_yield
21
                          ? db_options.write_thread_max_yield_usec
22
                          : 0),
23
      slow_yield_usec_(db_options.write_thread_slow_yield_usec),
24
      allow_concurrent_memtable_write_(
25
          db_options.allow_concurrent_memtable_write),
26
      enable_pipelined_write_(db_options.enable_pipelined_write),
27
      max_write_batch_group_size_bytes(
28
          db_options.max_write_batch_group_size_bytes),
29
      newest_writer_(nullptr),
30
      newest_memtable_writer_(nullptr),
31
      last_sequence_(0),
32
      write_stall_dummy_(),
33
      stall_mu_(),
34
22.1k
      stall_cv_(&stall_mu_) {}
35
36
0
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
37
  // We're going to block.  Lazily create the mutex.  We guarantee
38
  // propagation of this construction to the waker via the
39
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
40
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
41
  // we install below.
42
0
  w->CreateMutex();
43
44
0
  auto state = w->state.load(std::memory_order_acquire);
45
0
  assert(state != STATE_LOCKED_WAITING);
46
0
  if ((state & goal_mask) == 0 &&
47
0
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
48
    // we have permission (and an obligation) to use StateMutex
49
0
    std::unique_lock<std::mutex> guard(w->StateMutex());
50
0
    w->StateCV().wait(guard, [w] {
51
0
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
52
0
    });
53
0
    state = w->state.load(std::memory_order_relaxed);
54
0
  }
55
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
56
  // must have changed the state, and compare_exchange_strong has updated
57
  // our local variable with the new one.  At the moment WriteThread never
58
  // waits for a transition across intermediate states, so we know that
59
  // since a state change has occurred the goal must have been met.
60
0
  assert((state & goal_mask) != 0);
61
0
  return state;
62
0
}
63
64
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
65
0
                                AdaptationContext* ctx) {
66
0
  uint8_t state = 0;
67
68
  // 1. Busy loop using "pause" for 1 micro sec
69
  // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default)
70
  // 3. Else blocking wait
71
72
  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
73
  // is the effect of the pause instruction), so 200 iterations is a bit
74
  // more than a microsecond.  This is long enough that waits longer than
75
  // this can amortize the cost of accessing the clock and yielding.
76
0
  for (uint32_t tries = 0; tries < 200; ++tries) {
77
0
    state = w->state.load(std::memory_order_acquire);
78
0
    if ((state & goal_mask) != 0) {
79
0
      return state;
80
0
    }
81
0
    port::AsmVolatilePause();
82
0
  }
83
84
  // This is below the fast path, so that the stat is zero when all writes are
85
  // from the same thread.
86
0
  PERF_TIMER_FOR_WAIT_GUARD(write_thread_wait_nanos);
87
88
  // If we're only going to end up waiting a short period of time,
89
  // it can be a lot more efficient to call std::this_thread::yield()
90
  // in a loop than to block in StateMutex().  For reference, on my 4.0
91
  // SELinux test server with support for syscall auditing enabled, the
92
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
93
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
94
  // drag on RocksDB's single-writer design.  Of course, spinning is a
95
  // bad idea if other threads are waiting to run or if we're going to
96
  // wait for a long time.  How do we decide?
97
  //
98
  // We break waiting into 3 categories: short-uncontended,
99
  // short-contended, and long.  If we had an oracle, then we would always
100
  // spin for short-uncontended, always block for long, and our choice for
101
  // short-contended might depend on whether we were trying to optimize
102
  // RocksDB throughput or avoid being greedy with system resources.
103
  //
104
  // Bucketing into short or long is easy by measuring elapsed time.
105
  // Differentiating short-uncontended from short-contended is a bit
106
  // trickier, but not too bad.  We could look for involuntary context
107
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
108
  // (portability code and CPU) to just look for yield calls that take
109
  // longer than we expect.  sched_yield() doesn't actually result in any
110
  // context switch overhead if there are no other runnable processes
111
  // on the current core, in which case it usually takes less than
112
  // a microsecond.
113
  //
114
  // There are two primary tunables here: the threshold between "short"
115
  // and "long" waits, and the threshold at which we suspect that a yield
116
  // is slow enough to indicate we should probably block.  If these
117
  // thresholds are chosen well then CPU-bound workloads that don't
118
  // have more threads than cores will experience few context switches
119
  // (voluntary or involuntary), and the total number of context switches
120
  // (voluntary and involuntary) will not be dramatically larger (maybe
121
  // 2x) than the number of voluntary context switches that occur when
122
  // --max_yield_wait_micros=0.
123
  //
124
  // There's another constant, which is the number of slow yields we will
125
  // tolerate before reversing our previous decision.  Solitary slow
126
  // yields are pretty common (low-priority small jobs ready to run),
127
  // so this should be at least 2.  We set this conservatively to 3 so
128
  // that we can also immediately schedule a ctx adaptation, rather than
129
  // waiting for the next update_ctx.
130
131
0
  const size_t kMaxSlowYieldsWhileSpinning = 3;
132
133
  // Whether the yield approach has any credit in this context. The credit is
134
  // added by a yield being successful before timing out, and decreased otherwise.
135
0
  auto& yield_credit = ctx->value;
136
  // Update the yield_credit based on sample runs or right after a hard failure
137
0
  bool update_ctx = false;
138
  // Should we reinforce the yield credit
139
0
  bool would_spin_again = false;
140
  // The sampling base for updating the yield credit. The sampling rate would be
141
  // 1/sampling_base.
142
0
  const int sampling_base = 256;
143
144
0
  if (max_yield_usec_ > 0) {
145
0
    update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);
146
147
0
    if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
148
      // we're updating the adaptation statistics, or spinning has >
149
      // 50% chance of being shorter than max_yield_usec_ and causing no
150
      // involuntary context switches
151
0
      auto spin_begin = std::chrono::steady_clock::now();
152
153
      // this variable doesn't include the final yield (if any) that
154
      // causes the goal to be met
155
0
      size_t slow_yield_count = 0;
156
157
0
      auto iter_begin = spin_begin;
158
0
      while ((iter_begin - spin_begin) <=
159
0
             std::chrono::microseconds(max_yield_usec_)) {
160
0
        std::this_thread::yield();
161
162
0
        state = w->state.load(std::memory_order_acquire);
163
0
        if ((state & goal_mask) != 0) {
164
          // success
165
0
          would_spin_again = true;
166
0
          break;
167
0
        }
168
169
0
        auto now = std::chrono::steady_clock::now();
170
0
        if (now == iter_begin ||
171
0
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
172
          // conservatively count it as a slow yield if our clock isn't
173
          // accurate enough to measure the yield duration
174
0
          ++slow_yield_count;
175
0
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
176
            // Not just one ivcsw, but several.  Immediately update yield_credit
177
            // and fall back to blocking
178
0
            update_ctx = true;
179
0
            break;
180
0
          }
181
0
        }
182
0
        iter_begin = now;
183
0
      }
184
0
    }
185
0
  }
186
187
0
  if ((state & goal_mask) == 0) {
188
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
189
0
    state = BlockingAwaitState(w, goal_mask);
190
0
  }
191
192
0
  if (update_ctx) {
193
    // Since our update is sample based, it is ok if a thread overwrites the
194
    // updates by other threads. Thus the update does not have to be atomic.
195
0
    auto v = yield_credit.load(std::memory_order_relaxed);
196
    // fixed point exponential decay with decay constant 1/1024, with +1
197
    // and -1 scaled to avoid overflow for int32_t
198
    //
199
    // On each update the positive credit is decayed by a factor of 1/1024 (i.e.,
200
    // 0.1%). If the sampled yield was successful, the credit is also increased
201
    // by X. Setting X=2^17 ensures that the credit never exceeds
202
    // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t. Same
203
    // logic applies to negative credits.
204
0
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
205
0
    yield_credit.store(v, std::memory_order_relaxed);
206
0
  }
207
208
0
  assert((state & goal_mask) != 0);
209
0
  return state;
210
0
}
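
To make the overflow bound in the comment above concrete, here is a minimal standalone sketch (not RocksDB code) that applies the same yield_credit update rule under the assumption that every sampled yield succeeds. The fixed-point decay converges to 131072 * 1024 = 2^27 and never exceeds it, which is why the int32_t credit cannot overflow.

#include <cstdint>
#include <cstdio>

int main() {
  int32_t v = 0;
  for (int i = 0; i < 20000; ++i) {
    // Same update as the yield_credit adjustment above, always "successful".
    v = v - (v / 1024) + 131072;
  }
  std::printf("credit after 20000 successful samples: %d (bound 2^27 = %d)\n",
              (int)v, 1 << 27);
  return 0;
}
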
211
212
1.16M
void WriteThread::SetState(Writer* w, uint8_t new_state) {
213
1.16M
  assert(w);
214
1.16M
  auto state = w->state.load(std::memory_order_acquire);
215
1.16M
  if (state == STATE_LOCKED_WAITING ||
216
1.16M
      !w->state.compare_exchange_strong(state, new_state)) {
217
0
    assert(state == STATE_LOCKED_WAITING);
218
219
0
    std::lock_guard<std::mutex> guard(w->StateMutex());
220
0
    assert(w->state.load(std::memory_order_relaxed) != new_state);
221
0
    w->state.store(new_state, std::memory_order_relaxed);
222
0
    w->StateCV().notify_one();
223
0
  }
224
1.16M
}
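
The waiter/waker handshake that BlockingAwaitState() and SetState() implement can be boiled down to the following minimal standalone sketch (hypothetical names and state constants, not the RocksDB classes): the waiter publishes a LOCKED_WAITING sentinel with a CAS before sleeping on a condition variable, and a waker that observes the sentinel, or loses the CAS race to it, stores the new state under the mutex before notifying, so no wakeup can be lost.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

namespace {
constexpr uint8_t kStateInit = 1;
constexpr uint8_t kStateLockedWaiting = 2;
constexpr uint8_t kStateCompleted = 4;

struct Waiter {
  std::atomic<uint8_t> state{kStateInit};
  std::mutex mu;
  std::condition_variable cv;
};

uint8_t BlockingAwait(Waiter* w, uint8_t goal_mask) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  if ((s & goal_mask) == 0 &&
      w->state.compare_exchange_strong(s, kStateLockedWaiting)) {
    // We won the race; the waker is now obliged to use the mutex/condvar.
    std::unique_lock<std::mutex> guard(w->mu);
    w->cv.wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != kStateLockedWaiting;
    });
    s = w->state.load(std::memory_order_relaxed);
  }
  // If the CAS failed, the waker already installed a goal state into s.
  return s;
}

void Wake(Waiter* w, uint8_t new_state) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  if (s == kStateLockedWaiting ||
      !w->state.compare_exchange_strong(s, new_state)) {
    // The waiter is blocked (or about to block); publishing under the mutex
    // guarantees the store and notify cannot slip in between its predicate
    // check and its sleep.
    std::lock_guard<std::mutex> guard(w->mu);
    w->state.store(new_state, std::memory_order_relaxed);
    w->cv.notify_one();
  }
}
}  // namespace

int main() {
  Waiter w;
  std::thread waiter([&w] { BlockingAwait(&w, kStateCompleted); });
  Wake(&w, kStateCompleted);
  waiter.join();
  return 0;
}
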
225
226
1.16M
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
227
1.16M
  assert(newest_writer != nullptr);
228
1.16M
  assert(w->state == STATE_INIT);
229
1.16M
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
230
1.16M
  while (true) {
231
1.16M
    assert(writers != w);
232
    // If a write stall is in effect and w->no_slowdown is not true,
233
    // block here until the stall is cleared. If it is true, then return
234
    // immediately.
235
1.16M
    if (writers == &write_stall_dummy_) {
236
0
      if (w->no_slowdown) {
237
0
        w->status = Status::Incomplete("Write stall");
238
0
        SetState(w, STATE_COMPLETED);
239
0
        return false;
240
0
      }
241
      // Since no_slowdown is false, wait here to be notified of the write
242
      // stall clearing
243
0
      {
244
0
        MutexLock lock(&stall_mu_);
245
0
        writers = newest_writer->load(std::memory_order_relaxed);
246
0
        if (writers == &write_stall_dummy_) {
247
0
          TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w);
248
0
          stall_cv_.Wait();
249
          // Load newest_writer again since it may have changed
250
0
          writers = newest_writer->load(std::memory_order_relaxed);
251
0
          continue;
252
0
        }
253
0
      }
254
0
    }
255
1.16M
    w->link_older = writers;
256
1.16M
    if (newest_writer->compare_exchange_weak(writers, w)) {
257
1.16M
      return (writers == nullptr);
258
1.16M
    }
259
1.16M
  }
260
1.16M
}
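
At its core, LinkOne() above is a lock-free push onto a singly linked list of writers, with the write-stall dummy as an extra twist. A minimal standalone sketch of just the push (hypothetical Node type, stall handling omitted); returning true when the previous head was nullptr is what elects the pushing thread as the group leader:

#include <atomic>

struct Node {
  Node* link_older = nullptr;
};

// Push n onto the list headed by *newest; return true iff the list was empty.
bool Push(Node* n, std::atomic<Node*>* newest) {
  Node* head = newest->load(std::memory_order_relaxed);
  while (true) {
    n->link_older = head;
    if (newest->compare_exchange_weak(head, n)) {
      return head == nullptr;
    }
    // compare_exchange_weak reloaded `head` on failure; just retry.
  }
}

int main() {
  std::atomic<Node*> newest{nullptr};
  Node a, b;
  bool a_is_leader = Push(&a, &newest);  // true: the list was empty
  bool b_is_leader = Push(&b, &newest);  // false: a was already linked
  return (a_is_leader && !b_is_leader) ? 0 : 1;
}
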
261
262
bool WriteThread::LinkGroup(WriteGroup& write_group,
263
0
                            std::atomic<Writer*>* newest_writer) {
264
0
  assert(newest_writer != nullptr);
265
0
  Writer* leader = write_group.leader;
266
0
  Writer* last_writer = write_group.last_writer;
267
0
  Writer* w = last_writer;
268
0
  while (true) {
269
    // Unset link_newer pointers to make sure when we call
270
    // CreateMissingNewerLinks later it creates all missing links.
271
0
    w->link_newer = nullptr;
272
0
    w->write_group = nullptr;
273
0
    if (w == leader) {
274
0
      break;
275
0
    }
276
0
    w = w->link_older;
277
0
  }
278
0
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
279
0
  while (true) {
280
0
    leader->link_older = newest;
281
0
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
282
0
      return (newest == nullptr);
283
0
    }
284
0
  }
285
0
}
286
287
1.16M
void WriteThread::CreateMissingNewerLinks(Writer* head) {
288
1.16M
  while (true) {
289
1.16M
    Writer* next = head->link_older;
290
1.16M
    if (next == nullptr || next->link_newer != nullptr) {
291
1.16M
      assert(next == nullptr || next->link_newer == head);
292
1.16M
      break;
293
1.16M
    }
294
0
    next->link_newer = head;
295
0
    head = next;
296
0
  }
297
1.16M
}
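
CreateMissingNewerLinks() lazily turns the singly linked list built by LinkOne() into a doubly linked one. The same backfill in a minimal standalone sketch (hypothetical Node type): walk link_older from the newest node, setting each older node's link_newer, and stop at the first node that is already doubly linked.

#include <cassert>

struct Node {
  Node* link_older = nullptr;
  Node* link_newer = nullptr;
};

void BackfillNewerLinks(Node* head) {
  while (head != nullptr) {
    Node* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      break;  // reached the oldest node, or an already linked segment
    }
    next->link_newer = head;
    head = next;
  }
}

int main() {
  Node oldest, middle, newest;   // pushed in this order
  middle.link_older = &oldest;   // the push only ever sets link_older
  newest.link_older = &middle;
  BackfillNewerLinks(&newest);
  assert(oldest.link_newer == &middle && middle.link_newer == &newest);
  return 0;
}
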
298
299
0
void WriteThread::CompleteLeader(WriteGroup& write_group) {
300
0
  assert(write_group.size > 0);
301
0
  Writer* leader = write_group.leader;
302
0
  if (write_group.size == 1) {
303
0
    write_group.leader = nullptr;
304
0
    write_group.last_writer = nullptr;
305
0
  } else {
306
0
    assert(leader->link_newer != nullptr);
307
0
    leader->link_newer->link_older = nullptr;
308
0
    write_group.leader = leader->link_newer;
309
0
  }
310
0
  write_group.size -= 1;
311
0
  SetState(leader, STATE_COMPLETED);
312
0
}
313
314
0
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
315
0
  assert(write_group.size > 1);
316
0
  assert(w != write_group.leader);
317
0
  if (w == write_group.last_writer) {
318
0
    w->link_older->link_newer = nullptr;
319
0
    write_group.last_writer = w->link_older;
320
0
  } else {
321
0
    w->link_older->link_newer = w->link_newer;
322
0
    w->link_newer->link_older = w->link_older;
323
0
  }
324
0
  write_group.size -= 1;
325
0
  SetState(w, STATE_COMPLETED);
326
0
}
327
328
0
void WriteThread::BeginWriteStall() {
329
0
  ++stall_begun_count_;
330
0
  LinkOne(&write_stall_dummy_, &newest_writer_);
331
332
  // Walk writer list until w->write_group != nullptr. The current write group
333
  // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that
334
  // point
335
0
  Writer* w = write_stall_dummy_.link_older;
336
0
  Writer* prev = &write_stall_dummy_;
337
0
  while (w != nullptr && w->write_group == nullptr) {
338
0
    if (w->no_slowdown) {
339
0
      prev->link_older = w->link_older;
340
0
      w->status = Status::Incomplete("Write stall");
341
0
      SetState(w, STATE_COMPLETED);
342
      // Only update `link_newer` if it's already set.
343
      // `CreateMissingNewerLinks()` will update the nullptr `link_newer` later,
344
      // which assumes that the first non-nullptr `link_newer` is the last
345
      // nullptr link in the writer list.
346
      // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop
347
      // updating the whole list when it sees the first non-nullptr link.
348
0
      if (prev->link_older && prev->link_older->link_newer) {
349
0
        prev->link_older->link_newer = prev;
350
0
      }
351
0
      w = prev->link_older;
352
0
    } else {
353
0
      prev = w;
354
0
      w = w->link_older;
355
0
    }
356
0
  }
357
0
}
358
359
0
void WriteThread::EndWriteStall() {
360
0
  MutexLock lock(&stall_mu_);
361
362
  // Unlink write_stall_dummy_ from the write queue. This will unblock
363
  // pending write threads so they can enqueue themselves
364
0
  assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
365
  // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been
366
  // called.
367
0
  if (write_stall_dummy_.link_older) {
368
0
    write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
369
0
  }
370
0
  newest_writer_.exchange(write_stall_dummy_.link_older);
371
372
0
  ++stall_ended_count_;
373
374
  // Wake up writers
375
0
  stall_cv_.SignalAll();
376
0
}
377
378
0
uint64_t WriteThread::GetBegunCountOfOutstandingStall() {
379
0
  if (stall_begun_count_ > stall_ended_count_) {
380
    // Outstanding stall in queue
381
0
    assert(newest_writer_.load(std::memory_order_relaxed) ==
382
0
           &write_stall_dummy_);
383
0
    return stall_begun_count_;
384
0
  } else {
385
    // No stall in queue
386
0
    assert(newest_writer_.load(std::memory_order_relaxed) !=
387
0
           &write_stall_dummy_);
388
0
    return 0;
389
0
  }
390
0
}
391
392
0
void WriteThread::WaitForStallEndedCount(uint64_t stall_count) {
393
0
  MutexLock lock(&stall_mu_);
394
395
0
  while (stall_ended_count_ < stall_count) {
396
0
    stall_cv_.Wait();
397
0
  }
398
0
}
399
400
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
401
1.16M
void WriteThread::JoinBatchGroup(Writer* w) {
402
1.16M
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
403
1.16M
  assert(w->batch != nullptr);
404
405
1.16M
  bool linked_as_leader = LinkOne(w, &newest_writer_);
406
407
1.16M
  w->CheckWriteEnqueuedCallback();
408
409
1.16M
  if (linked_as_leader) {
410
1.16M
    SetState(w, STATE_GROUP_LEADER);
411
1.16M
  }
412
413
1.16M
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
414
1.16M
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w);
415
416
1.16M
  if (!linked_as_leader) {
417
    /**
418
     * Wait until:
419
     * 1) An existing leader picks us as the new leader when it finishes
420
     * 2) An existing leader picks us as its follower and
421
     * 2.1) finishes the memtable writes on our behalf, or
422
     * 2.2) tells us to finish the memtable writes in parallel
423
     * 3) (pipelined write) An existing leader picks us as its follower and
424
     *    finishes book-keeping and the WAL write for us, enqueues us as a
425
     *    pending memtable writer, and
426
     * 3.1) we become the memtable writer group leader, or
427
     * 3.2) an existing memtable writer group leader tells us to finish memtable
428
     *      writes in parallel.
429
     */
430
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
431
0
    AwaitState(w,
432
0
               STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
433
0
                   STATE_PARALLEL_MEMTABLE_CALLER |
434
0
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
435
0
               &jbg_ctx);
436
0
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
437
0
  }
438
1.16M
}
439
440
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
441
1.16M
                                            WriteGroup* write_group) {
442
1.16M
  assert(leader->link_older == nullptr);
443
1.16M
  assert(leader->batch != nullptr);
444
1.16M
  assert(write_group != nullptr);
445
446
1.16M
  size_t size = WriteBatchInternal::ByteSize(leader->batch);
447
448
  // Allow the group to grow up to a maximum size, but if the
449
  // original write is small, limit the growth so we do not slow
450
  // down the small write too much.
451
1.16M
  size_t max_size = max_write_batch_group_size_bytes;
452
1.16M
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
453
1.16M
  if (size <= min_batch_size_bytes) {
454
1.16M
    max_size = size + min_batch_size_bytes;
455
1.16M
  }
456
457
1.16M
  leader->write_group = write_group;
458
1.16M
  write_group->leader = leader;
459
1.16M
  write_group->last_writer = leader;
460
1.16M
  write_group->size = 1;
461
1.16M
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
462
463
  // This is safe regardless of any db mutex status of the caller. Previous
464
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
465
  // (they emptied the list and then we added ourself as leader) or had to
466
  // explicitly wake us up (the list was non-empty when we added ourself,
467
  // so we have already received our MarkJoined).
468
1.16M
  CreateMissingNewerLinks(newest_writer);
469
470
  // This comment illustrates how the rest of the function works using an
471
  // example. Notation:
472
  //
473
  // - Items are `Writer`s
474
  // - Items prefixed by "@" have been included in `write_group`
475
  // - Items prefixed by "*" have compatible options with `leader`, but have not
476
  //   been included in `write_group` yet
477
  // - Items after several spaces are in `r_list`. These have incompatible
478
  //   options with `leader` and are temporarily separated from the main list.
479
  //
480
  // Each line below depicts the state of the linked lists at the beginning of
481
  // an iteration of the while-loop.
482
  //
483
  // @leader, n1, *n2, n3, *newest_writer
484
  // @leader, *n2, n3, *newest_writer,    n1
485
  // @leader, @n2, n3, *newest_writer,    n1
486
  //
487
  // After the while-loop, the `r_list` is grafted back onto the main list.
488
  //
489
  // case A: no new `Writer`s arrived
490
  // @leader, @n2, @newest_writer,        n1, n3
491
  // @leader, @n2, @newest_writer, n1, n3
492
  //
493
  // case B: a new `Writer` (n4) arrived
494
  // @leader, @n2, @newest_writer, n4     n1, n3
495
  // @leader, @n2, @newest_writer, n1, n3, n4
496
497
  // Tricky. Iteration start (leader) is exclusive and finish
498
  // (newest_writer) is inclusive. Iteration goes from old to new.
499
1.16M
  Writer* w = leader;
500
  // write_group end
501
1.16M
  Writer* we = leader;
502
  // declare r_list
503
1.16M
  Writer* rb = nullptr;
504
1.16M
  Writer* re = nullptr;
505
506
1.16M
  while (w != newest_writer) {
507
0
    assert(w->link_newer);
508
0
    w = w->link_newer;
509
510
0
    if ((w->sync && !leader->sync) ||
511
        // Do not include a sync write into a batch handled by a non-sync write.
512
0
        (w->no_slowdown != leader->no_slowdown) ||
513
        // Do not mix writes that are ok with delays with the ones that request
514
        // fail on delays.
515
0
        (w->disable_wal != leader->disable_wal) ||
516
        // Do not mix writes that enable WAL with the ones whose WAL disabled.
517
0
        (w->protection_bytes_per_key != leader->protection_bytes_per_key) ||
518
        // Do not mix writes with different levels of integrity protection.
519
0
        (w->rate_limiter_priority != leader->rate_limiter_priority) ||
520
        // Do not mix writes with different rate limiter priorities.
521
0
        (w->batch == nullptr) ||
522
        // Do not include writes with a nullptr batch. Those are not writes;
523
        // they are something else and want to be alone.
524
0
        (w->callback != nullptr && !w->callback->AllowWriteBatching()) ||
525
        // don't batch writes that don't want to be batched
526
0
        (size + WriteBatchInternal::ByteSize(w->batch) > max_size)
527
        // Do not make batch too big
528
0
    ) {
529
      // remove from list
530
0
      w->link_older->link_newer = w->link_newer;
531
0
      if (w->link_newer != nullptr) {
532
0
        w->link_newer->link_older = w->link_older;
533
0
      }
534
      // insert into r_list
535
0
      if (re == nullptr) {
536
0
        rb = re = w;
537
0
        w->link_older = nullptr;
538
0
      } else {
539
0
        w->link_older = re;
540
0
        re->link_newer = w;
541
0
        re = w;
542
0
      }
543
0
    } else {
544
      // grow up
545
0
      we = w;
546
0
      w->write_group = write_group;
547
0
      size += WriteBatchInternal::ByteSize(w->batch);
548
0
      write_group->last_writer = w;
549
0
      write_group->size++;
550
0
    }
551
0
  }
552
  // append r_list after write_group end
553
1.16M
  if (rb != nullptr) {
554
0
    rb->link_older = we;
555
0
    re->link_newer = nullptr;
556
0
    we->link_newer = rb;
557
0
    if (!newest_writer_.compare_exchange_weak(w, re)) {
558
0
      while (w->link_older != newest_writer) {
559
0
        w = w->link_older;
560
0
      }
561
0
      w->link_older = re;
562
0
    }
563
0
  }
564
565
1.16M
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
566
1.16M
  return size;
567
1.16M
}
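
A quick worked illustration of the group-size cap applied at the top of EnterAsBatchGroupLeader (numbers chosen purely for illustration, not taken from any real configuration): with max_write_batch_group_size_bytes = 1 MB, min_batch_size_bytes is 1 MB / 8 = 128 KB. A leader whose own batch is 4 KB is at or below that minimum, so the group is capped at 4 KB + 128 KB = 132 KB and followers cannot slow the small write down much; a leader with a 512 KB batch is above the minimum, so the group may grow to the full 1 MB.
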
568
569
void WriteThread::EnterAsMemTableWriter(Writer* leader,
570
0
                                        WriteGroup* write_group) {
571
0
  assert(leader != nullptr);
572
0
  assert(leader->link_older == nullptr);
573
0
  assert(leader->batch != nullptr);
574
0
  assert(write_group != nullptr);
575
576
0
  size_t size = WriteBatchInternal::ByteSize(leader->batch);
577
578
  // Allow the group to grow up to a maximum size, but if the
579
  // original write is small, limit the growth so we do not slow
580
  // down the small write too much.
581
0
  size_t max_size = max_write_batch_group_size_bytes;
582
0
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
583
0
  if (size <= min_batch_size_bytes) {
584
0
    max_size = size + min_batch_size_bytes;
585
0
  }
586
587
0
  leader->write_group = write_group;
588
0
  write_group->leader = leader;
589
0
  write_group->size = 1;
590
0
  Writer* last_writer = leader;
591
592
0
  if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
593
0
    Writer* newest_writer = newest_memtable_writer_.load();
594
0
    CreateMissingNewerLinks(newest_writer);
595
596
0
    Writer* w = leader;
597
0
    while (w != newest_writer) {
598
0
      assert(w->link_newer);
599
0
      w = w->link_newer;
600
601
0
      if (w->batch == nullptr) {
602
0
        break;
603
0
      }
604
605
0
      if (w->batch->HasMerge()) {
606
0
        break;
607
0
      }
608
609
0
      if (!allow_concurrent_memtable_write_) {
610
0
        auto batch_size = WriteBatchInternal::ByteSize(w->batch);
611
0
        if (size + batch_size > max_size) {
612
          // Do not make batch too big
613
0
          break;
614
0
        }
615
0
        size += batch_size;
616
0
      }
617
618
0
      w->write_group = write_group;
619
0
      last_writer = w;
620
0
      write_group->size++;
621
0
    }
622
0
  }
623
624
0
  write_group->last_writer = last_writer;
625
0
  write_group->last_sequence =
626
0
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
627
0
}
628
629
void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
630
0
                                       WriteGroup& write_group) {
631
0
  Writer* leader = write_group.leader;
632
0
  Writer* last_writer = write_group.last_writer;
633
634
0
  Writer* newest_writer = last_writer;
635
0
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
636
0
                                                       nullptr)) {
637
0
    CreateMissingNewerLinks(newest_writer);
638
0
    Writer* next_leader = last_writer->link_newer;
639
0
    assert(next_leader != nullptr);
640
0
    next_leader->link_older = nullptr;
641
0
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
642
0
  }
643
0
  Writer* w = leader;
644
0
  while (true) {
645
0
    if (!write_group.status.ok()) {
646
0
      w->status = write_group.status;
647
0
    }
648
0
    Writer* next = w->link_newer;
649
0
    if (w != leader) {
650
0
      SetState(w, STATE_COMPLETED);
651
0
    }
652
0
    if (w == last_writer) {
653
0
      break;
654
0
    }
655
0
    assert(next);
656
0
    w = next;
657
0
  }
658
  // Note that leader has to exit last, since it owns the write group.
659
0
  SetState(leader, STATE_COMPLETED);
660
0
}
661
662
0
void WriteThread::SetMemWritersEachStride(Writer* w) {
663
0
  WriteGroup* write_group = w->write_group;
664
0
  Writer* last_writer = write_group->last_writer;
665
666
  // The stride is the same for each caller in the write_group, so w wakes
667
  // the writers whose position matches its own modulo the stride.
668
0
  size_t stride = static_cast<size_t>(std::sqrt(write_group->size));
669
0
  size_t count = 0;
670
0
  while (w) {
671
0
    if (count++ % stride == 0) {
672
0
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
673
0
    }
674
0
    w = (w == last_writer) ? nullptr : w->link_newer;
675
0
  }
676
0
}
677
678
0
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
679
0
  assert(write_group != nullptr);
680
0
  size_t group_size = write_group->size;
681
0
  write_group->running.store(group_size);
682
683
  // The minimum size to allow the group to use parallel caller mode.
685
  // The number must be no lower than 3.
685
0
  const size_t MinParallelSize = 20;
686
687
  // If the group_size is too small, there is no need to have
689
  // the parallel partial callers.
689
0
  if (group_size < MinParallelSize) {
690
0
    for (auto w : *write_group) {
691
0
      SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
692
0
    }
693
0
    return;
694
0
  }
695
696
  // The stride is equal to std::sqrt(group_size), which minimizes
698
  // the total number of SetState calls made by the leader.
698
  // Set the leader itself STATE_PARALLEL_MEMTABLE_WRITER, and set
699
  // (stride-1) writers to be STATE_PARALLEL_MEMTABLE_CALLER.
700
0
  size_t stride = static_cast<size_t>(std::sqrt(group_size));
701
0
  auto w = write_group->leader;
702
0
  SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
703
704
0
  for (size_t i = 1; i < stride; i++) {
705
0
    w = w->link_newer;
706
0
    SetState(w, STATE_PARALLEL_MEMTABLE_CALLER);
707
0
  }
708
709
  // After setting all STATE_PARALLEL_MEMTABLE_CALLER writers, the leader also
710
  // does a caller's job itself.
711
0
  w = w->link_newer;
712
0
  SetMemWritersEachStride(w);
713
0
}
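
LaunchParallelMemTableWriters() picks stride = sqrt(group_size). A rough way to see why (a back-of-the-envelope cost model, not RocksDB code): the leader pays about `stride` SetState calls to seed itself and the callers, plus roughly group_size / stride more when it then acts as a caller itself, and stride = sqrt(n) minimizes that sum at about 2 * sqrt(n). The hypothetical snippet below just tabulates this model for a group of 400 writers.

#include <cmath>
#include <cstdio>

int main() {
  const int n = 400;  // hypothetical write group size
  for (int stride : {2, 5, 10, 20, 50, 100}) {
    // stride SetStates to seed the callers + ~n/stride as a caller itself
    std::printf("stride %3d -> ~%d leader-side SetState calls\n", stride,
                stride + n / stride);
  }
  std::printf("the minimum is near sqrt(%d) = %d\n", n, (int)std::sqrt(n));
  return 0;
}
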
714
715
static WriteThread::AdaptationContext cpmtw_ctx(
716
    "CompleteParallelMemTableWriter");
717
// This method is called by both the leader and parallel followers
718
0
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
719
0
  auto* write_group = w->write_group;
720
0
  if (!w->status.ok()) {
721
0
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
722
0
    write_group->status = w->status;
723
0
  }
724
725
0
  if (write_group->running-- > 1) {
726
    // we're not the last one
727
0
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
728
0
    return false;
729
0
  }
730
  // else we're the last parallel worker and should perform exit duties.
731
0
  w->status = write_group->status;
732
  // Callers of this function must ensure w->status is checked.
733
0
  write_group->status.PermitUncheckedError();
734
0
  return true;
735
0
}
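
The running-counter handoff in CompleteParallelMemTableWriter() above is the classic "last worker out performs the exit duties" pattern: every worker atomically decrements a shared counter and only the thread that brings it to zero does the one-time cleanup. A minimal standalone sketch of that pattern (hypothetical names, not the RocksDB types):

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  const int kWorkers = 8;
  std::atomic<int> running{kWorkers};
  std::atomic<int> cleanups{0};

  auto worker = [&](int id) {
    // fetch_sub returns the previous value; only the last decrementer sees 1.
    if (running.fetch_sub(1) > 1) {
      return;  // not the last one; in RocksDB this thread would AwaitState()
    }
    cleanups.fetch_add(1);  // the exit duties happen exactly once
    std::printf("worker %d performed the exit duties\n", id);
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < kWorkers; ++i) {
    threads.emplace_back(worker, i);
  }
  for (auto& t : threads) {
    t.join();
  }
  return cleanups.load() == 1 ? 0 : 1;
}
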
736
737
0
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
738
0
  auto* write_group = w->write_group;
739
740
0
  assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
741
0
  assert(write_group->status.ok());
742
0
  ExitAsBatchGroupLeader(*write_group, write_group->status);
743
0
  assert(w->status.ok());
744
0
  assert(w->state == STATE_COMPLETED);
745
0
  SetState(write_group->leader, STATE_COMPLETED);
746
0
}
747
748
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
749
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
750
1.16M
                                         Status& status) {
751
1.16M
  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
752
1.16M
                           &write_group);
753
754
1.16M
  Writer* leader = write_group.leader;
755
1.16M
  Writer* last_writer = write_group.last_writer;
756
1.16M
  assert(leader->link_older == nullptr);
757
758
  // If status is non-ok already, then write_group.status won't have the chance
759
  // of being propagated to caller.
760
1.16M
  if (!status.ok()) {
761
0
    write_group.status.PermitUncheckedError();
762
0
  }
763
764
  // Propagate memtable write error to the whole group.
765
1.16M
  if (status.ok() && !write_group.status.ok()) {
766
0
    status = write_group.status;
767
0
  }
768
769
1.16M
  if (enable_pipelined_write_) {
770
    // We insert a dummy Writer right before our current write_group. This
771
    // allows us to unlink our write_group without the risk that a subsequent
772
    // writer becomes a new leader and might overtake us and add itself to the
773
    // memtable-writer-list before we can do so. This ensures that writers are
774
    // added to the memtable-writer-list in the exact same order in which they
775
    // were in the newest_writer list.
776
    // This must happen before completing the writers from our group to prevent
777
    // a race where the owning thread of one of these writers can start a new
778
    // write operation.
779
0
    Writer dummy;
780
0
    Writer* head = newest_writer_.load(std::memory_order_acquire);
781
0
    if (head != last_writer ||
782
0
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
783
      // Either last_writer wasn't the head during the load(), or it was the
784
      // head during the load() but somebody else pushed onto the list before
785
      // we did the compare_exchange_strong (causing it to fail). In the latter
786
      // case compare_exchange_strong has the effect of re-reading its first
787
      // param (head). No need to retry a failing CAS, because only a departing
788
      // leader (which we are at the moment) can remove nodes from the list.
789
0
      assert(head != last_writer);
790
791
      // After walking link_older starting from head (if not already done) we
792
      // will be able to traverse w->link_newer below.
793
0
      CreateMissingNewerLinks(head);
794
0
      assert(last_writer->link_newer != nullptr);
795
0
      last_writer->link_newer->link_older = &dummy;
796
0
      dummy.link_newer = last_writer->link_newer;
797
0
    }
798
799
    // Complete writers that don't write to memtable
800
0
    for (Writer* w = last_writer; w != leader;) {
801
0
      Writer* next = w->link_older;
802
0
      w->status = status;
803
0
      if (!w->ShouldWriteToMemtable()) {
804
0
        CompleteFollower(w, write_group);
805
0
      }
806
0
      w = next;
807
0
    }
808
0
    if (!leader->ShouldWriteToMemtable()) {
809
0
      CompleteLeader(write_group);
810
0
    }
811
812
0
    TEST_SYNC_POINT_CALLBACK(
813
0
        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
814
0
        &write_group);
815
816
    // Link the remainder of the group to the memtable writer list.
817
    // We have to link our group to the memtable writer queue before waking up
818
    // the next leader or setting newest_writer_ to null; otherwise the next
819
    // leader can run ahead of us and link to the memtable writer queue first.
820
0
    if (write_group.size > 0) {
821
0
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
822
        // The leader can now be different from current writer.
823
0
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
824
0
      }
825
0
    }
826
827
    // Unlink the dummy writer from the list and identify the new leader
828
0
    head = newest_writer_.load(std::memory_order_acquire);
829
0
    if (head != &dummy ||
830
0
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
831
0
      CreateMissingNewerLinks(head);
832
0
      Writer* new_leader = dummy.link_newer;
833
0
      assert(new_leader != nullptr);
834
0
      new_leader->link_older = nullptr;
835
0
      SetState(new_leader, STATE_GROUP_LEADER);
836
0
    }
837
838
0
    AwaitState(leader,
839
0
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER |
840
0
                   STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
841
0
               &eabgl_ctx);
842
1.16M
  } else {
843
1.16M
    Writer* head = newest_writer_.load(std::memory_order_acquire);
844
1.16M
    if (head != last_writer ||
845
1.16M
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
846
      // Either last_writer wasn't the head during the load(), or it was the
847
      // head during the load() but somebody else pushed onto the list before
848
      // we did the compare_exchange_strong (causing it to fail).  In the
849
      // latter case compare_exchange_strong has the effect of re-reading
850
      // its first param (head).  No need to retry a failing CAS, because
851
      // only a departing leader (which we are at the moment) can remove
852
      // nodes from the list.
853
0
      assert(head != last_writer);
854
855
      // After walking link_older starting from head (if not already done)
856
      // we will be able to traverse w->link_newer below. This function
857
      // can only be called from an active leader, only a leader can
858
      // clear newest_writer_, we didn't, and only a clear newest_writer_
859
      // could cause the next leader to start their work without a call
860
      // to MarkJoined, so we can definitely conclude that no other leader
861
      // work is going on here (with or without db mutex).
862
0
      CreateMissingNewerLinks(head);
863
0
      assert(last_writer->link_newer != nullptr);
864
0
      assert(last_writer->link_newer->link_older == last_writer);
865
0
      last_writer->link_newer->link_older = nullptr;
866
867
      // Next leader didn't self-identify, because newest_writer_ wasn't
868
      // nullptr when they enqueued (we were definitely enqueued before them
869
      // and are still in the list).  That means leader handoff occurs when
870
      // we call MarkJoined
871
0
      SetState(last_writer->link_newer, STATE_GROUP_LEADER);
872
0
    }
873
    // else nobody else was waiting, although there might already be a new
874
    // leader now
875
876
1.16M
    while (last_writer != leader) {
877
0
      assert(last_writer);
878
0
      last_writer->status = status;
879
      // we need to read link_older before calling SetState, because as soon
880
      // as it is marked committed the other thread's Await may return and
881
      // deallocate the Writer.
882
0
      auto next = last_writer->link_older;
883
0
      SetState(last_writer, STATE_COMPLETED);
884
885
0
      last_writer = next;
886
0
    }
887
1.16M
  }
888
1.16M
}
889
890
static WriteThread::AdaptationContext eu_ctx("EnterUnbatched");
891
869
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
892
869
  assert(w != nullptr && w->batch == nullptr);
893
869
  mu->Unlock();
894
869
  bool linked_as_leader = LinkOne(w, &newest_writer_);
895
869
  if (!linked_as_leader) {
896
0
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
897
    // Last leader will not pick us as a follower since our batch is nullptr
898
0
    AwaitState(w, STATE_GROUP_LEADER, &eu_ctx);
899
0
  }
900
869
  if (enable_pipelined_write_) {
901
0
    WaitForMemTableWriters();
902
0
  }
903
869
  mu->Lock();
904
869
}
905
906
869
void WriteThread::ExitUnbatched(Writer* w) {
907
869
  assert(w != nullptr);
908
869
  Writer* newest_writer = w;
909
869
  if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
910
0
    CreateMissingNewerLinks(newest_writer);
911
0
    Writer* next_leader = w->link_newer;
912
0
    assert(next_leader != nullptr);
913
0
    next_leader->link_older = nullptr;
914
0
    SetState(next_leader, STATE_GROUP_LEADER);
915
0
  }
916
869
}
917
918
static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters");
919
0
void WriteThread::WaitForMemTableWriters() {
920
0
  assert(enable_pipelined_write_);
921
0
  if (newest_memtable_writer_.load() == nullptr) {
922
0
    return;
923
0
  }
924
0
  Writer w;
925
0
  if (!LinkOne(&w, &newest_memtable_writer_)) {
926
0
    AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx);
927
0
  }
928
0
  newest_memtable_writer_.store(nullptr);
929
0
}
930
931
}  // namespace ROCKSDB_NAMESPACE