Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/crlib/crtime"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/crc"
20 : "github.com/prometheus/client_golang/prometheus"
21 : )
22 :
var (
	// walSyncLabels tags the flush loop goroutine in pprof profiles.
	walSyncLabels = pprof.Labels("pebble", "wal-sync")
	// errClosedWriter is stored into LogWriter.err on close so that
	// accidental later calls to SyncRecord* fail.
	errClosedWriter = errors.New("pebble/record: closed LogWriter")
)
25 :
// block is a fixed-size buffer (blockSize bytes) that record fragments are
// written into before being handed to the underlying writer. A block is
// either the LogWriter's current block, queued in flusher.pending, or on the
// free list.
type block struct {
	// buf[:written] has already been filled with fragments. Updated
	// atomically; the flush loop reads it with Load while writers may still
	// be appending to the current block.
	written atomic.Int32
	// buf[:flushed] has already been flushed to w. flushed <= written.
	flushed int32
	buf     [blockSize]byte
}
33 :
// flusher is the interface of writers that expose a Flush method.
// NOTE(review): nothing in this chunk of the file uses it; confirm it is
// still needed.
type flusher interface {
	Flush() error
}

// syncer is the interface of writers that can Sync written data. NewLogWriter
// probes its io.Writer for it; when the writer does not implement syncer,
// LogWriter skips the sync step (LogWriter.s is nil).
type syncer interface {
	Sync() error
}
41 :
// syncConcurrencyBits is log2(SyncConcurrency).
const syncConcurrencyBits = 12

// SyncConcurrency is the maximum number of concurrent sync operations that
// can be performed; it is also the number of slots in a syncQueue. A sync
// operation is initiated either by a call to SyncRecord or by a call to
// Close. It is exported because it also limits the commit concurrency in
// commitPipeline.
const SyncConcurrency = 1 << syncConcurrencyBits
51 :
// syncSlot is one entry of a syncQueue: a waiter to be released once the
// sync covering its record has completed (or failed).
type syncSlot struct {
	// wg has Done called on it when the slot is popped.
	wg *sync.WaitGroup
	// err receives the sync result before wg.Done is called.
	err *error
}
56 :
// syncQueue is a lock-free fixed-size single-producer, single-consumer
// queue of sync waiters. The single producer pushes at the head, and the
// single consumer pops a batch of entries from the tail. Popping stores the
// sync result through each entry's *error and calls Done() on its
// *sync.WaitGroup.
type syncQueue struct {
	// headTail packs together a 32-bit head index and a 32-bit tail index.
	// Both are free-running counters; a slot is located by masking an index
	// with len(slots)-1, i.e. by taking it modulo len(slots).
	//
	//   tail = index of oldest data in queue
	//   head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by the consumer. The consumer
	// continues to own a slot outside this range until it nils the slot, at
	// which point ownership passes to the producer.
	//
	// The head index is stored in the most-significant bits so that we can
	// atomically add to it and the overflow is harmless.
	headTail atomic.Uint64

	// slots is a ring buffer of values stored in this queue. Its size
	// (SyncConcurrency) must be a power of 2 for the index masking above to
	// be correct. A slot is in use until the tail index has moved beyond it.
	slots [SyncConcurrency]syncSlot

	// blocked is an atomic boolean which indicates whether syncing is
	// currently blocked or can proceed. It is used by the implementation of
	// min-sync-interval to block syncing until the min interval has passed.
	// While it is set, load reports an empty (0, 0) range.
	blocked atomic.Bool
}
85 :
// dequeueBits is the width of each index packed into syncQueue.headTail: the
// head occupies the high dequeueBits bits and the tail the low dequeueBits
// bits.
const dequeueBits = 32
87 :
88 : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
89 1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
90 1 : const mask = 1<<dequeueBits - 1
91 1 : head = uint32((ptrs >> dequeueBits) & mask)
92 1 : tail = uint32(ptrs & mask)
93 1 : return head, tail
94 1 : }
95 :
96 1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
97 1 : ptrs := q.headTail.Load()
98 1 : head, tail := q.unpack(ptrs)
99 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
100 0 : panic("pebble: queue is full")
101 : }
102 :
103 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
104 1 : slot.wg = wg
105 1 : slot.err = err
106 1 :
107 1 : // Increment head. This passes ownership of slot to dequeue and acts as a
108 1 : // store barrier for writing the slot.
109 1 : q.headTail.Add(1 << dequeueBits)
110 : }
111 :
112 1 : func (q *syncQueue) setBlocked() {
113 1 : q.blocked.Store(true)
114 1 : }
115 :
116 1 : func (q *syncQueue) clearBlocked() {
117 1 : q.blocked.Store(false)
118 1 : }
119 :
120 1 : func (q *syncQueue) empty() bool {
121 1 : head, tail, _ := q.load()
122 1 : return head == tail
123 1 : }
124 :
125 : // load returns the head, tail of the queue for what should be synced to the
126 : // caller. It can return a head, tail of zero if syncing is blocked due to
127 : // min-sync-interval. It additionally returns the real length of this queue,
128 : // regardless of whether syncing is blocked.
129 1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
130 1 : ptrs := q.headTail.Load()
131 1 : head, tail = q.unpack(ptrs)
132 1 : realLength = head - tail
133 1 : if q.blocked.Load() {
134 1 : return 0, 0, realLength
135 1 : }
136 1 : return head, tail, realLength
137 : }
138 :
139 : // REQUIRES: queueSemChan is non-nil.
140 1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
141 1 : if tail == head {
142 1 : // Queue is empty.
143 1 : return nil
144 1 : }
145 :
146 1 : for ; tail != head; tail++ {
147 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
148 1 : wg := slot.wg
149 1 : if wg == nil {
150 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
151 0 : }
152 1 : *slot.err = err
153 1 : slot.wg = nil
154 1 : slot.err = nil
155 1 : // We need to bump the tail count before releasing the queueSemChan
156 1 : // semaphore as releasing the semaphore can cause a blocked goroutine to
157 1 : // acquire the semaphore and enqueue before we've "freed" space in the
158 1 : // queue.
159 1 : q.headTail.Add(1)
160 1 : wg.Done()
161 1 : // Is always non-nil in production, unless using wal package for WAL
162 1 : // failover.
163 1 : if queueSemChan != nil {
164 1 : <-queueSemChan
165 1 : }
166 : }
167 :
168 1 : return nil
169 : }
170 :
171 : // pendingSyncs abstracts out the handling of pending sync requests. In
172 : // standalone mode the implementation is a thin wrapper around syncQueue. In
173 : // the mode where the LogWriter can be subject to failover, there is no queue
174 : // kept in the LogWriter and the signaling to those waiting for sync is
175 : // handled in the wal package.
176 : //
177 : // To avoid heap allocations due to the use of this interface, the parameters
178 : // and return values follow some strict rules:
179 : // - The PendingSync parameter can be reused by the caller after push returns.
180 : // The implementation should be a pointer backed by a struct that is already
181 : // heap allocated, which the caller can reuse for the next push call.
182 : // - The pendingSyncSnapshot return value must be backed by the pendingSyncs
183 : // implementation, so calling snapshotForPop again will cause the previous
184 : // snapshot to be overwritten.
185 : type pendingSyncs interface {
186 : push(PendingSync)
187 : setBlocked()
188 : clearBlocked()
189 : empty() bool
190 : snapshotForPop() pendingSyncsSnapshot
191 : pop(snap pendingSyncsSnapshot, err error) error
192 : }
193 :
// pendingSyncsSnapshot is the sync work captured by
// pendingSyncs.snapshotForPop and later completed by pendingSyncs.pop.
type pendingSyncsSnapshot interface {
	// empty reports whether the snapshot contains no sync work.
	empty() bool
}

