Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/crc"
19 : "github.com/prometheus/client_golang/prometheus"
20 : )
21 :
22 : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
23 : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
24 :
25 : type block struct {
26 : // buf[:written] has already been filled with fragments. Updated atomically.
27 : written atomic.Int32
28 : // buf[:flushed] has already been flushed to w.
29 : flushed int32
30 : buf [blockSize]byte
31 : }
32 :
type (
	// flusher is implemented by writers that expose a Flush method.
	flusher interface {
		Flush() error
	}

	// syncer is implemented by writers that can sync previously written data.
	// LogWriter detects it with a type assertion on its io.Writer and calls
	// Sync to satisfy sync requests.
	syncer interface {
		Sync() error
	}
)
40 :
const (
	// syncConcurrencyBits is log2(SyncConcurrency).
	syncConcurrencyBits = 12

	// SyncConcurrency is the maximum number of sync operations that may be in
	// flight at once; a sync operation is started either by SyncRecord or by
	// Close. It also sizes the syncQueue ring buffer. It is exported because
	// the same value limits commit concurrency in commitPipeline.
	SyncConcurrency = 1 << syncConcurrencyBits
)
50 :
// syncSlot is one entry of the syncQueue ring buffer.
type syncSlot struct {
	// wg has Done called on it when the slot is popped.
	wg *sync.WaitGroup
	// err receives the sync result when the slot is popped.
	err *error
}
55 :
56 : // syncQueue is a lock-free fixed-size single-producer, single-consumer
57 : // queue. The single-producer can push to the head, and the single-consumer can
58 : // pop multiple values from the tail. Popping calls Done() on each of the
59 : // available *sync.WaitGroup elements.
60 : type syncQueue struct {
61 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
62 : // are indexes into slots modulo len(slots)-1.
63 : //
64 : // tail = index of oldest data in queue
65 : // head = index of next slot to fill
66 : //
67 : // Slots in the range [tail, head) are owned by consumers. A consumer
68 : // continues to own a slot outside this range until it nils the slot, at
69 : // which point ownership passes to the producer.
70 : //
71 : // The head index is stored in the most-significant bits so that we can
72 : // atomically add to it and the overflow is harmless.
73 : headTail atomic.Uint64
74 :
75 : // slots is a ring buffer of values stored in this queue. The size must be a
76 : // power of 2. A slot is in use until the tail index has moved beyond it.
77 : slots [SyncConcurrency]syncSlot
78 :
79 : // blocked is an atomic boolean which indicates whether syncing is currently
80 : // blocked or can proceed. It is used by the implementation of
81 : // min-sync-interval to block syncing until the min interval has passed.
82 : blocked atomic.Bool
83 : }
84 :
// dequeueBits is the width of each index packed into syncQueue.headTail: head
// occupies the upper dequeueBits bits and tail the lower dequeueBits bits.
const dequeueBits = 32
86 :
87 : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
88 1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
89 1 : const mask = 1<<dequeueBits - 1
90 1 : head = uint32((ptrs >> dequeueBits) & mask)
91 1 : tail = uint32(ptrs & mask)
92 1 : return head, tail
93 1 : }
94 :
95 1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
96 1 : ptrs := q.headTail.Load()
97 1 : head, tail := q.unpack(ptrs)
98 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
99 0 : panic("pebble: queue is full")
100 : }
101 :
102 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
103 1 : slot.wg = wg
104 1 : slot.err = err
105 1 :
106 1 : // Increment head. This passes ownership of slot to dequeue and acts as a
107 1 : // store barrier for writing the slot.
108 1 : q.headTail.Add(1 << dequeueBits)
109 : }
110 :
111 0 : func (q *syncQueue) setBlocked() {
112 0 : q.blocked.Store(true)
113 0 : }
114 :
115 1 : func (q *syncQueue) clearBlocked() {
116 1 : q.blocked.Store(false)
117 1 : }
118 :
119 1 : func (q *syncQueue) empty() bool {
120 1 : head, tail, _ := q.load()
121 1 : return head == tail
122 1 : }
123 :
124 : // load returns the head, tail of the queue for what should be synced to the
125 : // caller. It can return a head, tail of zero if syncing is blocked due to
126 : // min-sync-interval. It additionally returns the real length of this queue,
127 : // regardless of whether syncing is blocked.
128 1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
129 1 : ptrs := q.headTail.Load()
130 1 : head, tail = q.unpack(ptrs)
131 1 : realLength = head - tail
132 1 : if q.blocked.Load() {
133 0 : return 0, 0, realLength
134 0 : }
135 1 : return head, tail, realLength
136 : }
137 :
138 : // REQUIRES: queueSemChan is non-nil.
139 1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
140 1 : if tail == head {
141 0 : // Queue is empty.
142 0 : return nil
143 0 : }
144 :
145 1 : for ; tail != head; tail++ {
146 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
147 1 : wg := slot.wg
148 1 : if wg == nil {
149 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
150 0 : }
151 1 : *slot.err = err
152 1 : slot.wg = nil
153 1 : slot.err = nil
154 1 : // We need to bump the tail count before releasing the queueSemChan
155 1 : // semaphore as releasing the semaphore can cause a blocked goroutine to
156 1 : // acquire the semaphore and enqueue before we've "freed" space in the
157 1 : // queue.
158 1 : q.headTail.Add(1)
159 1 : wg.Done()
160 1 : // Is always non-nil in production, unless using wal package for WAL
161 1 : // failover.
162 1 : if queueSemChan != nil {
163 1 : <-queueSemChan
164 1 : }
165 : }
166 :
167 1 : return nil
168 : }
169 :
type (
	// pendingSyncs abstracts how pending sync requests are tracked. In
	// standalone mode it is a thin wrapper around syncQueue. When the
	// LogWriter may be subject to failover, the LogWriter keeps no queue and
	// waiters are signalled by the wal package instead.
	//
	// Implementations avoid heap allocations caused by the interface by
	// following two rules:
	//   - The PendingSync passed to push may be reused by the caller once push
	//     returns. The caller should pass a pointer into an already
	//     heap-allocated struct that it reuses for the next push.
	//   - The pendingSyncsSnapshot returned by snapshotForPop is storage owned
	//     by the implementation. A later snapshotForPop call overwrites it.
	pendingSyncs interface {
		push(PendingSync)
		setBlocked()
		clearBlocked()
		empty() bool
		snapshotForPop() pendingSyncsSnapshot
		pop(snap pendingSyncsSnapshot, err error) error
	}

	// pendingSyncsSnapshot captures the sync requests to complete, as taken by
	// snapshotForPop and later handed to pop.
	pendingSyncsSnapshot interface {
		empty() bool
	}

	// PendingSync abstracts the sync specification for a record queued on the
	// LogWriter. Because syncRequested is unexported, only this package can
	// provide implementations.
	PendingSync interface {
		syncRequested() bool
	}
)
203 :
204 : // The implementation of pendingSyncs in standalone mode.
205 : type pendingSyncsWithSyncQueue struct {
206 : syncQueue
207 : syncQueueLen *base.GaugeSampleMetric
208 : snapshotBacking syncQueueSnapshot
209 : // See the comment for LogWriterConfig.QueueSemChan.
210 : queueSemChan chan struct{}
211 : }
212 :
213 : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
214 :
215 1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
216 1 : ps2 := ps.(*pendingSyncForSyncQueue)
217 1 : q.syncQueue.push(ps2.wg, ps2.err)
218 1 : }
219 :
220 1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
221 1 : head, tail, realLength := q.syncQueue.load()
222 1 : q.snapshotBacking = syncQueueSnapshot{
223 1 : head: head,
224 1 : tail: tail,
225 1 : }
226 1 : q.syncQueueLen.AddSample(int64(realLength))
227 1 : return &q.snapshotBacking
228 1 : }
229 :
230 1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
231 1 : s := snap.(*syncQueueSnapshot)
232 1 : return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
233 1 : }
234 :
// syncQueueSnapshot is the standalone-mode pendingSyncsSnapshot. It holds
// the [tail, head) range of syncQueue indices captured by snapshotForPop.
type syncQueueSnapshot struct {
	head uint32
	tail uint32
}

