Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/crc"
19 : "github.com/prometheus/client_golang/prometheus"
20 : )
21 :
22 : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
23 : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
24 :
25 : type block struct {
26 : // buf[:written] has already been filled with fragments. Updated atomically.
27 : written atomic.Int32
28 : // buf[:flushed] has already been flushed to w.
29 : flushed int32
30 : buf [blockSize]byte
31 : }
32 :
33 : type flusher interface {
34 : Flush() error
35 : }
36 :
37 : type syncer interface {
38 : Sync() error
39 : }
40 :
41 : const (
42 : syncConcurrencyBits = 12
43 :
44 : // SyncConcurrency is the maximum number of concurrent sync operations that
45 : // can be performed. Note that a sync operation is initiated either by a call
46 : // to SyncRecord or by a call to Close. Exported as this value also limits
47 : // the commit concurrency in commitPipeline.
48 : SyncConcurrency = 1 << syncConcurrencyBits
49 : )
50 :
51 : type syncSlot struct {
52 : wg *sync.WaitGroup
53 : err *error
54 : }
55 :
56 : // syncQueue is a lock-free fixed-size single-producer, single-consumer
57 : // queue. The single producer can push to the head, and the single consumer can
58 : // pop multiple values from the tail. Popping calls Done() on each of the
59 : // available *sync.WaitGroup elements.
60 : type syncQueue struct {
61 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
62 : // are indexes into slots, reduced modulo len(slots) via the mask len(slots)-1.
63 : //
64 : // tail = index of oldest data in queue
65 : // head = index of next slot to fill
66 : //
67 : // Slots in the range [tail, head) are owned by consumers. A consumer
68 : // continues to own a slot outside this range until it nils the slot, at
69 : // which point ownership passes to the producer.
70 : //
71 : // The head index is stored in the most-significant bits so that we can
72 : // atomically add to it and the overflow is harmless.
73 : headTail atomic.Uint64
74 :
75 : // slots is a ring buffer of values stored in this queue. The size must be a
76 : // power of 2. A slot is in use until the tail index has moved beyond it.
77 : slots [SyncConcurrency]syncSlot
78 :
79 : // blocked is an atomic boolean which indicates whether syncing is currently
80 : // blocked or can proceed. It is used by the implementation of
81 : // min-sync-interval to block syncing until the min interval has passed.
82 : blocked atomic.Bool
83 : }
84 :
85 : const dequeueBits = 32
86 :
87 1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
88 1 : const mask = 1<<dequeueBits - 1
89 1 : head = uint32((ptrs >> dequeueBits) & mask)
90 1 : tail = uint32(ptrs & mask)
91 1 : return
92 1 : }
93 :
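// Illustrative sketch (not part of the original file): the inverse of
// unpack. The head occupies the high 32 bits so that
// headTail.Add(1<<dequeueBits) bumps only the head, and 32-bit wraparound
// of either index is harmless because all index arithmetic is modular.
func examplePackHeadTail(head, tail uint32) uint64 {
	return uint64(head)<<dequeueBits | uint64(tail)
}
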
94 1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
95 1 : ptrs := q.headTail.Load()
96 1 : head, tail := q.unpack(ptrs)
97 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
98 0 : panic("pebble: queue is full")
99 : }
100 :
101 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
102 1 : slot.wg = wg
103 1 : slot.err = err
104 1 :
105 1 : // Increment head. This passes ownership of the slot to pop and acts as a
106 1 : // store barrier for writing the slot.
107 1 : q.headTail.Add(1 << dequeueBits)
108 : }
109 :
110 0 : func (q *syncQueue) setBlocked() {
111 0 : q.blocked.Store(true)
112 0 : }
113 :
114 1 : func (q *syncQueue) clearBlocked() {
115 1 : q.blocked.Store(false)
116 1 : }
117 :
118 1 : func (q *syncQueue) empty() bool {
119 1 : head, tail, _ := q.load()
120 1 : return head == tail
121 1 : }
122 :
123 : // load returns the head and tail of the queue, delimiting what should be
124 : // synced for the caller. It returns a head and tail of zero if syncing is
125 : // blocked due to min-sync-interval. It additionally returns the real length
126 : // of this queue, regardless of whether syncing is blocked.
127 1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
128 1 : ptrs := q.headTail.Load()
129 1 : head, tail = q.unpack(ptrs)
130 1 : realLength = head - tail
131 1 : if q.blocked.Load() {
132 0 : return 0, 0, realLength
133 0 : }
134 1 : return head, tail, realLength
135 : }
136 :
137 : // pop completes the waiters in [tail, head); queueSemChan may be nil.
138 1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
139 1 : if tail == head {
140 0 : // Queue is empty.
141 0 : return nil
142 0 : }
143 :
144 1 : for ; tail != head; tail++ {
145 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
146 1 : wg := slot.wg
147 1 : if wg == nil {
148 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
149 0 : }
150 1 : *slot.err = err
151 1 : slot.wg = nil
152 1 : slot.err = nil
153 1 : // We need to bump the tail count before releasing the queueSemChan
154 1 : // semaphore as releasing the semaphore can cause a blocked goroutine to
155 1 : // acquire the semaphore and enqueue before we've "freed" space in the
156 1 : // queue.
157 1 : q.headTail.Add(1)
158 1 : wg.Done()
159 1 : // queueSemChan is always non-nil in production, except when the wal
160 1 : // package is used for WAL failover.
161 1 : if queueSemChan != nil {
162 1 : <-queueSemChan
163 1 : }
164 : }
165 :
166 1 : return nil
167 : }
168 :
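// Illustrative sketch (not part of the original file): the push/pop
// protocol. A committer (the single producer) pushes a WaitGroup and an
// error slot and then waits; the flusher (the single consumer) snapshots
// head and tail via load, performs the sync, and pops, which publishes the
// sync error and calls Done on each waiter. queueSemChan is nil here purely
// for brevity.
func exampleSyncQueueProtocol(q *syncQueue) error {
	var wg sync.WaitGroup
	var syncErr error
	wg.Add(1)
	q.push(&wg, &syncErr)

	head, tail, _ := q.load()
	if err := q.pop(head, tail, nil /* sync result */, nil /* queueSemChan */); err != nil {
		return err
	}
	wg.Wait() // returns once pop has called Done
	return syncErr
}
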
169 : // pendingSyncs abstracts out the handling of pending sync requests. In
170 : // standalone mode the implementation is a thin wrapper around syncQueue. In
171 : // the mode where the LogWriter can be subject to failover, there is no queue
172 : // kept in the LogWriter and the signaling to those waiting for sync is
173 : // handled in the wal package.
174 : //
175 : // To avoid heap allocations due to the use of this interface, the parameters
176 : // and return values follow some strict rules:
177 : // - The PendingSync parameter can be reused by the caller after push returns.
178 : // The implementation should be a pointer to a struct that is already
179 : // heap-allocated, which the caller can reuse for the next push call.
180 : // - The pendingSyncSnapshot return value must be backed by the pendingSyncs
181 : // implementation, so calling snapshotForPop again will cause the previous
182 : // snapshot to be overwritten.
183 : type pendingSyncs interface {
184 : push(PendingSync)
185 : setBlocked()
186 : clearBlocked()
187 : empty() bool
188 : snapshotForPop() pendingSyncsSnapshot
189 : pop(snap pendingSyncsSnapshot, err error) error
190 : }
191 :
192 : type pendingSyncsSnapshot interface {
193 : empty() bool
194 : }
195 :
196 : // PendingSync abstracts the sync specification for a record queued on the
197 : // LogWriter. The only implementations are provided in this package since
198 : // syncRequested is not exported.
199 : type PendingSync interface {
200 : syncRequested() bool
201 : }
202 :
203 : // The implementation of pendingSyncs in standalone mode.
204 : type pendingSyncsWithSyncQueue struct {
205 : syncQueue
206 : syncQueueLen *base.GaugeSampleMetric
207 : snapshotBacking syncQueueSnapshot
208 : // See the comment for LogWriterConfig.QueueSemChan.
209 : queueSemChan chan struct{}
210 : }
211 :
212 : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
213 :
214 1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
215 1 : ps2 := ps.(*pendingSyncForSyncQueue)
216 1 : q.syncQueue.push(ps2.wg, ps2.err)
217 1 : }
218 :
219 1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
220 1 : head, tail, realLength := q.syncQueue.load()
221 1 : q.snapshotBacking = syncQueueSnapshot{
222 1 : head: head,
223 1 : tail: tail,
224 1 : }
225 1 : q.syncQueueLen.AddSample(int64(realLength))
226 1 : return &q.snapshotBacking
227 1 : }
228 :
229 1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
230 1 : s := snap.(*syncQueueSnapshot)
231 1 : return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
232 1 : }
233 :
234 : // The implementation of pendingSyncsSnapshot in standalone mode.
235 : type syncQueueSnapshot struct {
236 : head, tail uint32
237 : }
238 :
239 1 : func (s *syncQueueSnapshot) empty() bool {
240 1 : return s.head == s.tail
241 1 : }
242 :
243 : // The implementation of PendingSync in standalone mode.
244 : type pendingSyncForSyncQueue struct {
245 : wg *sync.WaitGroup
246 : err *error
247 : }
248 :
249 1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
250 1 : return ps.wg != nil
251 1 : }
252 :
253 : // The implementation of pendingSyncs in failover mode.
254 : type pendingSyncsWithHighestSyncIndex struct {
255 : // The highest "index" queued that is requesting a sync. Initialized
256 : // to NoSyncIndex, and reset to NoSyncIndex after the sync.
257 : index atomic.Int64
258 : snapshotBacking PendingSyncIndex
259 : // blocked is an atomic boolean which indicates whether syncing is currently
260 : // blocked or can proceed. It is used by the implementation of
261 : // min-sync-interval to block syncing until the min interval has passed.
262 : blocked atomic.Bool
263 : externalSyncQueueCallback ExternalSyncQueueCallback
264 : }
265 :
266 : // NoSyncIndex is the value of PendingSyncIndex.Index when a sync is not requested.
267 : const NoSyncIndex = -1
268 :
269 : func (si *pendingSyncsWithHighestSyncIndex) init(
270 : externalSyncQueueCallback ExternalSyncQueueCallback,
271 0 : ) {
272 0 : si.index.Store(NoSyncIndex)
273 0 : si.externalSyncQueueCallback = externalSyncQueueCallback
274 0 : }
275 :
276 0 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
277 0 : ps2 := ps.(*PendingSyncIndex)
278 0 : si.index.Store(ps2.Index)
279 0 : }
280 :
281 0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
282 0 : si.blocked.Store(true)
283 0 : }
284 :
285 0 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
286 0 : si.blocked.Store(false)
287 0 : }
288 :
289 0 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
290 0 : return si.load() == NoSyncIndex
291 0 : }
292 :
293 0 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
294 0 : si.snapshotBacking = PendingSyncIndex{Index: si.load()}
295 0 : return &si.snapshotBacking
296 0 : }
297 :
298 0 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
299 0 : index := si.index.Load()
300 0 : if index != NoSyncIndex && si.blocked.Load() {
301 0 : index = NoSyncIndex
302 0 : }
303 0 : return index
304 : }
305 :
306 0 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
307 0 : index := snap.(*PendingSyncIndex)
308 0 : if index.Index == NoSyncIndex {
309 0 : return nil
310 0 : }
311 : // Set to NoSyncIndex if a higher index has not been queued since.
312 0 : si.index.CompareAndSwap(index.Index, NoSyncIndex)
313 0 : si.externalSyncQueueCallback(*index, err)
314 0 : return nil
315 : }
316 :
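// Illustrative sketch (not part of the original file): the failover-mode
// flow. The wal package pushes a PendingSyncIndex per record; after a sync
// the flusher pops, which resets the stored index (unless a higher index has
// been queued since) and invokes the external callback with the highest
// synced index. The callback body is a placeholder.
func exampleFailoverFlow() {
	var p pendingSyncsWithHighestSyncIndex
	p.init(func(doneSync PendingSyncIndex, err error) {
		// Wake all waiters with Index <= doneSync.Index.
	})
	p.push(&PendingSyncIndex{Index: 7})
	snap := p.snapshotForPop()
	_ = p.pop(snap, nil) // invokes the callback with Index: 7
}
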
317 : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
318 : type PendingSyncIndex struct {
319 : // Index is some state meaningful to the user of LogWriter. The LogWriter
320 : // itself only examines whether Index is equal to NoSyncIndex.
321 : Index int64
322 : }
323 :
324 0 : func (s *PendingSyncIndex) empty() bool {
325 0 : return s.Index == NoSyncIndex
326 0 : }
327 :
328 0 : func (s *PendingSyncIndex) syncRequested() bool {
329 0 : return s.Index != NoSyncIndex
330 0 : }
331 :
332 : // flusherCond is a specialized condition variable that allows its condition to
333 : // change and readiness to be signalled without holding its associated mutex. In
334 : // particular, when a waiter is added to syncQueue atomically, this condition
335 : // variable can be signalled without holding flusher.Mutex.
336 : type flusherCond struct {
337 : mu *sync.Mutex
338 : q pendingSyncs
339 : cond sync.Cond
340 : }
341 :
342 1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
343 1 : c.mu = mu
344 1 : c.q = q
345 1 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
346 1 : // points to flusherCond so that when cond.L.Unlock is called, flusherCond.Unlock
347 1 : // will be called and we can check the !q.empty() condition.
348 1 : c.cond.L = c
349 1 : }
350 :
351 1 : func (c *flusherCond) Signal() {
352 1 : // Pass-through to the cond var.
353 1 : c.cond.Signal()
354 1 : }
355 :
356 1 : func (c *flusherCond) Wait() {
357 1 : // Pass-through to the cond var. Note that internally the cond var implements
358 1 : // Wait as:
359 1 : //
360 1 : // t := notifyListAdd()
361 1 : // L.Unlock()
362 1 : // notifyListWait(t)
363 1 : // L.Lock()
364 1 : //
365 1 : // We've configured the cond var to call flusherCond.Unlock(), which allows
366 1 : // us to check the !q.empty() condition without danger of missing a
367 1 : // notification. Any call to flusherCond.Signal() after notifyListAdd() is
368 1 : // called will cause the subsequent notifyListWait() to return immediately.
369 1 : c.cond.Wait()
370 1 : }
371 :
372 1 : func (c *flusherCond) Lock() {
373 1 : c.mu.Lock()
374 1 : }
375 :
376 1 : func (c *flusherCond) Unlock() {
377 1 : c.mu.Unlock()
378 1 : if !c.q.empty() {
379 1 : // If the current goroutine is about to block on sync.Cond.Wait, this call
380 1 : // to Signal will prevent that. The comment in Wait above explains a bit
381 1 : // about what is going on here, but it is worth reiterating:
382 1 : //
383 1 : // flusherCond.Wait()
384 1 : // sync.Cond.Wait()
385 1 : // t := notifyListAdd()
386 1 : // flusherCond.Unlock() <-- we are here
387 1 : // notifyListWait(t)
388 1 : // flusherCond.Lock()
389 1 : //
390 1 : // The call to Signal here results in:
391 1 : //
392 1 : // sync.Cond.Signal()
393 1 : // notifyListNotifyOne()
394 1 : //
395 1 : // The call to notifyListNotifyOne() will prevent the call to
396 1 : // notifyListWait(t) from blocking.
397 1 : c.cond.Signal()
398 1 : }
399 : }
400 :
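// Illustrative sketch (not part of the original file): the canonical wait
// loop over flusherCond. Because Unlock re-checks q.empty() and signals,
// a waiter racing with a concurrent push cannot miss the notification: any
// Signal issued after the internal notifyListAdd makes the subsequent wait
// return immediately.
func exampleFlusherCondLoop(c *flusherCond, haveWork func() bool) {
	c.Lock()
	for !haveWork() {
		c.Wait()
	}
	// ... perform the work while holding the mutex ...
	c.Unlock()
}
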
401 : type durationFunc func() time.Duration
402 :
403 : // syncTimer is an interface for timers, modeled on the closure callback mode
404 : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
405 : // by tests to mock out the timer functionality used to implement
406 : // min-sync-interval.
407 : type syncTimer interface {
408 : Reset(time.Duration) bool
409 : Stop() bool
410 : }
411 :
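// Illustrative sketch (not part of the original file): a test double
// satisfying syncTimer. manualTimer and fire are hypothetical names; a test
// could install it via LogWriter.afterFunc to control exactly when the
// min-sync-interval timer fires.
type manualTimer struct{ f func() }

func (t *manualTimer) Reset(time.Duration) bool { return true }
func (t *manualTimer) Stop() bool               { return true }
func (t *manualTimer) fire()                    { t.f() }
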
412 : // LogWriter writes records to an underlying io.Writer. In order to support WAL
413 : // file reuse, a LogWriter's records are tagged with the WAL's file
414 : // number. When reading a log file, a record from a previous incarnation of the
415 : // file will return the error ErrInvalidLogNum.
416 : type LogWriter struct {
417 : // w is the underlying writer.
418 : w io.Writer
419 : // c is w as a closer.
420 : c io.Closer
421 : // s is w as a syncer.
422 : s syncer
423 : // logNum is the low 32-bits of the log's file number.
424 : logNum uint32
425 : // blockNum is the zero based block number for the current block.
426 : blockNum int64
427 : // err is any accumulated error. It originates in flusher.err, and is
428 : // updated to reflect flusher.err when a block is full and gets enqueued.
429 : // Therefore, there is a lag between when flusher.err has a non-nil error,
430 : // and when that non-nil error is reflected in LogWriter.err. On close, it
431 : // is set to errClosedWriter so that accidental future calls to SyncRecord*
432 : // fail.
433 : err error
434 : // block is the current block being written. Protected by flusher.Mutex.
435 : block *block
436 : free struct {
437 : sync.Mutex
438 : blocks []*block
439 : }
440 :
441 : flusher struct {
442 : sync.Mutex
443 : // ready is a condition variable that is signalled when there are
444 : // blocks to flush, syncing has been requested, or the LogWriter has been
445 : // closed. For signalling a sync, Signal can be called without holding
446 : // flusher.Mutex.
447 : ready flusherCond
448 : // Set to true when the flush loop should be closed.
449 : close bool
450 : // Closed when the flush loop has terminated.
451 : closed chan struct{}
452 : // Accumulated flush error.
453 : err error
454 : // minSyncInterval is the minimum duration between syncs.
455 : minSyncInterval durationFunc
456 : fsyncLatency prometheus.Histogram
457 : pending []*block
458 : // Pushing to and popping from pendingSyncs does not require the flusher
459 : // mutex to be held.
460 : pendingSyncs pendingSyncs
461 : metrics *LogWriterMetrics
462 : }
463 :
464 : // afterFunc is a hook to allow tests to mock out the timer functionality
465 : // used for min-sync-interval. In normal operation this points to
466 : // time.AfterFunc.
467 : afterFunc func(d time.Duration, f func()) syncTimer
468 :
469 : // Backing for both pendingSyncs implementations.
470 : pendingSyncsBackingQ pendingSyncsWithSyncQueue
471 : pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
472 :
473 : pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
474 : }
475 :
476 : // LogWriterConfig is a struct used for configuring new LogWriters.
477 : type LogWriterConfig struct {
478 : WALMinSyncInterval durationFunc
479 : WALFsyncLatency prometheus.Histogram
480 : // QueueSemChan is an optional channel to pop from when popping from
481 : // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
482 : // the syncQueue from overflowing (which will cause a panic). All production
483 : // code ensures this is non-nil.
484 : QueueSemChan chan struct{}
485 :
486 : // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
487 : // as part of a WAL implementation that can failover between LogWriters.
488 : //
489 : // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
490 : // be used with a PendingSync parameter that is implemented by
491 : // PendingSyncIndex. When an index is synced (which implies all earlier
492 : // indices are also synced), this callback is invoked. The caller must not
493 : // hold any mutex when invoking this callback, since the lock ordering
494 : // requirement in this case is that any higher layer locks (in the wal
495 : // package) precede the lower layer locks (in the record package). These
496 : // callbacks are serialized since they are invoked from the flushLoop.
497 : ExternalSyncQueueCallback ExternalSyncQueueCallback
498 : }
499 :
500 : // ExternalSyncQueueCallback is to be run when a PendingSync has been
501 : // processed, either successfully or with an error.
502 : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
503 :
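// Illustrative sketch (not part of the original file): a failover-mode
// configuration. The callback body is an assumption about what a wal-package
// caller might do; per the contract above, QueueSemChan is left nil and the
// callback is invoked without any record-package mutex held.
func exampleFailoverConfig(fsyncLatency prometheus.Histogram) LogWriterConfig {
	return LogWriterConfig{
		WALFsyncLatency: fsyncLatency,
		ExternalSyncQueueCallback: func(doneSync PendingSyncIndex, err error) {
			// Notify waiters for all indices <= doneSync.Index.
		},
	}
}
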
504 : // initialAllocatedBlocksCap is the initial capacity of the various slices
505 : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
506 : // than this threshold allows.
507 : const initialAllocatedBlocksCap = 32
508 :
509 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
510 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
511 : // within a LogWriter's own internal free list `w.free.blocks`.
512 : var blockPool = sync.Pool{
513 1 : New: func() any { return &block{} },
514 : }
515 :
516 : // NewLogWriter returns a new LogWriter.
517 : //
518 : // The io.Writer may also be used as an io.Closer and syncer. No other methods
519 : // will be called on the writer.
520 : func NewLogWriter(
521 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
522 1 : ) *LogWriter {
523 1 : c, _ := w.(io.Closer)
524 1 : s, _ := w.(syncer)
525 1 : r := &LogWriter{
526 1 : w: w,
527 1 : c: c,
528 1 : s: s,
529 1 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
530 1 : // we are very unlikely to reach a file number of 4 billion and b) the log
531 1 : // number is used as a validation check and using only the low 32-bits is
532 1 : // sufficient for that purpose.
533 1 : logNum: uint32(logNum),
534 1 : afterFunc: func(d time.Duration, f func()) syncTimer {
535 0 : return time.AfterFunc(d, f)
536 0 : },
537 : }
538 1 : m := &LogWriterMetrics{}
539 1 : if logWriterConfig.ExternalSyncQueueCallback != nil {
540 0 : r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
541 0 : r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
542 1 : } else {
543 1 : r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
544 1 : syncQueueLen: &m.SyncQueueLen,
545 1 : queueSemChan: logWriterConfig.QueueSemChan,
546 1 : }
547 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
548 1 : }
549 :
550 1 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
551 1 : r.block = blockPool.Get().(*block)
552 1 : r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
553 1 : r.flusher.closed = make(chan struct{})
554 1 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
555 1 : r.flusher.metrics = m
556 1 :
557 1 : f := &r.flusher
558 1 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
559 1 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
560 1 :
561 1 : go func() {
562 1 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
563 1 : }()
564 1 : return r
565 : }
566 :
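// Illustrative sketch (not part of the original file): standalone usage with
// the queue semaphore. The producer sends a token into queueSemChan before
// enqueueing each sync (as commitPipeline does) and pop receives it back
// after bumping the tail, which is what bounds the syncQueue and prevents
// the "queue is full" panic. The capacity here is an assumption chosen for
// illustration.
func exampleStandaloneUsage(f io.Writer, logNum base.DiskFileNum) error {
	queueSemChan := make(chan struct{}, SyncConcurrency)
	w := NewLogWriter(f, logNum, LogWriterConfig{QueueSemChan: queueSemChan})

	var wg sync.WaitGroup
	var syncErr error
	queueSemChan <- struct{}{} // acquire a queue slot
	wg.Add(1)
	if _, err := w.SyncRecord([]byte("payload"), &wg, &syncErr); err != nil {
		return err
	}
	wg.Wait() // blocks until the record has been flushed and fsynced
	return firstError(syncErr, w.Close())
}
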
567 1 : func (w *LogWriter) flushLoop(context.Context) {
568 1 : f := &w.flusher
569 1 : f.Lock()
570 1 :
571 1 : // Initialize idleStartTime to when the loop starts.
572 1 : idleStartTime := time.Now()
573 1 : var syncTimer syncTimer
574 1 : defer func() {
575 1 : // Capture the idle duration between the last piece of work and when the
576 1 : // loop terminated.
577 1 : f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
578 1 : if syncTimer != nil {
579 0 : syncTimer.Stop()
580 0 : }
581 1 : close(f.closed)
582 1 : f.Unlock()
583 : }()
584 :
585 : // The flush loop performs flushing of full and partial data blocks to the
586 : // underlying writer (LogWriter.w), syncing of the writer, and notification
587 : // to sync requests that they have completed.
588 : //
589 : // - flusher.ready is a condition variable that is signalled when there is
590 : // work to do. Full blocks are contained in flusher.pending. The current
591 : // partial block is in LogWriter.block. And sync operations are held in
592 : // flusher.syncQ.
593 : //
594 : // - The decision to sync is determined by whether there are any sync
595 : // requests present in flusher.syncQ and whether enough time has elapsed
596 : // since the last sync. If not enough time has elapsed since the last sync,
597 : // flusher.syncQ.blocked will be set to true. If syncing is blocked,
598 : // syncQueue.empty() will return true and syncQueue.load() will return 0,0
599 : // (i.e. an empty list).
600 : //
601 : // - flusher.syncQ.blocked is cleared by a timer that is initialized when
602 : // blocked is set to true. While blocked, no syncing will take place, but
603 : // flushing will continue to be performed. The on/off toggle for syncing
604 : // does not need to be carefully synchronized with the rest of processing
605 : // -- all we need to ensure is that after any transition to blocked=true there
606 : // is eventually a transition to blocked=false. syncTimer performs this
607 : // transition. Note that any change to min-sync-interval will not take
608 : // effect until the previous timer elapses.
609 : //
610 : // - Picking up the syncing work to perform requires coordination with
611 : // picking up the flushing work. Specifically, flushing work is queued
612 : // before syncing work. The guarantee of this code is that when a sync is
613 : // requested, any previously queued flush work will be synced. This
614 : // motivates reading the syncing work (f.syncQ.load()) before picking up
615 : // the flush work (w.block.written.Load()).
616 :
617 : // The list of full blocks that need to be written. This is copied from
618 : // f.pending on every loop iteration, though the number of elements is
619 : // usually small (most frequently 1). In the case of the WAL LogWriter, the
620 : // number of blocks is bounded by the size of the WAL's corresponding
621 : // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
622 : // this works out to at most 2048 elements if the entirety of the memtable's
623 : // contents are queued.
624 1 : pending := make([]*block, 0, cap(f.pending))
625 1 : for {
626 1 : for {
627 1 : // Grab the portion of the current block that requires flushing. Note that
628 1 : // the current block can be added to the pending blocks list after we release
629 1 : // the flusher lock, but it won't be part of pending.
630 1 : written := w.block.written.Load()
631 1 : if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
632 1 : break
633 : }
634 1 : if f.close {
635 1 : // If the writer is closed, pretend the sync timer fired immediately so
636 1 : // that we can process any queued sync requests.
637 1 : f.pendingSyncs.clearBlocked()
638 1 : if !f.pendingSyncs.empty() {
639 0 : break
640 : }
641 1 : return
642 : }
643 1 : f.ready.Wait()
644 1 : continue
645 : }
646 : // Found work to do, so no longer idle.
647 : //
648 : // NB: it is safe to read pending before loading from the syncQ since
649 : // mutations to pending require the w.flusher mutex, which is held here.
650 : // There is no risk that someone will concurrently add to pending, so the
651 : // following sequence, which would pick up a syncQ entry without the
652 : // corresponding data, is impossible:
653 : //
654 : // Thread enqueueing This thread
655 : // 1. read pending
656 : // 2. add block to pending
657 : // 3. add to syncQ
658 : // 4. read syncQ
659 1 : workStartTime := time.Now()
660 1 : idleDuration := workStartTime.Sub(idleStartTime)
661 1 : pending = append(pending[:0], f.pending...)
662 1 : f.pending = f.pending[:0]
663 1 : f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
664 1 :
665 1 : // Grab the list of sync waiters. Note that syncQueue.load() will return
666 1 : // 0,0 while we're waiting for the min-sync-interval to expire. This
667 1 : // allows flushing to proceed even if we're not ready to sync.
668 1 : snap := f.pendingSyncs.snapshotForPop()
669 1 :
670 1 : // Grab the portion of the current block that requires flushing. Note that
671 1 : // the current block can be added to the pending blocks list after we
672 1 : // release the flusher lock, but it won't be part of pending. This has to
673 1 : // be ordered after we get the list of sync waiters from syncQ in order to
674 1 : // prevent a race where a waiter adds itself to syncQ, but this thread
675 1 : // picks up the entry in syncQ and not the buffered data.
676 1 : written := w.block.written.Load()
677 1 : data := w.block.buf[w.block.flushed:written]
678 1 : w.block.flushed = written
679 1 :
680 1 : fErr := f.err
681 1 : f.Unlock()
682 1 : // If the flusher has an error, we propagate it to waiters. Note that even
683 1 : // on error we consume the pending list above to free blocks for writers.
684 1 : if fErr != nil {
685 0 : // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
686 0 : // called f.Unlock() above. We will acquire the lock again below.
687 0 : f.pendingSyncs.pop(snap, fErr)
688 0 : // Update the idleStartTime if work could not be done, so that we don't
689 0 : // count the time we spent trying to do work as idle. We don't bother
690 0 : // with the rest of the accounting, which means we will undercount.
691 0 : idleStartTime = time.Now()
692 0 : f.Lock()
693 0 : continue
694 : }
695 1 : synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
696 1 : f.Lock()
697 1 : if synced && f.fsyncLatency != nil {
698 1 : f.fsyncLatency.Observe(float64(syncLatency))
699 1 : }
700 1 : f.err = err
701 1 : if f.err != nil {
702 0 : f.pendingSyncs.clearBlocked()
703 0 : // Update the idleStartTime if work could not be done, so that we don't
704 0 : // count the time we spent trying to do work as idle. We don't bother
705 0 : // with the rest of the accounting, which means we will undercount.
706 0 : idleStartTime = time.Now()
707 0 : continue
708 : }
709 :
710 1 : if synced && f.minSyncInterval != nil {
711 0 : // A sync was performed. Make sure we've waited for the min sync
712 0 : // interval before syncing again.
713 0 : if min := f.minSyncInterval(); min > 0 {
714 0 : f.pendingSyncs.setBlocked()
715 0 : if syncTimer == nil {
716 0 : syncTimer = w.afterFunc(min, func() {
717 0 : f.pendingSyncs.clearBlocked()
718 0 : f.ready.Signal()
719 0 : })
720 0 : } else {
721 0 : syncTimer.Reset(min)
722 0 : }
723 : }
724 : }
725 : // Finished work, and started idling.
726 1 : idleStartTime = time.Now()
727 1 : workDuration := idleStartTime.Sub(workStartTime)
728 1 : f.metrics.WriteThroughput.Bytes += bytesWritten
729 1 : f.metrics.WriteThroughput.WorkDuration += workDuration
730 1 : f.metrics.WriteThroughput.IdleDuration += idleDuration
731 : }
732 : }
733 :
734 : func (w *LogWriter) flushPending(
735 : data []byte, pending []*block, snap pendingSyncsSnapshot,
736 1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
737 1 : defer func() {
738 1 : // Translate panics into errors. The errors will cause flushLoop to shut
739 1 : // down, but allow us to do so in a controlled way and avoid swallowing
740 1 : // the stack that created the panic if panicking itself hits a panic
741 1 : // (e.g. unlock of an unlocked mutex).
742 1 : if r := recover(); r != nil {
743 0 : err = errors.Newf("%v", r)
744 0 : }
745 : }()
746 :
747 1 : for _, b := range pending {
748 0 : bytesWritten += blockSize - int64(b.flushed)
749 0 : if err = w.flushBlock(b); err != nil {
750 0 : break
751 : }
752 : }
753 1 : if n := len(data); err == nil && n > 0 {
754 1 : bytesWritten += int64(n)
755 1 : _, err = w.w.Write(data)
756 1 : }
757 :
758 1 : synced = !snap.empty()
759 1 : if synced {
760 1 : if err == nil && w.s != nil {
761 1 : syncLatency, err = w.syncWithLatency()
762 1 : } else {
763 0 : synced = false
764 0 : }
765 1 : f := &w.flusher
766 1 : if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
767 0 : return synced, syncLatency, bytesWritten, firstError(err, popErr)
768 0 : }
769 : }
770 :
771 1 : return synced, syncLatency, bytesWritten, err
772 : }
773 :
774 1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
775 1 : start := time.Now()
776 1 : err := w.s.Sync()
777 1 : syncLatency := time.Since(start)
778 1 : return syncLatency, err
779 1 : }
780 :
781 0 : func (w *LogWriter) flushBlock(b *block) error {
782 0 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
783 0 : return err
784 0 : }
785 0 : b.written.Store(0)
786 0 : b.flushed = 0
787 0 : w.free.Lock()
788 0 : w.free.blocks = append(w.free.blocks, b)
789 0 : w.free.Unlock()
790 0 : return nil
791 : }
792 :
793 : // queueBlock queues the current block for writing to the underlying writer,
794 : // allocates a new block and reserves space for the next header.
795 0 : func (w *LogWriter) queueBlock() {
796 0 : // Allocate a new block, blocking until one is available. We do this first
797 0 : // because w.block is protected by w.flusher.Mutex.
798 0 : w.free.Lock()
799 0 : if len(w.free.blocks) == 0 {
800 0 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
801 0 : }
802 0 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
803 0 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
804 0 : w.free.Unlock()
805 0 :
806 0 : f := &w.flusher
807 0 : f.Lock()
808 0 : f.pending = append(f.pending, w.block)
809 0 : w.block = nextBlock
810 0 : f.ready.Signal()
811 0 : w.err = w.flusher.err
812 0 : f.Unlock()
813 0 :
814 0 : w.blockNum++
815 : }
816 :
817 : // Close flushes and syncs any unwritten data and closes the writer.
818 : // Where required, external synchronisation is provided by commitPipeline.mu.
819 1 : func (w *LogWriter) Close() error {
820 1 : return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
821 1 : }
822 :
823 : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
824 : // lastQueuedRecord that the caller will be notified about once it is synced.
825 0 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
826 0 : return w.closeInternal(lastQueuedRecord)
827 0 : }
828 :
829 1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
830 1 : f := &w.flusher
831 1 :
832 1 : // Emit an EOF trailer signifying the end of this log. This helps readers
833 1 : // differentiate a corrupted entry in the middle of a log from garbage at
834 1 : // the tail left over from a recycled log file.
835 1 : w.emitEOFTrailer()
836 1 :
837 1 : // Signal the flush loop to close.
838 1 : f.Lock()
839 1 : f.close = true
840 1 : f.ready.Signal()
841 1 : f.Unlock()
842 1 :
843 1 : // Wait for the flush loop to close. The flush loop will not close until all
844 1 : // pending data has been written or an error occurs.
845 1 : <-f.closed
846 1 :
847 1 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
848 1 : // last buffered data only if it was requested via syncQ, so we need to sync
849 1 : // here to ensure that all the data is synced.
850 1 : err := w.flusher.err
851 1 : var syncLatency time.Duration
852 1 : if err == nil && w.s != nil {
853 1 : syncLatency, err = w.syncWithLatency()
854 1 : }
855 1 : f.Lock()
856 1 : if err == nil && f.fsyncLatency != nil {
857 1 : f.fsyncLatency.Observe(float64(syncLatency))
858 1 : }
859 1 : free := w.free.blocks
860 1 : f.Unlock()
861 1 :
862 1 : // NB: the caller of closeInternal may not care about a non-nil cerr below
863 1 : // if all queued writes have been successfully written and synced.
864 1 : if lastQueuedRecord.Index != NoSyncIndex {
865 0 : w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
866 0 : }
867 1 : if w.c != nil {
868 1 : cerr := w.c.Close()
869 1 : w.c = nil
870 1 : err = firstError(err, cerr)
871 1 : }
872 :
873 1 : for _, b := range free {
874 0 : b.flushed = 0
875 0 : b.written.Store(0)
876 0 : blockPool.Put(b)
877 0 : }
878 :
879 1 : w.err = errClosedWriter
880 1 : return err
881 : }
882 :
883 : // firstError returns the first non-nil error of err0 and err1, or nil if both
884 : // are nil.
885 1 : func firstError(err0, err1 error) error {
886 1 : if err0 != nil {
887 0 : return err0
888 0 : }
889 1 : return err1
890 : }
891 :
892 : // WriteRecord writes a complete record. Returns the offset just past the end
893 : // of the record.
894 : // External synchronisation provided by commitPipeline.mu.
895 0 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
896 0 : logSize, err := w.SyncRecord(p, nil, nil)
897 0 : return logSize, err
898 0 : }
899 :
900 : // SyncRecord writes a complete record. If wg != nil the record will be
901 : // asynchronously persisted to the underlying writer and Done will be called on
902 : // the wait group upon completion. Returns the offset just past the end of the
903 : // record.
904 : // External synchronisation provided by commitPipeline.mu.
905 : func (w *LogWriter) SyncRecord(
906 : p []byte, wg *sync.WaitGroup, err *error,
907 1 : ) (logSize int64, err2 error) {
908 1 : w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
909 1 : wg: wg,
910 1 : err: err,
911 1 : }
912 1 : return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
913 1 : }
914 :
915 : // SyncRecordGeneralized is a version of SyncRecord that accepts a
916 : // PendingSync.
917 1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
918 1 : if w.err != nil {
919 0 : return -1, w.err
920 0 : }
921 :
922 : // The `i == 0` condition ensures we handle empty records. Such records can
923 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
924 : // MANIFEST is currently written using Writer, it is good to support the same
925 : // semantics with LogWriter.
926 1 : for i := 0; i == 0 || len(p) > 0; i++ {
927 1 : p = w.emitFragment(i, p)
928 1 : }
929 :
930 1 : if ps.syncRequested() {
931 1 : // If we've been asked to persist the record, add the WaitGroup to the sync
932 1 : // queue and signal the flushLoop. Note that flushLoop will write partial
933 1 : // blocks to the file if syncing has been requested. The contract is that
934 1 : // any record written to the LogWriter to this point will be flushed to the
935 1 : // OS and synced to disk.
936 1 : f := &w.flusher
937 1 : f.pendingSyncs.push(ps)
938 1 : f.ready.Signal()
939 1 : }
940 :
941 1 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
942 1 : // Note that we don't return w.err here as a concurrent call to Close would
943 1 : // race with our read. That's ok because the only error we could be seeing is
944 1 : // one related to syncing, of which the caller can receive notification by passing
945 1 : // in a non-nil err argument.
946 1 : return offset, nil
947 : }
948 :
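// Illustrative sketch (not part of the original file): failover-mode use of
// SyncRecordGeneralized, assuming w was constructed with a non-nil
// ExternalSyncQueueCallback. Production callers reuse a heap-allocated
// backing struct for the PendingSync (see the pendingSyncs interface
// comment) rather than allocating per call as done here.
func exampleSyncGeneralized(w *LogWriter, payload []byte, idx int64) (int64, error) {
	return w.SyncRecordGeneralized(payload, &PendingSyncIndex{Index: idx})
}
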
949 : // Size returns the current size of the file.
950 : // External synchronisation provided by commitPipeline.mu.
951 1 : func (w *LogWriter) Size() int64 {
952 1 : return w.blockNum*blockSize + int64(w.block.written.Load())
953 1 : }
954 :
955 1 : func (w *LogWriter) emitEOFTrailer() {
956 1 : // Write a recyclable chunk header with a different log number. Readers
957 1 : // will treat the header as EOF when the log number does not match.
958 1 : b := w.block
959 1 : i := b.written.Load()
960 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
961 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
962 1 : b.buf[i+6] = recyclableFullChunkType
963 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
964 1 : b.written.Store(i + int32(recyclableHeaderSize))
965 1 : }
966 :
967 1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
968 1 : b := w.block
969 1 : i := b.written.Load()
970 1 : first := n == 0
971 1 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
972 1 :
973 1 : if last {
974 1 : if first {
975 1 : b.buf[i+6] = recyclableFullChunkType
976 1 : } else {
977 0 : b.buf[i+6] = recyclableLastChunkType
978 0 : }
979 0 : } else {
980 0 : if first {
981 0 : b.buf[i+6] = recyclableFirstChunkType
982 0 : } else {
983 0 : b.buf[i+6] = recyclableMiddleChunkType
984 0 : }
985 : }
986 :
987 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
988 1 :
989 1 : r := copy(b.buf[i+recyclableHeaderSize:], p)
990 1 : j := i + int32(recyclableHeaderSize+r)
991 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
992 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
993 1 : b.written.Store(j)
994 1 :
995 1 : if blockSize-b.written.Load() < recyclableHeaderSize {
996 0 : // There is no room for another fragment in the block, so fill the
997 0 : // remaining bytes with zeros and queue the block for flushing.
998 0 : clear(b.buf[b.written.Load():])
999 0 : w.queueBlock()
1000 0 : }
1001 1 : return p[r:]
1002 : }
1003 :
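// Illustrative summary (not part of the original file): the recyclable chunk
// layout produced by emitFragment and emitEOFTrailer, with offsets relative
// to the start of the chunk within the block:
//
//	offset 0-3   CRC, computed over the type, log number, and payload
//	offset 4-5   payload size (little-endian uint16)
//	offset 6     chunk type (recyclable full/first/middle/last)
//	offset 7-10  low 32 bits of the WAL file number
//	offset 11+   payload
//
// Hence recyclableHeaderSize is 11 bytes. A fragment never spans blocks:
// once fewer than 11 bytes remain, emitFragment zero-fills the remainder and
// queues the block for flushing.
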
1004 : // Metrics should typically be called after Close, since the LogWriter will no
1005 : // longer modify the returned LogWriterMetrics. The result is also current if there is
1006 : // nothing left to flush in the flush loop, but that is an implementation
1007 : // detail that callers should not rely on.
1008 1 : func (w *LogWriter) Metrics() LogWriterMetrics {
1009 1 : w.flusher.Lock()
1010 1 : defer w.flusher.Unlock()
1011 1 : m := *w.flusher.metrics
1012 1 : return m
1013 1 : }
1014 :
1015 : // LogWriterMetrics contains misc metrics for the log writer.
1016 : type LogWriterMetrics struct {
1017 : WriteThroughput base.ThroughputMetric
1018 : PendingBufferLen base.GaugeSampleMetric
1019 : SyncQueueLen base.GaugeSampleMetric
1020 : }
1021 :
1022 : // Merge merges metrics from x. Requires that x is non-nil.
1023 1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
1024 1 : m.WriteThroughput.Merge(x.WriteThroughput)
1025 1 : m.PendingBufferLen.Merge(x.PendingBufferLen)
1026 1 : m.SyncQueueLen.Merge(x.SyncQueueLen)
1027 1 : return nil
1028 1 : }
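
// Illustrative sketch (not part of the original file): aggregating metrics
// across several LogWriters, e.g. over successive WALs.
func exampleAggregateMetrics(writers []*LogWriter) LogWriterMetrics {
	var total LogWriterMetrics
	for _, w := range writers {
		m := w.Metrics()
		_ = total.Merge(&m) // Merge currently always returns nil
	}
	return total
}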
|