// PendingSync abstracts the sync specification for a record queued on the
// LogWriter. The only implementations are provided in this package since
// syncRequested is not exported.
type PendingSync interface {
	// syncRequested reports whether the caller asked for the record to be
	// synced.
	syncRequested() bool
}
204 :
// pendingSyncsWithSyncQueue is the implementation of pendingSyncs in
// standalone mode (LogWriterConfig.ExternalSyncQueueCallback is nil). Each
// waiter is queued in the embedded syncQueue.
type pendingSyncsWithSyncQueue struct {
	syncQueue
	// syncQueueLen receives a sample of the queue's real length on every
	// snapshotForPop.
	syncQueueLen *base.GaugeSampleMetric
	// snapshotBacking is the storage returned by snapshotForPop, reused so
	// that taking a snapshot does not allocate.
	snapshotBacking syncQueueSnapshot
	// See the comment for LogWriterConfig.QueueSemChan.
	queueSemChan chan struct{}
}
213 :
214 : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
215 :
216 1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
217 1 : ps2 := ps.(*pendingSyncForSyncQueue)
218 1 : q.syncQueue.push(ps2.wg, ps2.err)
219 1 : }
220 :
221 1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
222 1 : head, tail, realLength := q.syncQueue.load()
223 1 : q.snapshotBacking = syncQueueSnapshot{
224 1 : head: head,
225 1 : tail: tail,
226 1 : }
227 1 : q.syncQueueLen.AddSample(int64(realLength))
228 1 : return &q.snapshotBacking
229 1 : }
230 :
231 1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
232 1 : s := snap.(*syncQueueSnapshot)
233 1 : return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
234 1 : }
235 :
// syncQueueSnapshot is the pendingSyncsSnapshot used in standalone mode: the
// [tail, head) range of a syncQueue captured by snapshotForPop.
type syncQueueSnapshot struct {
	head uint32
	tail uint32
}
240 :
241 1 : func (s *syncQueueSnapshot) empty() bool {
242 1 : return s.head == s.tail
243 1 : }
244 :
// pendingSyncForSyncQueue is the implementation of PendingSync in standalone
// mode. Its fields are pushed into a syncQueue slot by
// pendingSyncsWithSyncQueue.push.
type pendingSyncForSyncQueue struct {
	// wg is non-nil when a sync is requested; Done is called on it once the
	// sync completes.
	wg *sync.WaitGroup
	// err receives the sync result.
	err *error
}
250 :
251 1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
252 1 : return ps.wg != nil
253 1 : }
254 :
// pendingSyncsWithHighestSyncIndex is the implementation of pendingSyncs in
// failover mode. Instead of queueing waiters, it remembers only the most
// recently pushed index requesting a sync and reports completions through
// externalSyncQueueCallback.
type pendingSyncsWithHighestSyncIndex struct {
	// The highest "index" queued that is requesting a sync. Initialized
	// to NoSyncIndex, and reset to NoSyncIndex after the sync.
	index atomic.Int64
	// snapshotBacking is the storage returned by snapshotForPop.
	snapshotBacking PendingSyncIndex
	// blocked is an atomic boolean which indicates whether syncing is currently
	// blocked or can proceed. It is used by the implementation of
	// min-sync-interval to block syncing until the min interval has passed.
	blocked atomic.Bool
	// externalSyncQueueCallback is invoked by pop for each completed sync.
	externalSyncQueueCallback ExternalSyncQueueCallback
}
267 :
// NoSyncIndex is the value of PendingSyncIndex.Index when a sync is not
// requested. pendingSyncsWithHighestSyncIndex also uses it as its "nothing
// pending" index value.
const NoSyncIndex = -1
270 :
271 : func (si *pendingSyncsWithHighestSyncIndex) init(
272 : externalSyncQueueCallback ExternalSyncQueueCallback,
273 1 : ) {
274 1 : si.index.Store(NoSyncIndex)
275 1 : si.externalSyncQueueCallback = externalSyncQueueCallback
276 1 : }
277 :
278 1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
279 1 : ps2 := ps.(*PendingSyncIndex)
280 1 : si.index.Store(ps2.Index)
281 1 : }
282 :
283 1 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
284 1 : si.blocked.Store(true)
285 1 : }
286 :
287 1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
288 1 : si.blocked.Store(false)
289 1 : }
290 :
291 1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
292 1 : return si.load() == NoSyncIndex
293 1 : }
294 :
295 1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
296 1 : si.snapshotBacking = PendingSyncIndex{Index: si.load()}
297 1 : return &si.snapshotBacking
298 1 : }
299 :
300 1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
301 1 : index := si.index.Load()
302 1 : if index != NoSyncIndex && si.blocked.Load() {
303 1 : index = NoSyncIndex
304 1 : }
305 1 : return index
306 : }
307 :
308 1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
309 1 : index := snap.(*PendingSyncIndex)
310 1 : if index.Index == NoSyncIndex {
311 1 : return nil
312 1 : }
313 : // Set to NoSyncIndex if a higher index has not queued.
314 1 : si.index.CompareAndSwap(index.Index, NoSyncIndex)
315 1 : si.externalSyncQueueCallback(*index, err)
316 1 : return nil
317 : }
318 :
// PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync. It
// is the PendingSync used in failover mode, and the value passed to
// ExternalSyncQueueCallback when a sync completes.
type PendingSyncIndex struct {
	// Index is some state meaningful to the user of LogWriter. The LogWriter
	// itself only examines whether Index is equal to NoSyncIndex.
	Index int64
}
325 :
326 1 : func (s *PendingSyncIndex) empty() bool {
327 1 : return s.Index == NoSyncIndex
328 1 : }
329 :
330 1 : func (s *PendingSyncIndex) syncRequested() bool {
331 1 : return s.Index != NoSyncIndex
332 1 : }
333 :
// flusherCond is a specialized condition variable that allows its condition to
// change and readiness be signalled without holding its associated mutex. In
// particular, when a waiter is added to syncQueue atomically, this condition
// variable can be signalled without holding flusher.Mutex.
type flusherCond struct {
	// mu is the mutex guarding the flusher state (flusher.Mutex).
	mu *sync.Mutex
	// q is consulted on every Unlock; if it is non-empty, cond is signalled.
	q pendingSyncs
	// cond.L is set to the flusherCond itself by init.
	cond sync.Cond
}
343 :
344 1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
345 1 : c.mu = mu
346 1 : c.q = q
347 1 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
348 1 : // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
349 1 : // will be called and we can check the !syncQueue.empty() condition.
350 1 : c.cond.L = c
351 1 : }
352 :
353 1 : func (c *flusherCond) Signal() {
354 1 : // Pass-through to the cond var.
355 1 : c.cond.Signal()
356 1 : }
357 :
358 1 : func (c *flusherCond) Wait() {
359 1 : // Pass-through to the cond var. Note that internally the cond var implements
360 1 : // Wait as:
361 1 : //
362 1 : // t := notifyListAdd()
363 1 : // L.Unlock()
364 1 : // notifyListWait(t)
365 1 : // L.Lock()
366 1 : //
367 1 : // We've configured the cond var to call flusherReady.Unlock() which allows
368 1 : // us to check the !syncQueue.empty() condition without a danger of missing a
369 1 : // notification. Any call to flusherReady.Signal() after notifyListAdd() is
370 1 : // called will cause the subsequent notifyListWait() to return immediately.
371 1 : c.cond.Wait()
372 1 : }
373 :
374 1 : func (c *flusherCond) Lock() {
375 1 : c.mu.Lock()
376 1 : }
377 :
378 1 : func (c *flusherCond) Unlock() {
379 1 : c.mu.Unlock()
380 1 : if !c.q.empty() {
381 1 : // If the current goroutine is about to block on sync.Cond.Wait, this call
382 1 : // to Signal will prevent that. The comment in Wait above explains a bit
383 1 : // about what is going on here, but it is worth reiterating:
384 1 : //
385 1 : // flusherCond.Wait()
386 1 : // sync.Cond.Wait()
387 1 : // t := notifyListAdd()
388 1 : // flusherCond.Unlock() <-- we are here
389 1 : // notifyListWait(t)
390 1 : // flusherCond.Lock()
391 1 : //
392 1 : // The call to Signal here results in:
393 1 : //
394 1 : // sync.Cond.Signal()
395 1 : // notifyListNotifyOne()
396 1 : //
397 1 : // The call to notifyListNotifyOne() will prevent the call to
398 1 : // notifyListWait(t) from blocking.
399 1 : c.cond.Signal()
400 1 : }
401 : }
402 :
// durationFunc returns a duration on demand. It is used for the WAL min sync
// interval, which flushLoop re-evaluates after every sync, so the value may
// change over the LogWriter's lifetime.
type durationFunc func() time.Duration