// empty reports whether the captured range holds no waiters.
func (s *syncQueueSnapshot) empty() bool {
	return s.tail == s.head
}
243 :
// pendingSyncForSyncQueue is the standalone-mode PendingSync. It carries the
// waiter to signal and where to store the sync result.
type pendingSyncForSyncQueue struct {
	wg  *sync.WaitGroup
	err *error
}

// syncRequested reports whether a waiter is attached, i.e. whether the
// caller asked for a sync.
func (ps *pendingSyncForSyncQueue) syncRequested() bool {
	hasWaiter := ps.wg != nil
	return hasWaiter
}
253 :
254 : // The implementation of pendingSyncs in failover mode.
255 : type pendingSyncsWithHighestSyncIndex struct {
256 : // The highest "index" queued that is requesting a sync. Initialized
257 : // to NoSyncIndex, and reset to NoSyncIndex after the sync.
258 : index atomic.Int64
259 : snapshotBacking PendingSyncIndex
260 : // blocked is an atomic boolean which indicates whether syncing is currently
261 : // blocked or can proceed. It is used by the implementation of
262 : // min-sync-interval to block syncing until the min interval has passed.
263 : blocked atomic.Bool
264 : externalSyncQueueCallback ExternalSyncQueueCallback
265 : }
266 :
// NoSyncIndex is the PendingSyncIndex.Index value that means no sync is
// requested.
const NoSyncIndex = -1
269 :
270 : func (si *pendingSyncsWithHighestSyncIndex) init(
271 : externalSyncQueueCallback ExternalSyncQueueCallback,
272 1 : ) {
273 1 : si.index.Store(NoSyncIndex)
274 1 : si.externalSyncQueueCallback = externalSyncQueueCallback
275 1 : }
276 :
277 1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
278 1 : ps2 := ps.(*PendingSyncIndex)
279 1 : si.index.Store(ps2.Index)
280 1 : }
281 :
282 0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
283 0 : si.blocked.Store(true)
284 0 : }
285 :
286 1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
287 1 : si.blocked.Store(false)
288 1 : }
289 :
290 1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
291 1 : return si.load() == NoSyncIndex
292 1 : }
293 :
294 1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
295 1 : si.snapshotBacking = PendingSyncIndex{Index: si.load()}
296 1 : return &si.snapshotBacking
297 1 : }
298 :
299 1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
300 1 : index := si.index.Load()
301 1 : if index != NoSyncIndex && si.blocked.Load() {
302 0 : index = NoSyncIndex
303 0 : }
304 1 : return index
305 : }
306 :
307 1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
308 1 : index := snap.(*PendingSyncIndex)
309 1 : if index.Index == NoSyncIndex {
310 0 : return nil
311 0 : }
312 : // Set to NoSyncIndex if a higher index has not queued.
313 1 : si.index.CompareAndSwap(index.Index, NoSyncIndex)
314 1 : si.externalSyncQueueCallback(*index, err)
315 1 : return nil
316 : }
317 :
318 : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
319 : type PendingSyncIndex struct {
320 : // Index is some state meaningful to the user of LogWriter. The LogWriter
321 : // itself only examines whether Index is equal to NoSyncIndex.
322 : Index int64
323 : }
324 :
325 1 : func (s *PendingSyncIndex) empty() bool {
326 1 : return s.Index == NoSyncIndex
327 1 : }
328 :
329 1 : func (s *PendingSyncIndex) syncRequested() bool {
330 1 : return s.Index != NoSyncIndex
331 1 : }
332 :
333 : // flusherCond is a specialized condition variable that allows its condition to
334 : // change and readiness be signalled without holding its associated mutex. In
335 : // particular, when a waiter is added to syncQueue atomically, this condition
336 : // variable can be signalled without holding flusher.Mutex.
337 : type flusherCond struct {
338 : mu *sync.Mutex
339 : q pendingSyncs
340 : cond sync.Cond
341 : }
342 :
343 1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
344 1 : c.mu = mu
345 1 : c.q = q
346 1 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
347 1 : // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
348 1 : // will be called and we can check the !syncQueue.empty() condition.
349 1 : c.cond.L = c
350 1 : }
351 :
352 1 : func (c *flusherCond) Signal() {
353 1 : // Pass-through to the cond var.
354 1 : c.cond.Signal()
355 1 : }
356 :
357 1 : func (c *flusherCond) Wait() {
358 1 : // Pass-through to the cond var. Note that internally the cond var implements
359 1 : // Wait as:
360 1 : //
361 1 : // t := notifyListAdd()
362 1 : // L.Unlock()
363 1 : // notifyListWait(t)
364 1 : // L.Lock()
365 1 : //
366 1 : // We've configured the cond var to call flusherReady.Unlock() which allows
367 1 : // us to check the !syncQueue.empty() condition without a danger of missing a
368 1 : // notification. Any call to flusherReady.Signal() after notifyListAdd() is
369 1 : // called will cause the subsequent notifyListWait() to return immediately.
370 1 : c.cond.Wait()
371 1 : }
372 :
373 1 : func (c *flusherCond) Lock() {
374 1 : c.mu.Lock()
375 1 : }
376 :
377 1 : func (c *flusherCond) Unlock() {
378 1 : c.mu.Unlock()
379 1 : if !c.q.empty() {
380 1 : // If the current goroutine is about to block on sync.Cond.Wait, this call
381 1 : // to Signal will prevent that. The comment in Wait above explains a bit
382 1 : // about what is going on here, but it is worth reiterating:
383 1 : //
384 1 : // flusherCond.Wait()
385 1 : // sync.Cond.Wait()
386 1 : // t := notifyListAdd()
387 1 : // flusherCond.Unlock() <-- we are here
388 1 : // notifyListWait(t)
389 1 : // flusherCond.Lock()
390 1 : //
391 1 : // The call to Signal here results in:
392 1 : //
393 1 : // sync.Cond.Signal()
394 1 : // notifyListNotifyOne()
395 1 : //
396 1 : // The call to notifyListNotifyOne() will prevent the call to
397 1 : // notifyListWait(t) from blocking.
398 1 : c.cond.Signal()
399 1 : }
400 : }
401 :
type (
	// durationFunc produces a duration on demand. The flush loop calls it
	// after each sync to read the current min-sync-interval.
	durationFunc func() time.Duration

	// syncTimer abstracts the *time.Timer returned by time.AfterFunc (see
	// LogWriter.afterFunc) so tests can mock the timer behind
	// min-sync-interval.
	syncTimer interface {
		Reset(time.Duration) bool
		Stop() bool
	}
)
412 :
413 : // LogWriter writes records to an underlying io.Writer. In order to support WAL
414 : // file reuse, a LogWriter's records are tagged with the WAL's file
415 : // number. When reading a log file a record from a previous incarnation of the
416 : // file will return the error ErrInvalidLogNum.
417 : type LogWriter struct {
418 : // w is the underlying writer.
419 : w io.Writer
420 : // c is w as a closer.
421 : c io.Closer
422 : // s is w as a syncer.
423 : s syncer
424 : // logNum is the low 32-bits of the log's file number.
425 : logNum uint32
426 : // blockNum is the zero based block number for the current block.
427 : blockNum int64
428 : // err is any accumulated error. It originates in flusher.err, and is
429 : // updated to reflect flusher.err when a block is full and getting enqueued.
430 : // Therefore, there is a lag between when flusher.err has a non-nil error,
431 : // and when that non-nil error is reflected in LogWriter.err. On close, it
432 : // is set to errClosedWriter to inform accidental future calls to
433 : // SyncRecord*.
434 : err error
435 : // block is the current block being written. Protected by flusher.Mutex.
436 : block *block
437 : free struct {
438 : sync.Mutex
439 : blocks []*block
440 : }
441 :
442 : flusher struct {
443 : sync.Mutex
444 : // Flusher ready is a condition variable that is signalled when there are
445 : // blocks to flush, syncing has been requested, or the LogWriter has been
446 : // closed. For signalling of a sync, it is safe to call without holding
447 : // flusher.Mutex.
448 : ready flusherCond
449 : // Set to true when the flush loop should be closed.
450 : close bool
451 : // Closed when the flush loop has terminated.
452 : closed chan struct{}
453 : // Accumulated flush error.
454 : err error
455 : // minSyncInterval is the minimum duration between syncs.
456 : minSyncInterval durationFunc
457 : fsyncLatency prometheus.Histogram
458 : pending []*block
459 : // Pushing and popping from pendingSyncs does not require flusher mutex to
460 : // be held.
461 : pendingSyncs pendingSyncs
462 : metrics *LogWriterMetrics
463 : }
464 :
465 : // afterFunc is a hook to allow tests to mock out the timer functionality
466 : // used for min-sync-interval. In normal operation this points to
467 : // time.AfterFunc.
468 : afterFunc func(d time.Duration, f func()) syncTimer
469 :
470 : // Backing for both pendingSyncs implementations.
471 : pendingSyncsBackingQ pendingSyncsWithSyncQueue
472 : pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
473 :
474 : pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
475 : }
476 :
477 : // LogWriterConfig is a struct used for configuring new LogWriters
478 : type LogWriterConfig struct {
479 : WALMinSyncInterval durationFunc
480 : WALFsyncLatency prometheus.Histogram
481 : // QueueSemChan is an optional channel to pop from when popping from
482 : // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
483 : // the syncQueue from overflowing (which will cause a panic). All production
484 : // code ensures this is non-nil.
485 : QueueSemChan chan struct{}
486 :
487 : // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
488 : // as part of a WAL implementation that can failover between LogWriters.
489 : //
490 : // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
491 : // be used with a PendingSync parameter that is implemented by
492 : // PendingSyncIndex. When an index is synced (which implies all earlier
493 : // indices are also synced), this callback is invoked. The caller must not
494 : // hold any mutex when invoking this callback, since the lock ordering
495 : // requirement in this case is that any higher layer locks (in the wal
496 : // package) precede the lower layer locks (in the record package). These
497 : // callbacks are serialized since they are invoked from the flushLoop.
498 : ExternalSyncQueueCallback ExternalSyncQueueCallback
499 : }
500 :
501 : // ExternalSyncQueueCallback is to be run when a PendingSync has been
502 : // processed, either successfully or with an error.
503 : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
504 :
// initialAllocatedBlocksCap is the starting capacity of the LogWriter slices
// that hold blocks (the free list and the pending list). It is not a limit:
// the LogWriter may allocate more blocks than this.
const initialAllocatedBlocksCap = 32
509 :
510 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
511 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
512 : // within a LogWriter's own internal free list `w.free.blocks`.
513 : var blockPool = sync.Pool{
514 1 : New: func() any { return &block{} },
515 : }
516 :
517 : // NewLogWriter returns a new LogWriter.
518 : //
519 : // The io.Writer may also be used as an io.Closer and syncer. No other methods
520 : // will be called on the writer.
521 : func NewLogWriter(
522 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
523 1 : ) *LogWriter {
524 1 : c, _ := w.(io.Closer)
525 1 : s, _ := w.(syncer)
526 1 : r := &LogWriter{
527 1 : w: w,
528 1 : c: c,
529 1 : s: s,
530 1 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
531 1 : // we are very unlikely to reach a file number of 4 billion and b) the log
532 1 : // number is used as a validation check and using only the low 32-bits is
533 1 : // sufficient for that purpose.
534 1 : logNum: uint32(logNum),
535 1 : afterFunc: func(d time.Duration, f func()) syncTimer {
536 0 : return time.AfterFunc(d, f)
537 0 : },
538 : }
539 1 : m := &LogWriterMetrics{}
540 1 : if logWriterConfig.ExternalSyncQueueCallback != nil {
541 1 : r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
542 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
543 1 : } else {
544 1 : r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
545 1 : syncQueueLen: &m.SyncQueueLen,
546 1 : queueSemChan: logWriterConfig.QueueSemChan,
547 1 : }
548 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
549 1 : }
550 :
551 1 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
552 1 : r.block = blockPool.Get().(*block)
553 1 : r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
554 1 : r.flusher.closed = make(chan struct{})
555 1 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
556 1 : r.flusher.metrics = m
557 1 :
558 1 : f := &r.flusher
559 1 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
560 1 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
561 1 :
562 1 : go func() {
563 1 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
564 1 : }()
565 1 : return r
566 : }
567 :
// flushLoop is the body of the background goroutine started by NewLogWriter
// (run via pprof.Do with walSyncLabels). It holds flusher.Mutex except while
// waiting on flusher.ready and while handing work to flushPending.
//
// Each iteration collects three things: the full blocks in flusher.pending,
// the unflushed part of the current block, and a snapshot of the pending
// sync requests. It then writes (and, if requested, syncs) them and updates
// the throughput metrics.
//
// The loop returns once flusher.close is set and there is no work and no
// unblocked sync request left. Its deferred cleanup stops the sync timer and
// closes flusher.closed.
func (w *LogWriter) flushLoop(context.Context) {
	f := &w.flusher
	f.Lock()

	// Initialize idleStartTime to when the loop starts.
	idleStartTime := time.Now()
	// syncTimer is created lazily the first time min-sync-interval blocks
	// syncing, and re-armed with Reset on later occasions.
	var syncTimer syncTimer
	defer func() {
		// Capture the idle duration between the last piece of work and when the
		// loop terminated.
		f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
		if syncTimer != nil {
			syncTimer.Stop()
		}
		close(f.closed)
		f.Unlock()
	}()

	// The flush loop does three jobs: it flushes full and partial data
	// blocks to the underlying writer (LogWriter.w), syncs the writer, and
	// notifies sync requests that they have completed.
	//
	// - flusher.ready is a condition variable that is signalled when there is
	//   work to do. Full blocks are contained in flusher.pending. The current
	//   partial block is in LogWriter.block. Sync requests are held in
	//   flusher.pendingSyncs.
	//
	// - Whether to sync depends on two things: whether any sync requests are
	//   present in flusher.pendingSyncs, and whether enough time has elapsed
	//   since the last sync. If not, pendingSyncs is marked blocked
	//   (setBlocked). While blocked, pendingSyncs.empty() reports true and
	//   snapshotForPop() returns an empty snapshot.
	//
	// - The blocked state is cleared by a timer that is armed when it is set.
	//   While blocked, no syncing takes place, but flushing continues.
	//   The on/off toggle for syncing does not need to be carefully
	//   synchronized with the rest of processing. All we need to ensure is
	//   that every transition to blocked is eventually followed by a
	//   transition back to unblocked. syncTimer performs this transition.
	//   Any change to min-sync-interval does not take effect until the
	//   previous timer elapses.
	//
	// - Picking up the syncing work requires coordination with picking up the
	//   flushing work. Flushing work is queued before syncing work. This code
	//   guarantees that when a sync is requested, any previously queued flush
	//   work will be synced. That is why the syncing work
	//   (f.pendingSyncs.snapshotForPop()) is read before the flush work
	//   (w.block.written.Load()).

	// pending is the list of full blocks that need to be written. It is
	// copied from f.pending on every loop iteration. The number of elements is
	// usually small (most frequently 1).
	//
	// For the WAL LogWriter, the number of blocks is bounded by the size of
	// the WAL's corresponding memtable (MemtableSize/BlockSize). With the
	// default 64 MiB memtables, that is at most 2048 elements if the entire
	// memtable's contents are queued.
	pending := make([]*block, 0, cap(f.pending))
	for {
		// Wait for work. Exit when closed and nothing remains.
		for {
			// Grab the portion of the current block that requires flushing.
			// The current block can be added to the pending blocks list after we
			// release the flusher lock, but it won't be part of pending.
			written := w.block.written.Load()
			if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
				break
			}
			if f.close {
				// If the writer is closed, pretend the sync timer fired
				// immediately so that we can process any queued sync requests.
				f.pendingSyncs.clearBlocked()
				if !f.pendingSyncs.empty() {
					break
				}
				return
			}
			f.ready.Wait()
			continue
		}
		// Found work to do, so no longer idle.
		//
		// NB: it is safe to read pending before loading from pendingSyncs.
		// Mutations to pending require the w.flusher mutex, which is held here.
		// Nobody can concurrently add to pending, so the following sequence is
		// impossible (it would pick up a sync request without the
		// corresponding data):
		//
		//	Thread enqueueing      This thread
		//	                       1. read pending
		//	2. add block to pending
		//	3. add to pendingSyncs
		//	                       4. read pendingSyncs
		workStartTime := time.Now()
		idleDuration := workStartTime.Sub(idleStartTime)
		pending = append(pending[:0], f.pending...)
		f.pending = f.pending[:0]
		f.metrics.PendingBufferLen.AddSample(int64(len(pending)))

		// Grab the sync requests. While min-sync-interval blocks syncing, the
		// snapshot is empty. This lets flushing proceed even when we're not
		// ready to sync.
		snap := f.pendingSyncs.snapshotForPop()

		// Grab the portion of the current block that requires flushing. The
		// current block can be added to the pending blocks list after we
		// release the flusher lock, but it won't be part of pending.
		//
		// This must come after taking the sync snapshot. Otherwise a waiter
		// could add itself to pendingSyncs and this thread could pick up that
		// request but not the buffered data.
		written := w.block.written.Load()
		data := w.block.buf[w.block.flushed:written]
		w.block.flushed = written

		fErr := f.err
		f.Unlock()
		// If flusher has an error, we propagate it to waiters. Despite the
		// error, we consumed the pending list above so writers get their
		// blocks freed.
		if fErr != nil {
			// NB: pop may invoke ExternalSyncQueueCallback, which is why we have
			// called f.Unlock() above. We will acquire the lock again below.
			// NOTE(review): the error returned by pop is discarded here.
			f.pendingSyncs.pop(snap, fErr)
			// Update idleStartTime when work could not be done, so the attempt
			// isn't counted as idle time. The rest of the accounting is
			// skipped, so we will undercount.
			idleStartTime = time.Now()
			f.Lock()
			continue
		}
		synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
		f.Lock()
		if synced && f.fsyncLatency != nil {
			f.fsyncLatency.Observe(float64(syncLatency))
		}
		f.err = err
		if f.err != nil {
			// Unblock syncing so the next iteration's snapshot includes every
			// queued waiter. The fErr path above then delivers the error to them.
			f.pendingSyncs.clearBlocked()
			// Update idleStartTime when work could not be done, so the attempt
			// isn't counted as idle time. The rest of the accounting is
			// skipped, so we will undercount.
			idleStartTime = time.Now()
			continue
		}

		if synced && f.minSyncInterval != nil {
			// A sync was performed. Make sure we've waited for the min sync
			// interval before syncing again.
			if min := f.minSyncInterval(); min > 0 {
				f.pendingSyncs.setBlocked()
				if syncTimer == nil {
					syncTimer = w.afterFunc(min, func() {
						f.pendingSyncs.clearBlocked()
						f.ready.Signal()
					})
				} else {
					syncTimer.Reset(min)
				}
			}
		}
		// Finished work, and started idling.
		idleStartTime = time.Now()
		workDuration := idleStartTime.Sub(workStartTime)
		f.metrics.WriteThroughput.Bytes += bytesWritten
		f.metrics.WriteThroughput.WorkDuration += workDuration
		f.metrics.WriteThroughput.IdleDuration += idleDuration
	}
}
734 :
735 : func (w *LogWriter) flushPending(
736 : data []byte, pending []*block, snap pendingSyncsSnapshot,
737 1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
738 1 : defer func() {
739 1 : // Translate panics into errors. The errors will cause flushLoop to shut
740 1 : // down, but allows us to do so in a controlled way and avoid swallowing
741 1 : // the stack that created the panic if panic'ing itself hits a panic
742 1 : // (e.g. unlock of unlocked mutex).
743 1 : if r := recover(); r != nil {
744 0 : err = errors.Newf("%v", r)
745 0 : }
746 : }()
747 :
748 1 : for _, b := range pending {
749 0 : bytesWritten += blockSize - int64(b.flushed)
750 0 : if err = w.flushBlock(b); err != nil {
751 0 : break
752 : }
753 : }
754 1 : if n := len(data); err == nil && n > 0 {
755 1 : bytesWritten += int64(n)
756 1 : _, err = w.w.Write(data)
757 1 : }
758 :
759 1 : synced = !snap.empty()
760 1 : if synced {
761 1 : if err == nil && w.s != nil {
762 1 : syncLatency, err = w.syncWithLatency()
763 1 : } else {
764 0 : synced = false
765 0 : }
766 1 : f := &w.flusher
767 1 : if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
768 0 : return synced, syncLatency, bytesWritten, firstError(err, popErr)
769 0 : }
770 : }
771 :
772 1 : return synced, syncLatency, bytesWritten, err
773 : }
774 :
775 1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
776 1 : start := time.Now()
777 1 : err := w.s.Sync()
778 1 : syncLatency := time.Since(start)
779 1 : return syncLatency, err
780 1 : }
781 :
782 0 : func (w *LogWriter) flushBlock(b *block) error {
783 0 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
784 0 : return err
785 0 : }
786 0 : b.written.Store(0)
787 0 : b.flushed = 0
788 0 : w.free.Lock()
789 0 : w.free.blocks = append(w.free.blocks, b)
790 0 : w.free.Unlock()
791 0 : return nil
792 : }
793 :
794 : // queueBlock queues the current block for writing to the underlying writer,
795 : // allocates a new block and reserves space for the next header.
796 0 : func (w *LogWriter) queueBlock() {
797 0 : // Allocate a new block, blocking until one is available. We do this first
798 0 : // because w.block is protected by w.flusher.Mutex.
799 0 : w.free.Lock()
800 0 : if len(w.free.blocks) == 0 {
801 0 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
802 0 : }
803 0 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
804 0 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
805 0 : w.free.Unlock()
806 0 :
807 0 : f := &w.flusher
808 0 : f.Lock()
809 0 : f.pending = append(f.pending, w.block)
810 0 : w.block = nextBlock
811 0 : f.ready.Signal()
812 0 : w.err = w.flusher.err
813 0 : f.Unlock()
814 0 :
815 0 : w.blockNum++
816 : }
817 :
818 : // Close flushes and syncs any unwritten data and closes the writer.
819 : // Where required, external synchronisation is provided by commitPipeline.mu.
820 1 : func (w *LogWriter) Close() error {
821 1 : return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
822 1 : }
823 :
824 : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
825 : // lastQueuedRecord, that the caller will be notified about when synced.
826 1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
827 1 : return w.closeInternal(lastQueuedRecord)
828 1 : }
829 :
830 1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
831 1 : f := &w.flusher
832 1 :
833 1 : // Emit an EOF trailer signifying the end of this log. This helps readers
834 1 : // differentiate between a corrupted entry in the middle of a log from
835 1 : // garbage at the tail from a recycled log file.
836 1 : w.emitEOFTrailer()
837 1 :
838 1 : // Signal the flush loop to close.
839 1 : f.Lock()
840 1 : f.close = true
841 1 : f.ready.Signal()
842 1 : f.Unlock()
843 1 :
844 1 : // Wait for the flush loop to close. The flush loop will not close until all
845 1 : // pending data has been written or an error occurs.
846 1 : <-f.closed
847 1 :
848 1 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
849 1 : // last buffered data only if it was requested via syncQ, so we need to sync
850 1 : // here to ensure that all the data is synced.
851 1 : err := w.flusher.err
852 1 : var syncLatency time.Duration
853 1 : if err == nil && w.s != nil {
854 1 : syncLatency, err = w.syncWithLatency()
855 1 : }
856 1 : f.Lock()
857 1 : if err == nil && f.fsyncLatency != nil {
858 1 : f.fsyncLatency.Observe(float64(syncLatency))
859 1 : }
860 1 : free := w.free.blocks
861 1 : f.Unlock()
862 1 :
863 1 : // NB: the caller of closeInternal may not care about a non-nil cerr below
864 1 : // if all queued writes have been successfully written and synced.
865 1 : if lastQueuedRecord.Index != NoSyncIndex {
866 1 : w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
867 1 : }
868 1 : if w.c != nil {
869 1 : cerr := w.c.Close()
870 1 : w.c = nil
871 1 : err = firstError(err, cerr)
872 1 : }
873 :
874 1 : for _, b := range free {
875 0 : b.flushed = 0
876 0 : b.written.Store(0)
877 0 : blockPool.Put(b)
878 0 : }
879 :
880 1 : w.err = errClosedWriter
881 1 : return err
882 : }
883 :
// firstError returns err0 when it is non-nil and err1 otherwise, so the
// result is nil only if both arguments are nil.
func firstError(err0, err1 error) error {
	if err0 == nil {
		return err1
	}
	return err0
}
892 :
893 : // WriteRecord writes a complete record. Returns the offset just past the end
894 : // of the record.
895 : // External synchronisation provided by commitPipeline.mu.
896 0 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
897 0 : logSize, err := w.SyncRecord(p, nil, nil)
898 0 : return logSize, err
899 0 : }
900 :
901 : // SyncRecord writes a complete record. If wg != nil the record will be
902 : // asynchronously persisted to the underlying writer and done will be called on
903 : // the wait group upon completion. Returns the offset just past the end of the
904 : // record.
905 : // External synchronisation provided by commitPipeline.mu.
906 : func (w *LogWriter) SyncRecord(
907 : p []byte, wg *sync.WaitGroup, err *error,
908 1 : ) (logSize int64, err2 error) {
909 1 : w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
910 1 : wg: wg,
911 1 : err: err,
912 1 : }
913 1 : return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
914 1 : }
915 :
916 : // SyncRecordGeneralized is a version of SyncRecord that accepts a
917 : // PendingSync.
918 1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
919 1 : if w.err != nil {
920 0 : return -1, w.err
921 0 : }
922 :
923 : // The `i == 0` condition ensures we handle empty records. Such records can
924 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
925 : // MANIFEST is currently written using Writer, it is good to support the same
926 : // semantics with LogWriter.
927 1 : for i := 0; i == 0 || len(p) > 0; i++ {
928 1 : p = w.emitFragment(i, p)
929 1 : }
930 :
931 1 : if ps.syncRequested() {
932 1 : // If we've been asked to persist the record, add the WaitGroup to the sync
933 1 : // queue and signal the flushLoop. Note that flushLoop will write partial
934 1 : // blocks to the file if syncing has been requested. The contract is that
935 1 : // any record written to the LogWriter to this point will be flushed to the
936 1 : // OS and synced to disk.
937 1 : f := &w.flusher
938 1 : f.pendingSyncs.push(ps)
939 1 : f.ready.Signal()
940 1 : }
941 :
942 1 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
943 1 : // Note that we don't return w.err here as a concurrent call to Close would
944 1 : // race with our read. That's ok because the only error we could be seeing is
945 1 : // one to syncing for which the caller can receive notification of by passing
946 1 : // in a non-nil err argument.
947 1 : return offset, nil
948 : }
949 :
950 : // Size returns the current size of the file.
951 : // External synchronisation provided by commitPipeline.mu.
952 1 : func (w *LogWriter) Size() int64 {
953 1 : return w.blockNum*blockSize + int64(w.block.written.Load())
954 1 : }
955 :
956 1 : func (w *LogWriter) emitEOFTrailer() {
957 1 : // Write a recyclable chunk header with a different log number. Readers
958 1 : // will treat the header as EOF when the log number does not match.
959 1 : b := w.block
960 1 : i := b.written.Load()
961 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
962 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
963 1 : b.buf[i+6] = recyclableFullChunkType
964 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
965 1 : b.written.Store(i + int32(recyclableHeaderSize))
966 1 : }
967 :
968 1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
969 1 : b := w.block
970 1 : i := b.written.Load()
971 1 : first := n == 0
972 1 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
973 1 :
974 1 : if last {
975 1 : if first {
976 1 : b.buf[i+6] = recyclableFullChunkType
977 1 : } else {
978 0 : b.buf[i+6] = recyclableLastChunkType
979 0 : }
980 0 : } else {
981 0 : if first {
982 0 : b.buf[i+6] = recyclableFirstChunkType
983 0 : } else {
984 0 : b.buf[i+6] = recyclableMiddleChunkType
985 0 : }
986 : }
987 :
988 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
989 1 :
990 1 : r := copy(b.buf[i+recyclableHeaderSize:], p)
991 1 : j := i + int32(recyclableHeaderSize+r)
992 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
993 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
994 1 : b.written.Store(j)
995 1 :
996 1 : if blockSize-b.written.Load() < recyclableHeaderSize {
997 0 : // There is no room for another fragment in the block, so fill the
998 0 : // remaining bytes with zeros and queue the block for flushing.
999 0 : clear(b.buf[b.written.Load():])
1000 0 : w.queueBlock()
1001 0 : }
1002 1 : return p[r:]
1003 : }
1004 :
1005 : // Metrics must typically be called after Close, since the callee will no
1006 : // longer modify the returned LogWriterMetrics. It is also current if there is
1007 : // nothing left to flush in the flush loop, but that is an implementation
1008 : // detail that callers should not rely on.
1009 1 : func (w *LogWriter) Metrics() LogWriterMetrics {
1010 1 : w.flusher.Lock()
1011 1 : defer w.flusher.Unlock()
1012 1 : m := *w.flusher.metrics
1013 1 : return m
1014 1 : }
1015 :
// LogWriterMetrics contains misc metrics for the log writer. Values are
// combined across writers with Merge.
type LogWriterMetrics struct {
	// WriteThroughput presumably tracks bytes written to the underlying writer
	// and the time spent writing — TODO confirm against the flush loop.
	WriteThroughput base.ThroughputMetric
	// PendingBufferLen presumably samples the number of blocks queued for
	// flushing (flusher.pending) — confirm.
	PendingBufferLen base.GaugeSampleMetric
	// SyncQueueLen presumably samples the length of the pending-sync queue —
	// confirm.
	SyncQueueLen base.GaugeSampleMetric
}
1022 :
1023 : // Merge merges metrics from x. Requires that x is non-nil.
1024 1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
1025 1 : m.WriteThroughput.Merge(x.WriteThroughput)
1026 1 : m.PendingBufferLen.Merge(x.PendingBufferLen)
1027 1 : m.SyncQueueLen.Merge(x.SyncQueueLen)
1028 1 : return nil
1029 1 : }
|