/src/rocksdb/db/write_thread.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/write_thread.h" |
7 | | |
8 | | #include <chrono> |
9 | | #include <thread> |
10 | | |
11 | | #include "db/column_family.h" |
12 | | #include "monitoring/perf_context_imp.h" |
13 | | #include "port/port.h" |
14 | | #include "test_util/sync_point.h" |
15 | | #include "util/random.h" |
16 | | |
17 | | namespace ROCKSDB_NAMESPACE { |
18 | | |
19 | | WriteThread::WriteThread(const ImmutableDBOptions& db_options) |
20 | | : max_yield_usec_(db_options.enable_write_thread_adaptive_yield |
21 | | ? db_options.write_thread_max_yield_usec |
22 | | : 0), |
23 | | slow_yield_usec_(db_options.write_thread_slow_yield_usec), |
24 | | allow_concurrent_memtable_write_( |
25 | | db_options.allow_concurrent_memtable_write), |
26 | | enable_pipelined_write_(db_options.enable_pipelined_write), |
27 | | max_write_batch_group_size_bytes( |
28 | | db_options.max_write_batch_group_size_bytes), |
29 | | newest_writer_(nullptr), |
30 | | newest_memtable_writer_(nullptr), |
31 | | last_sequence_(0), |
32 | | write_stall_dummy_(), |
33 | | stall_mu_(), |
34 | 22.1k | stall_cv_(&stall_mu_) {} |
35 | | |
36 | 0 | uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { |
37 | | // We're going to block. Lazily create the mutex. We guarantee |
38 | | // propagation of this construction to the waker via the |
39 | | // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex |
40 | | // or the condvar unless they CAS away the STATE_LOCKED_WAITING that |
41 | | // we install below. |
42 | 0 | w->CreateMutex(); |
43 | |
44 | 0 | auto state = w->state.load(std::memory_order_acquire); |
45 | 0 | assert(state != STATE_LOCKED_WAITING); |
46 | 0 | if ((state & goal_mask) == 0 && |
47 | 0 | w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) { |
48 | | // we have permission (and an obligation) to use StateMutex |
49 | 0 | std::unique_lock<std::mutex> guard(w->StateMutex()); |
50 | 0 | w->StateCV().wait(guard, [w] { |
51 | 0 | return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING; |
52 | 0 | }); |
53 | 0 | state = w->state.load(std::memory_order_relaxed); |
54 | 0 | } |
55 | | // else tricky. Goal is met or CAS failed. In the latter case the waker |
56 | | // must have changed the state, and compare_exchange_strong has updated |
57 | | // our local variable with the new one. At the moment WriteThread never |
58 | | // waits for a transition across intermediate states, so we know that |
59 | | // since a state change has occurred the goal must have been met. |
60 | 0 | assert((state & goal_mask) != 0); |
61 | 0 | return state; |
62 | 0 | } |
63 | | |
64 | | uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask, |
65 | 0 | AdaptationContext* ctx) { |
66 | 0 | uint8_t state = 0; |
67 | | |
68 | | // 1. Busy loop using "pause" for 1 micro sec |
69 | | // 2. Else SOMETIMES busy loop using "yield" for 100 micro sec (default) |
70 | | // 3. Else blocking wait |
71 | | |
72 | | // On a modern Xeon each loop takes about 7 nanoseconds (most of which |
73 | | // is the effect of the pause instruction), so 200 iterations is a bit |
74 | | // more than a microsecond. This is long enough that waits longer than |
75 | | // this can amortize the cost of accessing the clock and yielding. |
76 | 0 | for (uint32_t tries = 0; tries < 200; ++tries) { |
77 | 0 | state = w->state.load(std::memory_order_acquire); |
78 | 0 | if ((state & goal_mask) != 0) { |
79 | 0 | return state; |
80 | 0 | } |
81 | 0 | port::AsmVolatilePause(); |
82 | 0 | } |
83 | | |
84 | | // This is below the fast path, so that the stat is zero when all writes are |
85 | | // from the same thread. |
86 | 0 | PERF_TIMER_FOR_WAIT_GUARD(write_thread_wait_nanos); |
87 | | |
88 | | // If we're only going to end up waiting a short period of time, |
89 | | // it can be a lot more efficient to call std::this_thread::yield() |
90 | | // in a loop than to block in StateMutex(). For reference, on my 4.0 |
91 | | // SELinux test server with support for syscall auditing enabled, the |
92 | | // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is |
93 | | // 2.7 usec, and the average is more like 10 usec. That can be a big |
94 | | // drag on RocksDB's single-writer design. Of course, spinning is a |
95 | | // bad idea if other threads are waiting to run or if we're going to |
96 | | // wait for a long time. How do we decide? |
97 | | // |
98 | | // We break waiting into 3 categories: short-uncontended, |
99 | | // short-contended, and long. If we had an oracle, then we would always |
100 | | // spin for short-uncontended, always block for long, and our choice for |
101 | | // short-contended might depend on whether we were trying to optimize |
102 | | // RocksDB throughput or avoid being greedy with system resources. |
103 | | // |
104 | | // Bucketing into short or long is easy by measuring elapsed time. |
105 | | // Differentiating short-uncontended from short-contended is a bit |
106 | | // trickier, but not too bad. We could look for involuntary context |
107 | | // switches using getrusage(RUSAGE_THREAD, ..), but it's less work |
108 | | // (portability code and CPU) to just look for yield calls that take |
109 | | // longer than we expect. sched_yield() doesn't actually result in any |
110 | | // context switch overhead if there are no other runnable processes |
111 | | // on the current core, in which case it usually takes less than |
112 | | // a microsecond. |
113 | | // |
114 | | // There are two primary tunables here: the threshold between "short" |
115 | | // and "long" waits, and the threshold at which we suspect that a yield |
116 | | // is slow enough to indicate we should probably block. If these |
117 | | // thresholds are chosen well then CPU-bound workloads that don't |
118 | | // have more threads than cores will experience few context switches |
119 | | // (voluntary or involuntary), and the total number of context switches |
120 | | // (voluntary and involuntary) will not be dramatically larger (maybe |
121 | | // 2x) than the number of voluntary context switches that occur when |
122 | | // --max_yield_wait_micros=0. |
123 | | // |
124 | | // There's another constant, which is the number of slow yields we will |
125 | | // tolerate before reversing our previous decision. Solitary slow |
126 | | // yields are pretty common (low-priority small jobs ready to run), |
127 | | // so this should be at least 2. We set this conservatively to 3 so |
128 | | // that we can also immediately schedule a ctx adaptation, rather than |
129 | | // waiting for the next update_ctx. |
130 | |
131 | 0 | const size_t kMaxSlowYieldsWhileSpinning = 3; |
132 | | |
133 | | // Whether the yield approach has any credit in this context. The credit is |
134 | | // added by a yield being successful before timing out, and decreased otherwise. |
135 | 0 | auto& yield_credit = ctx->value; |
136 | | // Update the yield_credit based on sample runs or right after a hard failure |
137 | 0 | bool update_ctx = false; |
138 | | // Should we reinforce the yield credit |
139 | 0 | bool would_spin_again = false; |
140 | | // The sampling base for updating the yield credit. The sampling rate would be |
141 | | // 1/sampling_base. |
142 | 0 | const int sampling_base = 256; |
143 | |
144 | 0 | if (max_yield_usec_ > 0) { |
145 | 0 | update_ctx = Random::GetTLSInstance()->OneIn(sampling_base); |
146 | |
147 | 0 | if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) { |
148 | | // we're updating the adaptation statistics, or spinning has > |
149 | | // 50% chance of being shorter than max_yield_usec_ and causing no |
150 | | // involuntary context switches |
151 | 0 | auto spin_begin = std::chrono::steady_clock::now(); |
152 | | |
153 | | // this variable doesn't include the final yield (if any) that |
154 | | // causes the goal to be met |
155 | 0 | size_t slow_yield_count = 0; |
156 | |
157 | 0 | auto iter_begin = spin_begin; |
158 | 0 | while ((iter_begin - spin_begin) <= |
159 | 0 | std::chrono::microseconds(max_yield_usec_)) { |
160 | 0 | std::this_thread::yield(); |
161 | |
162 | 0 | state = w->state.load(std::memory_order_acquire); |
163 | 0 | if ((state & goal_mask) != 0) { |
164 | | // success |
165 | 0 | would_spin_again = true; |
166 | 0 | break; |
167 | 0 | } |
168 | | |
169 | 0 | auto now = std::chrono::steady_clock::now(); |
170 | 0 | if (now == iter_begin || |
171 | 0 | now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { |
172 | | // conservatively count it as a slow yield if our clock isn't |
173 | | // accurate enough to measure the yield duration |
174 | 0 | ++slow_yield_count; |
175 | 0 | if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { |
176 | | // Not just one ivcsw, but several. Immediately update yield_credit |
177 | | // and fall back to blocking |
178 | 0 | update_ctx = true; |
179 | 0 | break; |
180 | 0 | } |
181 | 0 | } |
182 | 0 | iter_begin = now; |
183 | 0 | } |
184 | 0 | } |
185 | 0 | } |
186 | |
187 | 0 | if ((state & goal_mask) == 0) { |
188 | 0 | TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w); |
189 | 0 | state = BlockingAwaitState(w, goal_mask); |
190 | 0 | } |
191 | |
192 | 0 | if (update_ctx) { |
193 | | // Since our update is sample based, it is ok if a thread overwrites the |
194 | | // updates by other threads. Thus the update does not have to be atomic. |
195 | 0 | auto v = yield_credit.load(std::memory_order_relaxed); |
196 | | // fixed point exponential decay with decay constant 1/1024, with +1 |
197 | | // and -1 scaled to avoid overflow for int32_t |
198 | | // |
199 | | // On each update the positive credit is decayed by a factor of 1/1024 (i.e., |
200 | | // 0.1%). If the sampled yield was successful, the credit is also increased |
201 | | // by X. Setting X=2^17 ensures that the credit never exceeds |
202 | | // 2^17*2^10=2^27, which is lower than 2^31, the upper bound of int32_t. Same |
203 | | // logic applies to negative credits. |
204 | 0 | v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072; |
205 | 0 | yield_credit.store(v, std::memory_order_relaxed); |
206 | 0 | } |
207 | |
208 | 0 | assert((state & goal_mask) != 0); |
209 | 0 | return state; |
210 | 0 | } |
211 | | |
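The three-phase wait above (a short busy loop, a bounded yield loop that watches for slow yields, then a blocking wait) and the fixed-point credit update can be distilled into a small standalone model. The sketch below is illustrative only: it assumes a plain std::atomic<bool> flag in place of the Writer state word, invented constants (kMaxYieldUsec, kSlowYieldUsec) in place of the write_thread_max_yield_usec / write_thread_slow_yield_usec options, replaces the blocking phase with a sleep loop, and omits the 1-in-256 sampling of credit updates.

// Illustrative model of the spin -> yield -> block wait policy; not RocksDB API.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

static std::atomic<int32_t> yield_credit{0};  // plays the role of AdaptationContext::value

void WaitForFlag(const std::atomic<bool>& flag) {
  constexpr int kMaxYieldUsec = 100;  // cf. write_thread_max_yield_usec
  constexpr int kSlowYieldUsec = 3;   // cf. write_thread_slow_yield_usec
  constexpr int kMaxSlowYields = 3;

  // Phase 1: short busy loop, roughly a microsecond of spinning.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    if (flag.load(std::memory_order_acquire)) {
      return;
    }
  }

  // Phase 2: yield loop, bounded both by wall-clock time and by the number of
  // slow yields, entered only while the credit is non-negative.
  bool would_spin_again = false;
  const bool tried_yielding = yield_credit.load(std::memory_order_relaxed) >= 0;
  if (tried_yielding) {
    auto spin_begin = std::chrono::steady_clock::now();
    auto iter_begin = spin_begin;
    int slow_yields = 0;
    while (iter_begin - spin_begin <= std::chrono::microseconds(kMaxYieldUsec)) {
      std::this_thread::yield();
      if (flag.load(std::memory_order_acquire)) {
        would_spin_again = true;
        break;
      }
      auto now = std::chrono::steady_clock::now();
      if (now == iter_begin ||
          now - iter_begin >= std::chrono::microseconds(kSlowYieldUsec)) {
        // A slow yield suggests an involuntary context switch.
        if (++slow_yields >= kMaxSlowYields) {
          break;  // probably contended: give up and block
        }
      }
      iter_begin = now;
    }
  }

  // Phase 3 (stub): the real code blocks on a per-writer condition variable.
  while (!flag.load(std::memory_order_acquire)) {
    std::this_thread::sleep_for(std::chrono::microseconds(10));
  }

  if (tried_yielding) {
    // Same arithmetic as above: decay by 1/1024, then add +/-2^17, so the
    // magnitude of the credit stays below 2^17 * 2^10 = 2^27 < 2^31.
    int32_t v = yield_credit.load(std::memory_order_relaxed);
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
  }
}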
212 | 1.16M | void WriteThread::SetState(Writer* w, uint8_t new_state) { |
213 | 1.16M | assert(w); |
214 | 1.16M | auto state = w->state.load(std::memory_order_acquire); |
215 | 1.16M | if (state == STATE_LOCKED_WAITING || |
216 | 1.16M | !w->state.compare_exchange_strong(state, new_state)) { |
217 | 0 | assert(state == STATE_LOCKED_WAITING); |
218 | |
219 | 0 | std::lock_guard<std::mutex> guard(w->StateMutex()); |
220 | 0 | assert(w->state.load(std::memory_order_relaxed) != new_state); |
221 | 0 | w->state.store(new_state, std::memory_order_relaxed); |
222 | 0 | w->StateCV().notify_one(); |
223 | 0 | } |
224 | 1.16M | } |
225 | | |
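Taken together, BlockingAwaitState and SetState form a small hand-off protocol: the waiter CASes the state word to STATE_LOCKED_WAITING before sleeping, and a waker that observes that sentinel (or loses its CAS to it) publishes the new state under the waiter's mutex and notifies. Below is a simplified, self-contained model of that protocol; it uses an eagerly constructed mutex and condition variable rather than the lazily created ones, and all names are invented.

// Simplified model of the STATE_LOCKED_WAITING handshake; not the Writer class.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct MiniWaiter {
  static constexpr uint8_t kInit = 1;
  static constexpr uint8_t kLockedWaiting = 2;
  static constexpr uint8_t kDone = 4;

  std::atomic<uint8_t> state{kInit};
  std::mutex mu;                 // created lazily in the real Writer
  std::condition_variable cv;

  // Waiter side, cf. BlockingAwaitState.
  uint8_t BlockingAwait(uint8_t goal_mask) {
    uint8_t s = state.load(std::memory_order_acquire);
    if ((s & goal_mask) == 0 &&
        state.compare_exchange_strong(s, kLockedWaiting)) {
      // We installed the sentinel, so we own the mutex/condvar until woken.
      std::unique_lock<std::mutex> guard(mu);
      cv.wait(guard, [&] {
        return state.load(std::memory_order_relaxed) != kLockedWaiting;
      });
      s = state.load(std::memory_order_relaxed);
    }
    // Otherwise the goal was already met, or the CAS failed because a waker
    // changed the state first (and updated `s` for us).
    return s;
  }

  // Waker side, cf. SetState.
  void Set(uint8_t new_state) {
    uint8_t s = state.load(std::memory_order_acquire);
    if (s == kLockedWaiting || !state.compare_exchange_strong(s, new_state)) {
      // The waiter installed kLockedWaiting, so publish the new state under
      // its mutex and wake it up.
      std::lock_guard<std::mutex> guard(mu);
      state.store(new_state, std::memory_order_relaxed);
      cv.notify_one();
    }
  }
};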
226 | 1.16M | bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) { |
227 | 1.16M | assert(newest_writer != nullptr); |
228 | 1.16M | assert(w->state == STATE_INIT); |
229 | 1.16M | Writer* writers = newest_writer->load(std::memory_order_relaxed); |
230 | 1.16M | while (true) { |
231 | 1.16M | assert(writers != w); |
232 | | // If a write stall is in effect and w->no_slowdown is not true, |
233 | | // block here until the stall is cleared. If it is true, then return |
234 | | // immediately |
235 | 1.16M | if (writers == &write_stall_dummy_) { |
236 | 0 | if (w->no_slowdown) { |
237 | 0 | w->status = Status::Incomplete("Write stall"); |
238 | 0 | SetState(w, STATE_COMPLETED); |
239 | 0 | return false; |
240 | 0 | } |
241 | | // Since no_slowdown is false, wait here to be notified of the write |
242 | | // stall clearing |
243 | 0 | { |
244 | 0 | MutexLock lock(&stall_mu_); |
245 | 0 | writers = newest_writer->load(std::memory_order_relaxed); |
246 | 0 | if (writers == &write_stall_dummy_) { |
247 | 0 | TEST_SYNC_POINT_CALLBACK("WriteThread::WriteStall::Wait", w); |
248 | 0 | stall_cv_.Wait(); |
249 | | // Load newest_writers_ again since it may have changed |
250 | 0 | writers = newest_writer->load(std::memory_order_relaxed); |
251 | 0 | continue; |
252 | 0 | } |
253 | 0 | } |
254 | 0 | } |
255 | 1.16M | w->link_older = writers; |
256 | 1.16M | if (newest_writer->compare_exchange_weak(writers, w)) { |
257 | 1.16M | return (writers == nullptr); |
258 | 1.16M | } |
259 | 1.16M | } |
260 | 1.16M | } |
261 | | |
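Stripped of the write-stall handling, LinkOne is a lock-free push onto a singly linked, newest-first list, and its return value tells the caller whether the list was empty, i.e. whether it just became the group leader. A minimal sketch with invented names (Node, PushAndCheckLeader):

#include <atomic>

struct Node {
  Node* link_older = nullptr;
};

// Push n onto the newest-first list; return true if the list was empty.
bool PushAndCheckLeader(Node* n, std::atomic<Node*>* newest) {
  Node* head = newest->load(std::memory_order_relaxed);
  while (true) {
    n->link_older = head;
    // On failure, compare_exchange_weak reloads `head`, so the next
    // iteration retries against the current list head.
    if (newest->compare_exchange_weak(head, n)) {
      return head == nullptr;
    }
  }
}

In JoinBatchGroup, a true return from LinkOne is exactly what promotes the calling writer to STATE_GROUP_LEADER.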
262 | | bool WriteThread::LinkGroup(WriteGroup& write_group, |
263 | 0 | std::atomic<Writer*>* newest_writer) { |
264 | 0 | assert(newest_writer != nullptr); |
265 | 0 | Writer* leader = write_group.leader; |
266 | 0 | Writer* last_writer = write_group.last_writer; |
267 | 0 | Writer* w = last_writer; |
268 | 0 | while (true) { |
269 | | // Unset link_newer pointers to make sure when we call |
270 | | // CreateMissingNewerLinks later it creates all missing links. |
271 | 0 | w->link_newer = nullptr; |
272 | 0 | w->write_group = nullptr; |
273 | 0 | if (w == leader) { |
274 | 0 | break; |
275 | 0 | } |
276 | 0 | w = w->link_older; |
277 | 0 | } |
278 | 0 | Writer* newest = newest_writer->load(std::memory_order_relaxed); |
279 | 0 | while (true) { |
280 | 0 | leader->link_older = newest; |
281 | 0 | if (newest_writer->compare_exchange_weak(newest, last_writer)) { |
282 | 0 | return (newest == nullptr); |
283 | 0 | } |
284 | 0 | } |
285 | 0 | } |
286 | | |
287 | 1.16M | void WriteThread::CreateMissingNewerLinks(Writer* head) { |
288 | 1.16M | while (true) { |
289 | 1.16M | Writer* next = head->link_older; |
290 | 1.16M | if (next == nullptr || next->link_newer != nullptr) { |
291 | 1.16M | assert(next == nullptr || next->link_newer == head); |
292 | 1.16M | break; |
293 | 1.16M | } |
294 | 0 | next->link_newer = head; |
295 | 0 | head = next; |
296 | 0 | } |
297 | 1.16M | } |
298 | | |
299 | 0 | void WriteThread::CompleteLeader(WriteGroup& write_group) { |
300 | 0 | assert(write_group.size > 0); |
301 | 0 | Writer* leader = write_group.leader; |
302 | 0 | if (write_group.size == 1) { |
303 | 0 | write_group.leader = nullptr; |
304 | 0 | write_group.last_writer = nullptr; |
305 | 0 | } else { |
306 | 0 | assert(leader->link_newer != nullptr); |
307 | 0 | leader->link_newer->link_older = nullptr; |
308 | 0 | write_group.leader = leader->link_newer; |
309 | 0 | } |
310 | 0 | write_group.size -= 1; |
311 | 0 | SetState(leader, STATE_COMPLETED); |
312 | 0 | } |
313 | | |
314 | 0 | void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { |
315 | 0 | assert(write_group.size > 1); |
316 | 0 | assert(w != write_group.leader); |
317 | 0 | if (w == write_group.last_writer) { |
318 | 0 | w->link_older->link_newer = nullptr; |
319 | 0 | write_group.last_writer = w->link_older; |
320 | 0 | } else { |
321 | 0 | w->link_older->link_newer = w->link_newer; |
322 | 0 | w->link_newer->link_older = w->link_older; |
323 | 0 | } |
324 | 0 | write_group.size -= 1; |
325 | 0 | SetState(w, STATE_COMPLETED); |
326 | 0 | } |
327 | | |
328 | 0 | void WriteThread::BeginWriteStall() { |
329 | 0 | ++stall_begun_count_; |
330 | 0 | LinkOne(&write_stall_dummy_, &newest_writer_); |
331 | | |
332 | | // Walk writer list until w->write_group != nullptr. The current write group |
333 | | // will not have a mix of slowdown/no_slowdown, so it's ok to stop at that |
334 | | // point |
335 | 0 | Writer* w = write_stall_dummy_.link_older; |
336 | 0 | Writer* prev = &write_stall_dummy_; |
337 | 0 | while (w != nullptr && w->write_group == nullptr) { |
338 | 0 | if (w->no_slowdown) { |
339 | 0 | prev->link_older = w->link_older; |
340 | 0 | w->status = Status::Incomplete("Write stall"); |
341 | 0 | SetState(w, STATE_COMPLETED); |
342 | | // Only update `link_newer` if it's already set. |
343 | | // `CreateMissingNewerLinks()` will update the nullptr `link_newer` later, |
344 | | // which assumes that the first non-nullptr `link_newer` is the last |
345 | | // nullptr link in the writer list. |
346 | | // If `link_newer` is set here, `CreateMissingNewerLinks()` may stop |
347 | | // updating the whole list when it sees the first non nullptr link. |
348 | 0 | if (prev->link_older && prev->link_older->link_newer) { |
349 | 0 | prev->link_older->link_newer = prev; |
350 | 0 | } |
351 | 0 | w = prev->link_older; |
352 | 0 | } else { |
353 | 0 | prev = w; |
354 | 0 | w = w->link_older; |
355 | 0 | } |
356 | 0 | } |
357 | 0 | } |
358 | | |
359 | 0 | void WriteThread::EndWriteStall() { |
360 | 0 | MutexLock lock(&stall_mu_); |
361 | | |
362 | | // Unlink write_stall_dummy_ from the write queue. This will unblock |
363 | | // pending write threads to enqueue themselves |
364 | 0 | assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_); |
365 | | // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been |
366 | | // called. |
367 | 0 | if (write_stall_dummy_.link_older) { |
368 | 0 | write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer; |
369 | 0 | } |
370 | 0 | newest_writer_.exchange(write_stall_dummy_.link_older); |
371 | |
372 | 0 | ++stall_ended_count_; |
373 | | |
374 | | // Wake up writers |
375 | 0 | stall_cv_.SignalAll(); |
376 | 0 | } |
377 | | |
378 | 0 | uint64_t WriteThread::GetBegunCountOfOutstandingStall() { |
379 | 0 | if (stall_begun_count_ > stall_ended_count_) { |
380 | | // Outstanding stall in queue |
381 | 0 | assert(newest_writer_.load(std::memory_order_relaxed) == |
382 | 0 | &write_stall_dummy_); |
383 | 0 | return stall_begun_count_; |
384 | 0 | } else { |
385 | | // No stall in queue |
386 | 0 | assert(newest_writer_.load(std::memory_order_relaxed) != |
387 | 0 | &write_stall_dummy_); |
388 | 0 | return 0; |
389 | 0 | } |
390 | 0 | } |
391 | | |
392 | 0 | void WriteThread::WaitForStallEndedCount(uint64_t stall_count) { |
393 | 0 | MutexLock lock(&stall_mu_); |
394 | |
395 | 0 | while (stall_ended_count_ < stall_count) { |
396 | 0 | stall_cv_.Wait(); |
397 | 0 | } |
398 | 0 | } |
399 | | |
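GetBegunCountOfOutstandingStall and WaitForStallEndedCount are meant to be used as a pair: a caller snapshots the begun-count of a stall that is currently outstanding, and later waits for the matching ended-count, so it waits until that particular stall has been cleared by EndWriteStall(). A hypothetical caller (this code is not part of WriteThread) might look like:

#include <cstdint>

#include "db/write_thread.h"

// Hypothetical helper: wait until the write stall that is currently
// outstanding (if any) has been ended via EndWriteStall().
void WaitOutCurrentStall(ROCKSDB_NAMESPACE::WriteThread* write_thread) {
  uint64_t stall_count = write_thread->GetBegunCountOfOutstandingStall();
  if (stall_count > 0) {
    // Blocks on stall_cv_ until stall_ended_count_ catches up with the
    // begun-count snapshot taken above.
    write_thread->WaitForStallEndedCount(stall_count);
  }
}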
400 | | static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); |
401 | 1.16M | void WriteThread::JoinBatchGroup(Writer* w) { |
402 | 1.16M | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w); |
403 | 1.16M | assert(w->batch != nullptr); |
404 | | |
405 | 1.16M | bool linked_as_leader = LinkOne(w, &newest_writer_); |
406 | | |
407 | 1.16M | w->CheckWriteEnqueuedCallback(); |
408 | | |
409 | 1.16M | if (linked_as_leader) { |
410 | 1.16M | SetState(w, STATE_GROUP_LEADER); |
411 | 1.16M | } |
412 | | |
413 | 1.16M | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); |
414 | 1.16M | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait2", w); |
415 | | |
416 | 1.16M | if (!linked_as_leader) { |
417 | | /** |
418 | | * Wait until: |
419 | | * 1) An existing leader picks us as the new leader when it finishes |
420 | | * 2) An existing leader picks us as its follower and |
421 | | * 2.1) finishes the memtable writes on our behalf |
422 | | * 2.2) Or tells us to finish the memtable writes in parallel |
423 | | * 3) (pipelined write) An existing leader picks us as its follower and |
424 | | * finishes book-keeping and the WAL write for us, enqueues us as a pending |
425 | | * memtable writer, and |
426 | | * 3.1) we become memtable writer group leader, or |
427 | | * 3.2) an existing memtable writer group leader tells us to finish memtable |
428 | | * writes in parallel. |
429 | | */ |
430 | 0 | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w); |
431 | 0 | AwaitState(w, |
432 | 0 | STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | |
433 | 0 | STATE_PARALLEL_MEMTABLE_CALLER | |
434 | 0 | STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, |
435 | 0 | &jbg_ctx); |
436 | 0 | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); |
437 | 0 | } |
438 | 1.16M | } |
439 | | |
440 | | size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, |
441 | 1.16M | WriteGroup* write_group) { |
442 | 1.16M | assert(leader->link_older == nullptr); |
443 | 1.16M | assert(leader->batch != nullptr); |
444 | 1.16M | assert(write_group != nullptr); |
445 | | |
446 | 1.16M | size_t size = WriteBatchInternal::ByteSize(leader->batch); |
447 | | |
448 | | // Allow the group to grow up to a maximum size, but if the |
449 | | // original write is small, limit the growth so we do not slow |
450 | | // down the small write too much. |
451 | 1.16M | size_t max_size = max_write_batch_group_size_bytes; |
452 | 1.16M | const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8; |
453 | 1.16M | if (size <= min_batch_size_bytes) { |
454 | 1.16M | max_size = size + min_batch_size_bytes; |
455 | 1.16M | } |
456 | | |
457 | 1.16M | leader->write_group = write_group; |
458 | 1.16M | write_group->leader = leader; |
459 | 1.16M | write_group->last_writer = leader; |
460 | 1.16M | write_group->size = 1; |
461 | 1.16M | Writer* newest_writer = newest_writer_.load(std::memory_order_acquire); |
462 | | |
463 | | // This is safe regardless of any db mutex status of the caller. Previous |
464 | | // calls to ExitAsBatchGroupLeader either didn't call CreateMissingNewerLinks |
465 | | // (they emptied the list and then we added ourself as leader) or had to |
466 | | // explicitly wake us up (the list was non-empty when we added ourself, |
467 | | // so we have already received our MarkJoined). |
468 | 1.16M | CreateMissingNewerLinks(newest_writer); |
469 | | |
470 | | // This comment illustrates how the rest of the function works using an |
471 | | // example. Notation: |
472 | | // |
473 | | // - Items are `Writer`s |
474 | | // - Items prefixed by "@" have been included in `write_group` |
475 | | // - Items prefixed by "*" have compatible options with `leader`, but have not |
476 | | // been included in `write_group` yet |
477 | | // - Items after several spaces are in `r_list`. These have incompatible |
478 | | // options with `leader` and are temporarily separated from the main list. |
479 | | // |
480 | | // Each line below depicts the state of the linked lists at the beginning of |
481 | | // an iteration of the while-loop. |
482 | | // |
483 | | // @leader, n1, *n2, n3, *newest_writer |
484 | | // @leader, *n2, n3, *newest_writer,    n1 |
485 | | // @leader, @n2, n3, *newest_writer,    n1 |
486 | | // |
487 | | // After the while-loop, the `r_list` is grafted back onto the main list. |
488 | | // |
489 | | // case A: no new `Writer`s arrived |
490 | | // @leader, @n2, @newest_writer,    n1, n3 |
491 | | // @leader, @n2, @newest_writer, n1, n3 |
492 | | // |
493 | | // case B: a new `Writer` (n4) arrived |
494 | | // @leader, @n2, @newest_writer, n4    n1, n3 |
495 | | // @leader, @n2, @newest_writer, n1, n3, n4 |
496 | | |
497 | | // Tricky. Iteration start (leader) is exclusive and finish |
498 | | // (newest_writer) is inclusive. Iteration goes from old to new. |
499 | 1.16M | Writer* w = leader; |
500 | | // write_group end |
501 | 1.16M | Writer* we = leader; |
502 | | // declare r_list |
503 | 1.16M | Writer* rb = nullptr; |
504 | 1.16M | Writer* re = nullptr; |
505 | | |
506 | 1.16M | while (w != newest_writer) { |
507 | 0 | assert(w->link_newer); |
508 | 0 | w = w->link_newer; |
509 | |
510 | 0 | if ((w->sync && !leader->sync) || |
511 | | // Do not include a sync write into a batch handled by a non-sync write. |
512 | 0 | (w->no_slowdown != leader->no_slowdown) || |
513 | | // Do not mix writes that are ok with delays with the ones that request |
514 | | // fail on delays. |
515 | 0 | (w->disable_wal != leader->disable_wal) || |
516 | | // Do not mix writes that enable WAL with the ones whose WAL disabled. |
517 | 0 | (w->protection_bytes_per_key != leader->protection_bytes_per_key) || |
518 | | // Do not mix writes with different levels of integrity protection. |
519 | 0 | (w->rate_limiter_priority != leader->rate_limiter_priority) || |
520 | | // Do not mix writes with different rate limiter priorities. |
521 | 0 | (w->batch == nullptr) || |
522 | | // Do not include writes with a nullptr batch. Those are not writes; |
523 | | // they are something else and want to be alone. |
524 | 0 | (w->callback != nullptr && !w->callback->AllowWriteBatching()) || |
525 | | // don't batch writes that don't want to be batched |
526 | 0 | (size + WriteBatchInternal::ByteSize(w->batch) > max_size) |
527 | | // Do not make batch too big |
528 | 0 | ) { |
529 | | // remove from list |
530 | 0 | w->link_older->link_newer = w->link_newer; |
531 | 0 | if (w->link_newer != nullptr) { |
532 | 0 | w->link_newer->link_older = w->link_older; |
533 | 0 | } |
534 | | // insert into r_list |
535 | 0 | if (re == nullptr) { |
536 | 0 | rb = re = w; |
537 | 0 | w->link_older = nullptr; |
538 | 0 | } else { |
539 | 0 | w->link_older = re; |
540 | 0 | re->link_newer = w; |
541 | 0 | re = w; |
542 | 0 | } |
543 | 0 | } else { |
544 | | // grow the group |
545 | 0 | we = w; |
546 | 0 | w->write_group = write_group; |
547 | 0 | size += WriteBatchInternal::ByteSize(w->batch); |
548 | 0 | write_group->last_writer = w; |
549 | 0 | write_group->size++; |
550 | 0 | } |
551 | 0 | } |
552 | | // append r_list after write_group end |
553 | 1.16M | if (rb != nullptr) { |
554 | 0 | rb->link_older = we; |
555 | 0 | re->link_newer = nullptr; |
556 | 0 | we->link_newer = rb; |
557 | 0 | if (!newest_writer_.compare_exchange_weak(w, re)) { |
558 | 0 | while (w->link_older != newest_writer) { |
559 | 0 | w = w->link_older; |
560 | 0 | } |
561 | 0 | w->link_older = re; |
562 | 0 | } |
563 | 0 | } |
564 | | |
565 | 1.16M | TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w); |
566 | 1.16M | return size; |
567 | 1.16M | } |
568 | | |
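The cap computed at the top of EnterAsBatchGroupLeader (and again in EnterAsMemTableWriter below) keeps a small leader batch from dragging a huge group behind it: max_write_batch_group_size_bytes is the hard limit, and when the leader's own batch is no larger than one eighth of that limit, the group is capped at the leader's size plus that eighth. A small illustrative helper, where ComputeMaxGroupSize is not a RocksDB function and the 1 MB limit is used purely as an example value:

#include <cassert>
#include <cstddef>
#include <cstdint>

size_t ComputeMaxGroupSize(size_t leader_batch_bytes,
                           uint64_t max_write_batch_group_size_bytes) {
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (leader_batch_bytes <= min_batch_size_bytes) {
    max_size = leader_batch_bytes + min_batch_size_bytes;
  }
  return max_size;
}

void ComputeMaxGroupSizeExample() {
  const uint64_t one_mb = 1 << 20;  // min_batch_size_bytes becomes 128 KB
  // A 4 KB leader batch caps the group at 4 KB + 128 KB = 132 KB.
  assert(ComputeMaxGroupSize(4u << 10, one_mb) == (4u << 10) + (128u << 10));
  // A 256 KB leader batch (> 128 KB) keeps the full 1 MB cap.
  assert(ComputeMaxGroupSize(256u << 10, one_mb) == one_mb);
}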
569 | | void WriteThread::EnterAsMemTableWriter(Writer* leader, |
570 | 0 | WriteGroup* write_group) { |
571 | 0 | assert(leader != nullptr); |
572 | 0 | assert(leader->link_older == nullptr); |
573 | 0 | assert(leader->batch != nullptr); |
574 | 0 | assert(write_group != nullptr); |
575 | |
576 | 0 | size_t size = WriteBatchInternal::ByteSize(leader->batch); |
577 | | |
578 | | // Allow the group to grow up to a maximum size, but if the |
579 | | // original write is small, limit the growth so we do not slow |
580 | | // down the small write too much. |
581 | 0 | size_t max_size = max_write_batch_group_size_bytes; |
582 | 0 | const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8; |
583 | 0 | if (size <= min_batch_size_bytes) { |
584 | 0 | max_size = size + min_batch_size_bytes; |
585 | 0 | } |
586 | |
587 | 0 | leader->write_group = write_group; |
588 | 0 | write_group->leader = leader; |
589 | 0 | write_group->size = 1; |
590 | 0 | Writer* last_writer = leader; |
591 | |
592 | 0 | if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) { |
593 | 0 | Writer* newest_writer = newest_memtable_writer_.load(); |
594 | 0 | CreateMissingNewerLinks(newest_writer); |
595 | |
596 | 0 | Writer* w = leader; |
597 | 0 | while (w != newest_writer) { |
598 | 0 | assert(w->link_newer); |
599 | 0 | w = w->link_newer; |
600 | |
601 | 0 | if (w->batch == nullptr) { |
602 | 0 | break; |
603 | 0 | } |
604 | | |
605 | 0 | if (w->batch->HasMerge()) { |
606 | 0 | break; |
607 | 0 | } |
608 | | |
609 | 0 | if (!allow_concurrent_memtable_write_) { |
610 | 0 | auto batch_size = WriteBatchInternal::ByteSize(w->batch); |
611 | 0 | if (size + batch_size > max_size) { |
612 | | // Do not make batch too big |
613 | 0 | break; |
614 | 0 | } |
615 | 0 | size += batch_size; |
616 | 0 | } |
617 | | |
618 | 0 | w->write_group = write_group; |
619 | 0 | last_writer = w; |
620 | 0 | write_group->size++; |
621 | 0 | } |
622 | 0 | } |
623 | |
624 | 0 | write_group->last_writer = last_writer; |
625 | 0 | write_group->last_sequence = |
626 | 0 | last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1; |
627 | 0 | } |
628 | | |
629 | | void WriteThread::ExitAsMemTableWriter(Writer* /*self*/, |
630 | 0 | WriteGroup& write_group) { |
631 | 0 | Writer* leader = write_group.leader; |
632 | 0 | Writer* last_writer = write_group.last_writer; |
633 | |
634 | 0 | Writer* newest_writer = last_writer; |
635 | 0 | if (!newest_memtable_writer_.compare_exchange_strong(newest_writer, |
636 | 0 | nullptr)) { |
637 | 0 | CreateMissingNewerLinks(newest_writer); |
638 | 0 | Writer* next_leader = last_writer->link_newer; |
639 | 0 | assert(next_leader != nullptr); |
640 | 0 | next_leader->link_older = nullptr; |
641 | 0 | SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER); |
642 | 0 | } |
643 | 0 | Writer* w = leader; |
644 | 0 | while (true) { |
645 | 0 | if (!write_group.status.ok()) { |
646 | 0 | w->status = write_group.status; |
647 | 0 | } |
648 | 0 | Writer* next = w->link_newer; |
649 | 0 | if (w != leader) { |
650 | 0 | SetState(w, STATE_COMPLETED); |
651 | 0 | } |
652 | 0 | if (w == last_writer) { |
653 | 0 | break; |
654 | 0 | } |
655 | 0 | assert(next); |
656 | 0 | w = next; |
657 | 0 | } |
658 | | // Note that leader has to exit last, since it owns the write group. |
659 | 0 | SetState(leader, STATE_COMPLETED); |
660 | 0 | } |
661 | | |
662 | 0 | void WriteThread::SetMemWritersEachStride(Writer* w) { |
663 | 0 | WriteGroup* write_group = w->write_group; |
664 | 0 | Writer* last_writer = write_group->last_writer; |
665 | | |
666 | | // The stride is the same for each caller in the write_group, so w sets |
667 | | // the state of every stride-th writer starting from its own position. |
668 | 0 | size_t stride = static_cast<size_t>(std::sqrt(write_group->size)); |
669 | 0 | size_t count = 0; |
670 | 0 | while (w) { |
671 | 0 | if (count++ % stride == 0) { |
672 | 0 | SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); |
673 | 0 | } |
674 | 0 | w = (w == last_writer) ? nullptr : w->link_newer; |
675 | 0 | } |
676 | 0 | } |
677 | | |
678 | 0 | void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { |
679 | 0 | assert(write_group != nullptr); |
680 | 0 | size_t group_size = write_group->size; |
681 | 0 | write_group->running.store(group_size); |
682 | | |
683 | | // The minimum group size that allows the group to use parallel caller mode. |
684 | | // The number must be no lower than 3. |
685 | 0 | const size_t MinParallelSize = 20; |
686 | | |
687 | | // The group_size is too small, and there is no need to have |
688 | | // the parallel partial callers. |
689 | 0 | if (group_size < MinParallelSize) { |
690 | 0 | for (auto w : *write_group) { |
691 | 0 | SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); |
692 | 0 | } |
693 | 0 | return; |
694 | 0 | } |
695 | | |
696 | | // The stride is set to std::sqrt(group_size), which minimizes |
697 | | // the total number of SetState calls the leader makes. |
698 | | // Set the leader itself to STATE_PARALLEL_MEMTABLE_WRITER, and set |
699 | | // (stride-1) writers to STATE_PARALLEL_MEMTABLE_CALLER. |
700 | 0 | size_t stride = static_cast<size_t>(std::sqrt(group_size)); |
701 | 0 | auto w = write_group->leader; |
702 | 0 | SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); |
703 | |
704 | 0 | for (size_t i = 1; i < stride; i++) { |
705 | 0 | w = w->link_newer; |
706 | 0 | SetState(w, STATE_PARALLEL_MEMTABLE_CALLER); |
707 | 0 | } |
708 | | |
709 | | // After setting all of the STATE_PARALLEL_MEMTABLE_CALLER writers, the |
710 | | // leader also does the job of a STATE_PARALLEL_MEMTABLE_CALLER itself. |
711 | 0 | w = w->link_newer; |
712 | 0 | SetMemWritersEachStride(w); |
713 | 0 | } |
714 | | |
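The payoff of the sqrt stride is on the leader's critical path: the leader wakes itself, the (stride - 1) callers, and every stride-th writer in its own slice, which is roughly stride + group_size / stride SetState calls and is minimized at stride = sqrt(group_size); the writers put into STATE_PARALLEL_MEMTABLE_CALLER are then expected to wake their own slices through SetMemWritersEachStride. A small illustrative helper (LeaderWakeups is not part of WriteThread):

#include <cmath>
#include <cstddef>

// Count how many SetState calls the leader issues under the stride scheme
// (assumes group_size is at least MinParallelSize, as in the function above).
size_t LeaderWakeups(size_t group_size) {
  size_t stride = static_cast<size_t>(std::sqrt(group_size));
  // 1 for itself, (stride - 1) parallel callers, plus every stride-th writer
  // in its own slice, which starts at position `stride`.
  size_t own_slice = 0;
  for (size_t pos = stride; pos < group_size; pos += stride) {
    ++own_slice;
  }
  return 1 + (stride - 1) + own_slice;
}

// Example: for a group of 100 writers, stride = 10 and the leader issues
// 1 + 9 + 9 = 19 wake-ups (about 2 * sqrt(100)) instead of 100; the other
// writers are woken by the nine parallel callers.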
715 | | static WriteThread::AdaptationContext cpmtw_ctx( |
716 | | "CompleteParallelMemTableWriter"); |
717 | | // This method is called by both the leader and parallel followers |
718 | 0 | bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { |
719 | 0 | auto* write_group = w->write_group; |
720 | 0 | if (!w->status.ok()) { |
721 | 0 | std::lock_guard<std::mutex> guard(write_group->leader->StateMutex()); |
722 | 0 | write_group->status = w->status; |
723 | 0 | } |
724 | |
725 | 0 | if (write_group->running-- > 1) { |
726 | | // we're not the last one |
727 | 0 | AwaitState(w, STATE_COMPLETED, &cpmtw_ctx); |
728 | 0 | return false; |
729 | 0 | } |
730 | | // else we're the last parallel worker and should perform exit duties. |
731 | 0 | w->status = write_group->status; |
732 | | // Callers of this function must ensure w->status is checked. |
733 | 0 | write_group->status.PermitUncheckedError(); |
734 | 0 | return true; |
735 | 0 | } |
736 | | |
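The write_group->running-- > 1 test above is the familiar "last one out performs cleanup" pattern: every finishing thread decrements a shared counter, and only the thread that takes it from 1 to 0 returns true and goes on to do the exit duties, while the rest wait to be marked STATE_COMPLETED. Distilled into a standalone sketch with invented names (MiniGroup, FinishParallelWork):

#include <atomic>
#include <cstddef>

struct MiniGroup {
  std::atomic<size_t> running;  // initialized to the number of parallel workers
};

// Returns true only for the final worker to finish; that worker then performs
// the group's exit duties, and the others simply wait to be completed.
bool FinishParallelWork(MiniGroup* group) {
  return group->running.fetch_sub(1, std::memory_order_acq_rel) == 1;
}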
737 | 0 | void WriteThread::ExitAsBatchGroupFollower(Writer* w) { |
738 | 0 | auto* write_group = w->write_group; |
739 | |
740 | 0 | assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER); |
741 | 0 | assert(write_group->status.ok()); |
742 | 0 | ExitAsBatchGroupLeader(*write_group, write_group->status); |
743 | 0 | assert(w->status.ok()); |
744 | 0 | assert(w->state == STATE_COMPLETED); |
745 | 0 | SetState(write_group->leader, STATE_COMPLETED); |
746 | 0 | } |
747 | | |
748 | | static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader"); |
749 | | void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, |
750 | 1.16M | Status& status) { |
751 | 1.16M | TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start", |
752 | 1.16M | &write_group); |
753 | | |
754 | 1.16M | Writer* leader = write_group.leader; |
755 | 1.16M | Writer* last_writer = write_group.last_writer; |
756 | 1.16M | assert(leader->link_older == nullptr); |
757 | | |
758 | | // If status is non-ok already, then write_group.status won't have the chance |
759 | | // of being propagated to caller. |
760 | 1.16M | if (!status.ok()) { |
761 | 0 | write_group.status.PermitUncheckedError(); |
762 | 0 | } |
763 | | |
764 | | // Propagate memtable write error to the whole group. |
765 | 1.16M | if (status.ok() && !write_group.status.ok()) { |
766 | 0 | status = write_group.status; |
767 | 0 | } |
768 | | |
769 | 1.16M | if (enable_pipelined_write_) { |
770 | | // We insert a dummy Writer right before our current write_group. This |
771 | | // allows us to unlink our write_group without the risk that a subsequent |
772 | | // writer becomes a new leader and might overtake us and add itself to the |
773 | | // memtable-writer-list before we can do so. This ensures that writers are |
774 | | // added to the memtable-writer-list in the exact same order in which they |
775 | | // were in the newest_writer list. |
776 | | // This must happen before completing the writers from our group to prevent |
777 | | // a race where the owning thread of one of these writers can start a new |
778 | | // write operation. |
779 | 0 | Writer dummy; |
780 | 0 | Writer* head = newest_writer_.load(std::memory_order_acquire); |
781 | 0 | if (head != last_writer || |
782 | 0 | !newest_writer_.compare_exchange_strong(head, &dummy)) { |
783 | | // Either last_writer wasn't the head during the load(), or it was the |
784 | | // head during the load() but somebody else pushed onto the list before |
785 | | // we did the compare_exchange_strong (causing it to fail). In the latter |
786 | | // case compare_exchange_strong has the effect of re-reading its first |
787 | | // param (head). No need to retry a failing CAS, because only a departing |
788 | | // leader (which we are at the moment) can remove nodes from the list. |
789 | 0 | assert(head != last_writer); |
790 | | |
791 | | // After walking link_older starting from head (if not already done) we |
792 | | // will be able to traverse w->link_newer below. |
793 | 0 | CreateMissingNewerLinks(head); |
794 | 0 | assert(last_writer->link_newer != nullptr); |
795 | 0 | last_writer->link_newer->link_older = &dummy; |
796 | 0 | dummy.link_newer = last_writer->link_newer; |
797 | 0 | } |
798 | | |
799 | | // Complete writers that don't write to memtable |
800 | 0 | for (Writer* w = last_writer; w != leader;) { |
801 | 0 | Writer* next = w->link_older; |
802 | 0 | w->status = status; |
803 | 0 | if (!w->ShouldWriteToMemtable()) { |
804 | 0 | CompleteFollower(w, write_group); |
805 | 0 | } |
806 | 0 | w = next; |
807 | 0 | } |
808 | 0 | if (!leader->ShouldWriteToMemtable()) { |
809 | 0 | CompleteLeader(write_group); |
810 | 0 | } |
811 | |
812 | 0 | TEST_SYNC_POINT_CALLBACK( |
813 | 0 | "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters", |
814 | 0 | &write_group); |
815 | | |
816 | | // Link the remaining of the group to memtable writer list. |
817 | | // We have to link our group to memtable writer queue before wake up the |
818 | | // next leader or set newest_writer_ to null, otherwise the next leader |
819 | | // can run ahead of us and link to memtable writer queue before we do. |
820 | 0 | if (write_group.size > 0) { |
821 | 0 | if (LinkGroup(write_group, &newest_memtable_writer_)) { |
822 | | // The leader can now be different from current writer. |
823 | 0 | SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER); |
824 | 0 | } |
825 | 0 | } |
826 | | |
827 | | // Unlink the dummy writer from the list and identify the new leader |
828 | 0 | head = newest_writer_.load(std::memory_order_acquire); |
829 | 0 | if (head != &dummy || |
830 | 0 | !newest_writer_.compare_exchange_strong(head, nullptr)) { |
831 | 0 | CreateMissingNewerLinks(head); |
832 | 0 | Writer* new_leader = dummy.link_newer; |
833 | 0 | assert(new_leader != nullptr); |
834 | 0 | new_leader->link_older = nullptr; |
835 | 0 | SetState(new_leader, STATE_GROUP_LEADER); |
836 | 0 | } |
837 | |
838 | 0 | AwaitState(leader, |
839 | 0 | STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER | |
840 | 0 | STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, |
841 | 0 | &eabgl_ctx); |
842 | 1.16M | } else { |
843 | 1.16M | Writer* head = newest_writer_.load(std::memory_order_acquire); |
844 | 1.16M | if (head != last_writer || |
845 | 1.16M | !newest_writer_.compare_exchange_strong(head, nullptr)) { |
846 | | // Either last_writer wasn't the head during the load(), or it was the |
847 | | // head during the load() but somebody else pushed onto the list before |
848 | | // we did the compare_exchange_strong (causing it to fail). In the |
849 | | // latter case compare_exchange_strong has the effect of re-reading |
850 | | // its first param (head). No need to retry a failing CAS, because |
851 | | // only a departing leader (which we are at the moment) can remove |
852 | | // nodes from the list. |
853 | 0 | assert(head != last_writer); |
854 | | |
855 | | // After walking link_older starting from head (if not already done) |
856 | | // we will be able to traverse w->link_newer below. This function |
857 | | // can only be called from an active leader; only a leader can |
858 | | // clear newest_writer_, we didn't, and only a cleared newest_writer_ |
859 | | // could cause the next leader to start its work without a call |
860 | | // to MarkJoined, so we can definitely conclude that no other leader |
861 | | // work is going on here (with or without db mutex). |
862 | 0 | CreateMissingNewerLinks(head); |
863 | 0 | assert(last_writer->link_newer != nullptr); |
864 | 0 | assert(last_writer->link_newer->link_older == last_writer); |
865 | 0 | last_writer->link_newer->link_older = nullptr; |
866 | | |
867 | | // Next leader didn't self-identify, because newest_writer_ wasn't |
868 | | // nullptr when they enqueued (we were definitely enqueued before them |
869 | | // and are still in the list). That means leader handoff occurs when |
870 | | // we call MarkJoined |
871 | 0 | SetState(last_writer->link_newer, STATE_GROUP_LEADER); |
872 | 0 | } |
873 | | // else nobody else was waiting, although there might already be a new |
874 | | // leader now |
875 | | |
876 | 1.16M | while (last_writer != leader) { |
877 | 0 | assert(last_writer); |
878 | 0 | last_writer->status = status; |
879 | | // we need to read link_older before calling SetState, because as soon |
880 | | // as it is marked committed the other thread's Await may return and |
881 | | // deallocate the Writer. |
882 | 0 | auto next = last_writer->link_older; |
883 | 0 | SetState(last_writer, STATE_COMPLETED); |
884 | |
885 | 0 | last_writer = next; |
886 | 0 | } |
887 | 1.16M | } |
888 | 1.16M | } |
889 | | |
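In the non-pipelined branch above, the departing leader tries to empty newest_writer_ with one CAS from last_writer to nullptr; if the head has already moved on (or the CAS fails because someone pushed concurrently), newer writers exist, so the leader fills in the missing link_newer pointers and promotes the writer immediately after last_writer. A simplified standalone model of that hand-off, using an invented MiniWriter type and a plain flag in place of SetState(STATE_GROUP_LEADER):

#include <atomic>
#include <cassert>

struct MiniWriter {
  MiniWriter* link_older = nullptr;
  MiniWriter* link_newer = nullptr;
  std::atomic<bool> is_leader{false};
};

// Same idea as CreateMissingNewerLinks: walk older links from the head and
// fill in reverse pointers until an already-linked (or null) node is reached.
void FillNewerLinks(MiniWriter* head) {
  while (head != nullptr) {
    MiniWriter* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      break;
    }
    next->link_newer = head;
    head = next;
  }
}

void HandOffLeadership(MiniWriter* last_writer,
                       std::atomic<MiniWriter*>* newest_writer) {
  MiniWriter* head = newest_writer->load(std::memory_order_acquire);
  if (head != last_writer ||
      !newest_writer->compare_exchange_strong(head, nullptr)) {
    // Writers newer than our group exist; the oldest of them becomes leader.
    FillNewerLinks(head);
    assert(last_writer->link_newer != nullptr);
    last_writer->link_newer->link_older = nullptr;
    last_writer->link_newer->is_leader.store(true, std::memory_order_release);
  }
  // else: the list is now empty, and the next writer to enqueue will see a
  // nullptr head and become leader on its own (LinkOne returning true).
}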
890 | | static WriteThread::AdaptationContext eu_ctx("EnterUnbatched"); |
891 | 869 | void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { |
892 | 869 | assert(w != nullptr && w->batch == nullptr); |
893 | 869 | mu->Unlock(); |
894 | 869 | bool linked_as_leader = LinkOne(w, &newest_writer_); |
895 | 869 | if (!linked_as_leader) { |
896 | 0 | TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); |
897 | | // Last leader will not pick us as a follower since our batch is nullptr |
898 | 0 | AwaitState(w, STATE_GROUP_LEADER, &eu_ctx); |
899 | 0 | } |
900 | 869 | if (enable_pipelined_write_) { |
901 | 0 | WaitForMemTableWriters(); |
902 | 0 | } |
903 | 869 | mu->Lock(); |
904 | 869 | } |
905 | | |
906 | 869 | void WriteThread::ExitUnbatched(Writer* w) { |
907 | 869 | assert(w != nullptr); |
908 | 869 | Writer* newest_writer = w; |
909 | 869 | if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) { |
910 | 0 | CreateMissingNewerLinks(newest_writer); |
911 | 0 | Writer* next_leader = w->link_newer; |
912 | 0 | assert(next_leader != nullptr); |
913 | 0 | next_leader->link_older = nullptr; |
914 | 0 | SetState(next_leader, STATE_GROUP_LEADER); |
915 | 0 | } |
916 | 869 | } |
917 | | |
918 | | static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters"); |
919 | 0 | void WriteThread::WaitForMemTableWriters() { |
920 | 0 | assert(enable_pipelined_write_); |
921 | 0 | if (newest_memtable_writer_.load() == nullptr) { |
922 | 0 | return; |
923 | 0 | } |
924 | 0 | Writer w; |
925 | 0 | if (!LinkOne(&w, &newest_memtable_writer_)) { |
926 | 0 | AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx); |
927 | 0 | } |
928 | 0 | newest_memtable_writer_.store(nullptr); |
929 | 0 | } |
930 | | |
931 | | } // namespace ROCKSDB_NAMESPACE |