// syncTimer is an interface for timers, modeled on the closure callback mode
// of time.Timer (*time.Timer satisfies it). See time.AfterFunc and
// LogWriter.afterFunc. syncTimer is used by tests to mock out the timer
// functionality used to implement min-sync-interval.
type syncTimer interface {
	Reset(time.Duration) bool
	Stop() bool
}
413 :
// LogWriter writes records to an underlying io.Writer. In order to support WAL
// file reuse, a LogWriter's records are tagged with the WAL's file
// number. When reading a log file a record from a previous incarnation of the
// file will return the error ErrInvalidLogNum.
//
// Writing is split between the caller and a background flush loop
// (flushLoop, started by NewLogWriter): callers fill blocks, and the flush
// loop writes them to w and, when requested, syncs.
type LogWriter struct {
	// w is the underlying writer.
	w io.Writer
	// c is w as a closer.
	c io.Closer
	// s is w as a syncer. nil if w does not implement syncer, in which case
	// syncs are skipped.
	s syncer
	// logNum is the low 32-bits of the log's file number.
	logNum uint32
	// blockNum is the zero based block number for the current block.
	blockNum int64
	// err is any accumulated error. It originates in flusher.err, and is
	// updated to reflect flusher.err when a block is full and getting enqueued.
	// Therefore, there is a lag between when flusher.err has a non-nil error,
	// and when that non-nil error is reflected in LogWriter.err. On close, it
	// is set to errClosedWriter to inform accidental future calls to
	// SyncRecord*.
	err error
	// block is the current block being written. Protected by flusher.Mutex.
	block *block
	// free holds blocks that have been flushed and can be reused.
	free struct {
		sync.Mutex
		blocks []*block
	}

	flusher struct {
		sync.Mutex
		// Flusher ready is a condition variable that is signalled when there are
		// blocks to flush, syncing has been requested, or the LogWriter has been
		// closed. For signalling of a sync, it is safe to call without holding
		// flusher.Mutex.
		ready flusherCond
		// Set to true when the flush loop should be closed.
		close bool
		// Closed when the flush loop has terminated.
		closed chan struct{}
		// Accumulated flush error.
		err error
		// minSyncInterval is the minimum duration between syncs.
		minSyncInterval durationFunc
		// fsyncLatency, if non-nil, observes the latency of each sync.
		fsyncLatency prometheus.Histogram
		// pending holds full blocks queued by queueBlock for the flush loop.
		pending []*block
		// Pushing and popping from pendingSyncs does not require flusher mutex to
		// be held.
		pendingSyncs pendingSyncs
		metrics      *LogWriterMetrics
	}

	// afterFunc is a hook to allow tests to mock out the timer functionality
	// used for min-sync-interval. In normal operation this points to
	// time.AfterFunc.
	afterFunc func(d time.Duration, f func()) syncTimer

	// Backing for both pendingSyncs implementations. Only one of them is used
	// by a given LogWriter, depending on LogWriterConfig.
	pendingSyncsBackingQ     pendingSyncsWithSyncQueue
	pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex

	pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
}
477 :
// LogWriterConfig is a struct used for configuring new LogWriters.
type LogWriterConfig struct {
	// WALMinSyncInterval, if non-nil, is evaluated after every sync; a
	// positive result holds back further syncs for that long.
	WALMinSyncInterval durationFunc
	// WALFsyncLatency, if non-nil, observes the latency of every sync,
	// recorded as float64(time.Duration), i.e. nanoseconds.
	WALFsyncLatency prometheus.Histogram
	// QueueSemChan is an optional channel that the LogWriter receives one
	// value from for every waiter popped from its sync queue. It functions as
	// a semaphore that prevents the sync queue from overflowing (which will
	// cause a panic). All production code ensures this is non-nil, except in
	// failover mode (see ExternalSyncQueueCallback), where the sync queue is
	// not used.
	QueueSemChan chan struct{}

	// ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
	// as part of a WAL implementation that can failover between LogWriters.
	//
	// In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
	// be used with a PendingSync parameter that is implemented by
	// PendingSyncIndex. When an index is synced (which implies all earlier
	// indices are also synced), this callback is invoked. The caller must not
	// hold any mutex when invoking this callback, since the lock ordering
	// requirement in this case is that any higher layer locks (in the wal
	// package) precede the lower layer locks (in the record package). These
	// callbacks are serialized since they are invoked from the flushLoop.
	ExternalSyncQueueCallback ExternalSyncQueueCallback
}
501 :
// ExternalSyncQueueCallback is to be run when a PendingSync has been
// processed, either successfully or with an error. doneSync is the index
// that was synced, and err is the flush/sync error, if any.
type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
505 :
// initialAllocatedBlocksCap is the initial capacity of the various slices
// intended to hold LogWriter blocks (the free list and the pending list).
// The LogWriter may allocate more blocks than this threshold allows; the
// slices simply grow.
const initialAllocatedBlocksCap = 32
510 :
511 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
512 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
513 : // within a LogWriter's own internal free list `w.free.blocks`.
514 : var blockPool = sync.Pool{
515 1 : New: func() any { return &block{} },
516 : }
517 :
518 : // NewLogWriter returns a new LogWriter.
519 : //
520 : // The io.Writer may also be used as an io.Closer and syncer. No other methods
521 : // will be called on the writer.
522 : func NewLogWriter(
523 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
524 1 : ) *LogWriter {
525 1 : c, _ := w.(io.Closer)
526 1 : s, _ := w.(syncer)
527 1 : r := &LogWriter{
528 1 : w: w,
529 1 : c: c,
530 1 : s: s,
531 1 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
532 1 : // we are very unlikely to reach a file number of 4 billion and b) the log
533 1 : // number is used as a validation check and using only the low 32-bits is
534 1 : // sufficient for that purpose.
535 1 : logNum: uint32(logNum),
536 1 : afterFunc: func(d time.Duration, f func()) syncTimer {
537 0 : return time.AfterFunc(d, f)
538 0 : },
539 : }
540 1 : m := &LogWriterMetrics{}
541 1 : if logWriterConfig.ExternalSyncQueueCallback != nil {
542 1 : r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
543 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
544 1 : } else {
545 1 : r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
546 1 : syncQueueLen: &m.SyncQueueLen,
547 1 : queueSemChan: logWriterConfig.QueueSemChan,
548 1 : }
549 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
550 1 : }
551 :
552 1 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
553 1 : r.block = blockPool.Get().(*block)
554 1 : r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
555 1 : r.flusher.closed = make(chan struct{})
556 1 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
557 1 : r.flusher.metrics = m
558 1 :
559 1 : f := &r.flusher
560 1 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
561 1 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
562 1 :
563 1 : go func() {
564 1 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
565 1 : }()
566 1 : return r
567 : }
568 :
// flushLoop is the body of the goroutine started by NewLogWriter (run under
// the walSyncLabels pprof labels). It repeatedly waits on flusher.ready until
// there is work, then writes queued full blocks and the unflushed part of the
// current block to the underlying writer, syncs when requested, and completes
// sync waiters via flusher.pendingSyncs. flusher.Mutex is held except while
// performing I/O and while popping sync waiters. The loop returns once
// f.close is set and no work remains; on exit it closes f.closed.
func (w *LogWriter) flushLoop(context.Context) {
	f := &w.flusher
	f.Lock()

	// Initialize idleStartTime to when the loop starts.
	idleStartTime := crtime.NowMono()
	var syncTimer syncTimer
	defer func() {
		// Capture the idle duration between the last piece of work and when the
		// loop terminated.
		f.metrics.WriteThroughput.IdleDuration += idleStartTime.Elapsed()
		if syncTimer != nil {
			syncTimer.Stop()
		}
		close(f.closed)
		f.Unlock()
	}()

	// The flush loop performs flushing of full and partial data blocks to the
	// underlying writer (LogWriter.w), syncing of the writer, and notification
	// to sync requests that they have completed.
	//
	// - flusher.ready is a condition variable that is signalled when there is
	//   work to do. Full blocks are contained in flusher.pending. The current
	//   partial block is in LogWriter.block. And sync operations are held in
	//   flusher.pendingSyncs.
	//
	// - The decision to sync is determined by whether there are any sync
	//   requests present in flusher.pendingSyncs and whether enough time has
	//   elapsed since the last sync. If not enough time has elapsed since the
	//   last sync, flusher.pendingSyncs.setBlocked() is called. While syncing
	//   is blocked, pendingSyncs.empty() returns true and snapshotForPop()
	//   returns an empty snapshot.
	//
	// - The blocked state is cleared by a timer that is started when syncing
	//   is blocked. While blocked, no syncing will take place, but flushing
	//   will continue to be performed. The on/off toggle for syncing does not
	//   need to be carefully synchronized with the rest of processing -- all
	//   we need to ensure is that after any transition to blocked there is
	//   eventually a transition back to unblocked. syncTimer performs this
	//   transition. Note that any change to min-sync-interval will not take
	//   effect until the previous timer elapses.
	//
	// - Picking up the syncing work to perform requires coordination with
	//   picking up the flushing work. Specifically, flushing work is queued
	//   before syncing work. The guarantee of this code is that when a sync is
	//   requested, any previously queued flush work will be synced. This
	//   motivates reading the syncing work (f.pendingSyncs.snapshotForPop())
	//   before picking up the flush work (w.block.written.Load()).

	// The list of full blocks that need to be written. This is copied from
	// f.pending on every loop iteration, though the number of elements is
	// usually small (most frequently 1). In the case of the WAL LogWriter, the
	// number of blocks is bounded by the size of the WAL's corresponding
	// memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
	// this works out to at most 2048 elements if the entirety of the memtable's
	// contents are queued.
	pending := make([]*block, 0, cap(f.pending))
	for {
		for {
			// Grab the portion of the current block that requires flushing. Note that
			// the current block can be added to the pending blocks list after we release
			// the flusher lock, but it won't be part of pending.
			written := w.block.written.Load()
			if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
				break
			}
			if f.close {
				// If the writer is closed, pretend the sync timer fired immediately so
				// that we can process any queued sync requests.
				f.pendingSyncs.clearBlocked()
				if !f.pendingSyncs.empty() {
					break
				}
				return
			}
			f.ready.Wait()
			continue
		}
		// Found work to do, so no longer idle.
		//
		// NB: it is safe to read pending before loading from pendingSyncs since
		// mutations to pending require the w.flusher mutex, which is held here.
		// There is no risk that someone will concurrently add to pending, so the
		// following sequence, which would pick up a pendingSyncs entry without
		// the corresponding data, is impossible:
		//
		// Thread enqueueing       This thread
		//                         1. read pending
		// 2. add block to pending
		// 3. add to pendingSyncs
		//                         4. read pendingSyncs
		workStartTime := crtime.NowMono()
		idleDuration := workStartTime.Sub(idleStartTime)
		pending = append(pending[:0], f.pending...)
		f.pending = f.pending[:0]
		f.metrics.PendingBufferLen.AddSample(int64(len(pending)))

		// Grab the list of sync waiters. Note that the snapshot is empty while
		// we're waiting for the min-sync-interval to expire. This allows
		// flushing to proceed even if we're not ready to sync.
		snap := f.pendingSyncs.snapshotForPop()

		// Grab the portion of the current block that requires flushing. Note that
		// the current block can be added to the pending blocks list after we
		// release the flusher lock, but it won't be part of pending. This has to
		// be ordered after we get the list of sync waiters from pendingSyncs in
		// order to prevent a race where a waiter adds itself to pendingSyncs,
		// but this thread picks up the entry and not the buffered data.
		written := w.block.written.Load()
		data := w.block.buf[w.block.flushed:written]
		w.block.flushed = written

		fErr := f.err
		f.Unlock()
		// If flusher has an error, we propagate it to waiters. Note in spite of
		// error we consume the pending list above to free blocks for writers.
		// NOTE(review): on this path the blocks copied into pending are not
		// appended back to w.free.blocks (only flushBlock does that); they are
		// dropped, and queueBlock falls back to blockPool.
		if fErr != nil {
			// NB: pop may invoke ExternalSyncQueueCallback, which is why we have
			// called f.Unlock() above. We will acquire the lock again below.
			// NOTE(review): pop's error return is discarded here, whereas
			// flushPending folds it into its result; confirm this is intended.
			f.pendingSyncs.pop(snap, fErr)
			// Update the idleStartTime if work could not be done, so that we don't
			// include the duration we tried to do work as idle. We don't bother
			// with the rest of the accounting, which means we will undercount.
			idleStartTime = crtime.NowMono()
			f.Lock()
			continue
		}
		synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
		f.Lock()
		if synced && f.fsyncLatency != nil {
			f.fsyncLatency.Observe(float64(syncLatency))
		}
		f.err = err
		if f.err != nil {
			// Unblock syncing so that queued waiters are promptly notified of
			// the error on the next iteration.
			f.pendingSyncs.clearBlocked()
			// Update the idleStartTime if work could not be done, so that we don't
			// include the duration we tried to do work as idle. We don't bother
			// with the rest of the accounting, which means we will undercount.
			idleStartTime = crtime.NowMono()
			continue
		}

		if synced && f.minSyncInterval != nil {
			// A sync was performed. Make sure we've waited for the min sync
			// interval before syncing again.
			if min := f.minSyncInterval(); min > 0 {
				f.pendingSyncs.setBlocked()
				if syncTimer == nil {
					syncTimer = w.afterFunc(min, func() {
						f.pendingSyncs.clearBlocked()
						f.ready.Signal()
					})
				} else {
					syncTimer.Reset(min)
				}
			}
		}
		// Finished work, and started idling.
		idleStartTime = crtime.NowMono()
		workDuration := idleStartTime.Sub(workStartTime)
		f.metrics.WriteThroughput.Bytes += bytesWritten
		f.metrics.WriteThroughput.WorkDuration += workDuration
		f.metrics.WriteThroughput.IdleDuration += idleDuration
	}
}
735 :
736 : func (w *LogWriter) flushPending(
737 : data []byte, pending []*block, snap pendingSyncsSnapshot,
738 1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
739 1 : defer func() {
740 1 : // Translate panics into errors. The errors will cause flushLoop to shut
741 1 : // down, but allows us to do so in a controlled way and avoid swallowing
742 1 : // the stack that created the panic if panic'ing itself hits a panic
743 1 : // (e.g. unlock of unlocked mutex).
744 1 : if r := recover(); r != nil {
745 0 : err = errors.Newf("%v", r)
746 0 : }
747 : }()
748 :
749 1 : for _, b := range pending {
750 1 : bytesWritten += blockSize - int64(b.flushed)
751 1 : if err = w.flushBlock(b); err != nil {
752 0 : break
753 : }
754 : }
755 1 : if n := len(data); err == nil && n > 0 {
756 1 : bytesWritten += int64(n)
757 1 : _, err = w.w.Write(data)
758 1 : }
759 :
760 1 : synced = !snap.empty()
761 1 : if synced {
762 1 : if err == nil && w.s != nil {
763 1 : syncLatency, err = w.syncWithLatency()
764 1 : } else {
765 1 : synced = false
766 1 : }
767 1 : f := &w.flusher
768 1 : if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
769 0 : return synced, syncLatency, bytesWritten, firstError(err, popErr)
770 0 : }
771 : }
772 :
773 1 : return synced, syncLatency, bytesWritten, err
774 : }
775 :
776 1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
777 1 : start := crtime.NowMono()
778 1 : err := w.s.Sync()
779 1 : syncLatency := start.Elapsed()
780 1 : return syncLatency, err
781 1 : }
782 :
783 1 : func (w *LogWriter) flushBlock(b *block) error {
784 1 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
785 0 : return err
786 0 : }
787 1 : b.written.Store(0)
788 1 : b.flushed = 0
789 1 : w.free.Lock()
790 1 : w.free.blocks = append(w.free.blocks, b)
791 1 : w.free.Unlock()
792 1 : return nil
793 : }
794 :
795 : // queueBlock queues the current block for writing to the underlying writer,
796 : // allocates a new block and reserves space for the next header.
797 1 : func (w *LogWriter) queueBlock() {
798 1 : // Allocate a new block, blocking until one is available. We do this first
799 1 : // because w.block is protected by w.flusher.Mutex.
800 1 : w.free.Lock()
801 1 : if len(w.free.blocks) == 0 {
802 1 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
803 1 : }
804 1 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
805 1 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
806 1 : w.free.Unlock()
807 1 :
808 1 : f := &w.flusher
809 1 : f.Lock()
810 1 : f.pending = append(f.pending, w.block)
811 1 : w.block = nextBlock
812 1 : f.ready.Signal()
813 1 : w.err = w.flusher.err
814 1 : f.Unlock()
815 1 :
816 1 : w.blockNum++
817 : }
818 :
819 : // Close flushes and syncs any unwritten data and closes the writer.
820 : // Where required, external synchronisation is provided by commitPipeline.mu.
821 1 : func (w *LogWriter) Close() error {
822 1 : return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
823 1 : }
824 :
825 : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
826 : // lastQueuedRecord, that the caller will be notified about when synced.
827 1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
828 1 : return w.closeInternal(lastQueuedRecord)
829 1 : }
830 :
831 1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
832 1 : f := &w.flusher
833 1 :
834 1 : // Emit an EOF trailer signifying the end of this log. This helps readers
835 1 : // differentiate between a corrupted entry in the middle of a log from
836 1 : // garbage at the tail from a recycled log file.
837 1 : w.emitEOFTrailer()
838 1 :
839 1 : // Signal the flush loop to close.
840 1 : f.Lock()
841 1 : f.close = true
842 1 : f.ready.Signal()
843 1 : f.Unlock()
844 1 :
845 1 : // Wait for the flush loop to close. The flush loop will not close until all
846 1 : // pending data has been written or an error occurs.
847 1 : <-f.closed
848 1 :
849 1 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
850 1 : // last buffered data only if it was requested via syncQ, so we need to sync
851 1 : // here to ensure that all the data is synced.
852 1 : err := w.flusher.err
853 1 : var syncLatency time.Duration
854 1 : if err == nil && w.s != nil {
855 1 : syncLatency, err = w.syncWithLatency()
856 1 : }
857 1 : f.Lock()
858 1 : if err == nil && f.fsyncLatency != nil {
859 1 : f.fsyncLatency.Observe(float64(syncLatency))
860 1 : }
861 1 : free := w.free.blocks
862 1 : f.Unlock()
863 1 :
864 1 : // NB: the caller of closeInternal may not care about a non-nil cerr below
865 1 : // if all queued writes have been successfully written and synced.
866 1 : if lastQueuedRecord.Index != NoSyncIndex {
867 1 : w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
868 1 : }
869 1 : if w.c != nil {
870 1 : cerr := w.c.Close()
871 1 : w.c = nil
872 1 : err = firstError(err, cerr)
873 1 : }
874 :
875 1 : for _, b := range free {
876 1 : b.flushed = 0
877 1 : b.written.Store(0)
878 1 : blockPool.Put(b)
879 1 : }
880 :
881 1 : w.err = errClosedWriter
882 1 : return err
883 : }
884 :
// firstError returns err0 when it is non-nil and err1 otherwise, so the result
// is nil only when both arguments are nil.
func firstError(err0, err1 error) error {
	if err0 == nil {
		return err1
	}
	return err0
}
893 :
894 : // WriteRecord writes a complete record. Returns the offset just past the end
895 : // of the record.
896 : // External synchronisation provided by commitPipeline.mu.
897 1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
898 1 : logSize, err := w.SyncRecord(p, nil, nil)
899 1 : return logSize, err
900 1 : }
901 :
902 : // SyncRecord writes a complete record. If wg != nil the record will be
903 : // asynchronously persisted to the underlying writer and done will be called on
904 : // the wait group upon completion. Returns the offset just past the end of the
905 : // record.
906 : // External synchronisation provided by commitPipeline.mu.
907 : func (w *LogWriter) SyncRecord(
908 : p []byte, wg *sync.WaitGroup, err *error,
909 1 : ) (logSize int64, err2 error) {
910 1 : w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
911 1 : wg: wg,
912 1 : err: err,
913 1 : }
914 1 : return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
915 1 : }
916 :
917 : // SyncRecordGeneralized is a version of SyncRecord that accepts a
918 : // PendingSync.
919 1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
920 1 : if w.err != nil {
921 0 : return -1, w.err
922 0 : }
923 :
924 : // The `i == 0` condition ensures we handle empty records. Such records can
925 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
926 : // MANIFEST is currently written using Writer, it is good to support the same
927 : // semantics with LogWriter.
928 1 : for i := 0; i == 0 || len(p) > 0; i++ {
929 1 : p = w.emitFragment(i, p)
930 1 : }
931 :
932 1 : if ps.syncRequested() {
933 1 : // If we've been asked to persist the record, add the WaitGroup to the sync
934 1 : // queue and signal the flushLoop. Note that flushLoop will write partial
935 1 : // blocks to the file if syncing has been requested. The contract is that
936 1 : // any record written to the LogWriter to this point will be flushed to the
937 1 : // OS and synced to disk.
938 1 : f := &w.flusher
939 1 : f.pendingSyncs.push(ps)
940 1 : f.ready.Signal()
941 1 : }
942 :
943 1 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
944 1 : // Note that we don't return w.err here as a concurrent call to Close would
945 1 : // race with our read. That's ok because the only error we could be seeing is
946 1 : // one to syncing for which the caller can receive notification of by passing
947 1 : // in a non-nil err argument.
948 1 : return offset, nil
949 : }
950 :
951 : // Size returns the current size of the file.
952 : // External synchronisation provided by commitPipeline.mu.
953 1 : func (w *LogWriter) Size() int64 {
954 1 : return w.blockNum*blockSize + int64(w.block.written.Load())
955 1 : }
956 :
957 1 : func (w *LogWriter) emitEOFTrailer() {
958 1 : // Write a recyclable chunk header with a different log number. Readers
959 1 : // will treat the header as EOF when the log number does not match.
960 1 : b := w.block
961 1 : i := b.written.Load()
962 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
963 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
964 1 : b.buf[i+6] = recyclableFullChunkType
965 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
966 1 : b.written.Store(i + int32(recyclableHeaderSize))
967 1 : }
968 :
969 1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
970 1 : b := w.block
971 1 : i := b.written.Load()
972 1 : first := n == 0
973 1 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
974 1 :
975 1 : if last {
976 1 : if first {
977 1 : b.buf[i+6] = recyclableFullChunkType
978 1 : } else {
979 1 : b.buf[i+6] = recyclableLastChunkType
980 1 : }
981 1 : } else {
982 1 : if first {
983 1 : b.buf[i+6] = recyclableFirstChunkType
984 1 : } else {
985 1 : b.buf[i+6] = recyclableMiddleChunkType
986 1 : }
987 : }
988 :
989 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
990 1 :
991 1 : r := copy(b.buf[i+recyclableHeaderSize:], p)
992 1 : j := i + int32(recyclableHeaderSize+r)
993 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
994 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
995 1 : b.written.Store(j)
996 1 :
997 1 : if blockSize-b.written.Load() < recyclableHeaderSize {
998 1 : // There is no room for another fragment in the block, so fill the
999 1 : // remaining bytes with zeros and queue the block for flushing.
1000 1 : clear(b.buf[b.written.Load():])
1001 1 : w.queueBlock()
1002 1 : }
1003 1 : return p[r:]
1004 : }
1005 :
1006 : // Metrics must typically be called after Close, since the callee will no
1007 : // longer modify the returned LogWriterMetrics. It is also current if there is
1008 : // nothing left to flush in the flush loop, but that is an implementation
1009 : // detail that callers should not rely on.
1010 1 : func (w *LogWriter) Metrics() LogWriterMetrics {
1011 1 : w.flusher.Lock()
1012 1 : defer w.flusher.Unlock()
1013 1 : m := *w.flusher.metrics
1014 1 : return m
1015 1 : }
1016 :
1017 : // LogWriterMetrics contains misc metrics for the log writer.
1018 : type LogWriterMetrics struct {
1019 : WriteThroughput base.ThroughputMetric
1020 : PendingBufferLen base.GaugeSampleMetric
1021 : SyncQueueLen base.GaugeSampleMetric
1022 : }
1023 :
1024 : // Merge merges metrics from x. Requires that x is non-nil.
1025 1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
1026 1 : m.WriteThroughput.Merge(x.WriteThroughput)
1027 1 : m.PendingBufferLen.Merge(x.PendingBufferLen)
1028 1 : m.SyncQueueLen.Merge(x.SyncQueueLen)
1029 1 : return nil
1030 1 : }
|