Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/crc"
19 : "github.com/prometheus/client_golang/prometheus"
20 : )
21 :
22 : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
23 : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
24 :
25 : type block struct {
26 : // buf[:written] has already been filled with fragments. Updated atomically.
27 : written atomic.Int32
28 : // buf[:flushed] has already been flushed to w.
29 : flushed int32
30 : buf [blockSize]byte
31 : }
32 :
33 : type flusher interface {
34 : Flush() error
35 : }
36 :
37 : type syncer interface {
38 : Sync() error
39 : }
40 :
41 : const (
42 : syncConcurrencyBits = 12
43 :
44 : // SyncConcurrency is the maximum number of concurrent sync operations that
45 : // can be performed. Note that a sync operation is initiated either by a call
46 : // to SyncRecord or by a call to Close. Exported as this value also limits
47 : // the commit concurrency in commitPipeline.
48 : SyncConcurrency = 1 << syncConcurrencyBits
49 : )
50 :
51 : type syncSlot struct {
52 : wg *sync.WaitGroup
53 : err *error
54 : }
55 :
56 : // syncQueue is a lock-free fixed-size single-producer, single-consumer
57 : // queue. The single-producer can push to the head, and the single-consumer can
58 : // pop multiple values from the tail. Popping calls Done() on each of the
59 : // available *sync.WaitGroup elements.
60 : type syncQueue struct {
61 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
62 : // are indexes into slots modulo len(slots)-1.
63 : //
64 : // tail = index of oldest data in queue
65 : // head = index of next slot to fill
66 : //
67 : // Slots in the range [tail, head) are owned by consumers. A consumer
68 : // continues to own a slot outside this range until it nils the slot, at
69 : // which point ownership passes to the producer.
70 : //
71 : // The head index is stored in the most-significant bits so that we can
72 : // atomically add to it and the overflow is harmless.
73 : headTail atomic.Uint64
74 :
75 : // slots is a ring buffer of values stored in this queue. The size must be a
76 : // power of 2. A slot is in use until the tail index has moved beyond it.
77 : slots [SyncConcurrency]syncSlot
78 :
79 : // blocked is an atomic boolean which indicates whether syncing is currently
80 : // blocked or can proceed. It is used by the implementation of
81 : // min-sync-interval to block syncing until the min interval has passed.
82 : blocked atomic.Bool
83 : }
84 :
85 : const dequeueBits = 32
86 :
87 2 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
88 2 : const mask = 1<<dequeueBits - 1
89 2 : head = uint32((ptrs >> dequeueBits) & mask)
90 2 : tail = uint32(ptrs & mask)
91 2 : return
92 2 : }
93 :
94 2 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
95 2 : ptrs := q.headTail.Load()
96 2 : head, tail := q.unpack(ptrs)
97 2 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
98 0 : panic("pebble: queue is full")
99 : }
100 :
101 2 : slot := &q.slots[head&uint32(len(q.slots)-1)]
102 2 : slot.wg = wg
103 2 : slot.err = err
104 2 :
105 2 : // Increment head. This passes ownership of slot to dequeue and acts as a
106 2 : // store barrier for writing the slot.
107 2 : q.headTail.Add(1 << dequeueBits)
108 : }
109 :
110 1 : func (q *syncQueue) setBlocked() {
111 1 : q.blocked.Store(true)
112 1 : }
113 :
114 2 : func (q *syncQueue) clearBlocked() {
115 2 : q.blocked.Store(false)
116 2 : }
117 :
118 2 : func (q *syncQueue) empty() bool {
119 2 : head, tail, _ := q.load()
120 2 : return head == tail
121 2 : }
122 :
123 : // load returns the head, tail of the queue for what should be synced to the
124 : // caller. It can return a head, tail of zero if syncing is blocked due to
125 : // min-sync-interval. It additionally returns the real length of this queue,
126 : // regardless of whether syncing is blocked.
127 2 : func (q *syncQueue) load() (head, tail, realLength uint32) {
128 2 : ptrs := q.headTail.Load()
129 2 : head, tail = q.unpack(ptrs)
130 2 : realLength = head - tail
131 2 : if q.blocked.Load() {
132 1 : return 0, 0, realLength
133 1 : }
134 2 : return head, tail, realLength
135 : }
136 :
137 : // REQUIRES: queueSemChan is non-nil.
138 2 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
139 2 : if tail == head {
140 1 : // Queue is empty.
141 1 : return nil
142 1 : }
143 :
144 2 : for ; tail != head; tail++ {
145 2 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
146 2 : wg := slot.wg
147 2 : if wg == nil {
148 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
149 0 : }
150 2 : *slot.err = err
151 2 : slot.wg = nil
152 2 : slot.err = nil
153 2 : // We need to bump the tail count before releasing the queueSemChan
154 2 : // semaphore as releasing the semaphore can cause a blocked goroutine to
155 2 : // acquire the semaphore and enqueue before we've "freed" space in the
156 2 : // queue.
157 2 : q.headTail.Add(1)
158 2 : wg.Done()
159 2 : // Is always non-nil in production, unless using wal package for WAL
160 2 : // failover.
161 2 : if queueSemChan != nil {
162 2 : <-queueSemChan
163 2 : }
164 : }
165 :
166 2 : return nil
167 : }
168 :
169 : // pendingSyncs abstracts out the handling of pending sync requests. In
170 : // standalone mode the implementation is a thin wrapper around syncQueue. In
171 : // the mode where the LogWriter can be subject to failover, there is no queue
172 : // kept in the LogWriter and the signaling to those waiting for sync is
173 : // handled in the wal package.
174 : //
175 : // To avoid heap allocations due to the use of this interface, the parameters
176 : // and return values follow some strict rules:
177 : // - The PendingSync parameter can be reused by the caller after push returns.
178 : // The implementation should be a pointer backed by a struct that is already
179 : // heap allocated, which the caller can reuse for the next push call.
180 : // - The pendingSyncSnapshot return value must be backed by the pendingSyncs
181 : // implementation, so calling snapshotForPop again will cause the previous
182 : // snapshot to be overwritten.
183 : type pendingSyncs interface {
184 : push(PendingSync)
185 : setBlocked()
186 : clearBlocked()
187 : empty() bool
188 : snapshotForPop() pendingSyncsSnapshot
189 : pop(snap pendingSyncsSnapshot, err error) error
190 : }
191 :
192 : type pendingSyncsSnapshot interface {
193 : empty() bool
194 : }
195 :
196 : // PendingSync abstracts the sync specification for a record queued on the
197 : // LogWriter. The only implementations are provided in this package since
198 : // syncRequested is not exported.
199 : type PendingSync interface {
200 : syncRequested() bool
201 : }
202 :
203 : // The implementation of pendingSyncs in standalone mode.
204 : type pendingSyncsWithSyncQueue struct {
205 : syncQueue
206 : syncQueueLen *base.GaugeSampleMetric
207 : snapshotBacking syncQueueSnapshot
208 : // See the comment for LogWriterConfig.QueueSemChan.
209 : queueSemChan chan struct{}
210 : }
211 :
212 : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
213 :
214 2 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
215 2 : ps2 := ps.(*pendingSyncForSyncQueue)
216 2 : q.syncQueue.push(ps2.wg, ps2.err)
217 2 : }
218 :
219 2 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
220 2 : head, tail, realLength := q.syncQueue.load()
221 2 : q.snapshotBacking = syncQueueSnapshot{
222 2 : head: head,
223 2 : tail: tail,
224 2 : }
225 2 : q.syncQueueLen.AddSample(int64(realLength))
226 2 : return &q.snapshotBacking
227 2 : }
228 :
229 2 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
230 2 : s := snap.(*syncQueueSnapshot)
231 2 : return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
232 2 : }
233 :
234 : // The implementation of pendingSyncsSnapshot in standalone mode.
235 : type syncQueueSnapshot struct {
236 : head, tail uint32
237 : }
238 :
239 2 : func (s *syncQueueSnapshot) empty() bool {
240 2 : return s.head == s.tail
241 2 : }
242 :
243 : // The implementation of pendingSync in standalone mode.
244 : type pendingSyncForSyncQueue struct {
245 : wg *sync.WaitGroup
246 : err *error
247 : }
248 :
249 2 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
250 2 : return ps.wg != nil
251 2 : }
252 :
253 : // The implementation of pendingSyncs in failover mode.
254 : type pendingSyncsWithHighestSyncIndex struct {
255 : // The highest "index" queued that is requesting a sync. Initialized
256 : // to NoSyncIndex, and reset to NoSyncIndex after the sync.
257 : index atomic.Int64
258 : snapshotBacking PendingSyncIndex
259 : // blocked is an atomic boolean which indicates whether syncing is currently
260 : // blocked or can proceed. It is used by the implementation of
261 : // min-sync-interval to block syncing until the min interval has passed.
262 : blocked atomic.Bool
263 : externalSyncQueueCallback ExternalSyncQueueCallback
264 : }
265 :
266 : // NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.
267 : const NoSyncIndex = -1
268 :
269 : func (si *pendingSyncsWithHighestSyncIndex) init(
270 : externalSyncQueueCallback ExternalSyncQueueCallback,
271 1 : ) {
272 1 : si.index.Store(NoSyncIndex)
273 1 : si.externalSyncQueueCallback = externalSyncQueueCallback
274 1 : }
275 :
276 1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
277 1 : ps2 := ps.(*PendingSyncIndex)
278 1 : si.index.Store(ps2.Index)
279 1 : }
280 :
281 1 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
282 1 : si.blocked.Store(true)
283 1 : }
284 :
285 1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
286 1 : si.blocked.Store(false)
287 1 : }
288 :
289 1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
290 1 : return si.load() == NoSyncIndex
291 1 : }
292 :
293 1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
294 1 : si.snapshotBacking = PendingSyncIndex{Index: si.load()}
295 1 : return &si.snapshotBacking
296 1 : }
297 :
298 1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
299 1 : index := si.index.Load()
300 1 : if index != NoSyncIndex && si.blocked.Load() {
301 1 : index = NoSyncIndex
302 1 : }
303 1 : return index
304 : }
305 :
306 1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
307 1 : index := snap.(*PendingSyncIndex)
308 1 : if index.Index == NoSyncIndex {
309 1 : return nil
310 1 : }
311 : // Set to NoSyncIndex if a higher index has not queued.
312 1 : si.index.CompareAndSwap(index.Index, NoSyncIndex)
313 1 : si.externalSyncQueueCallback(*index, err)
314 1 : return nil
315 : }
316 :
317 : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
318 : type PendingSyncIndex struct {
319 : // Index is some state meaningful to the user of LogWriter. The LogWriter
320 : // itself only examines whether Index is equal to NoSyncIndex.
321 : Index int64
322 : }
323 :
324 1 : func (s *PendingSyncIndex) empty() bool {
325 1 : return s.Index == NoSyncIndex
326 1 : }
327 :
328 1 : func (s *PendingSyncIndex) syncRequested() bool {
329 1 : return s.Index != NoSyncIndex
330 1 : }
331 :
332 : // flusherCond is a specialized condition variable that allows its condition to
333 : // change and readiness be signalled without holding its associated mutex. In
334 : // particular, when a waiter is added to syncQueue atomically, this condition
335 : // variable can be signalled without holding flusher.Mutex.
336 : type flusherCond struct {
337 : mu *sync.Mutex
338 : q pendingSyncs
339 : cond sync.Cond
340 : }
341 :
342 2 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
343 2 : c.mu = mu
344 2 : c.q = q
345 2 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
346 2 : // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
347 2 : // will be called and we can check the !syncQueue.empty() condition.
348 2 : c.cond.L = c
349 2 : }
350 :
351 2 : func (c *flusherCond) Signal() {
352 2 : // Pass-through to the cond var.
353 2 : c.cond.Signal()
354 2 : }
355 :
356 2 : func (c *flusherCond) Wait() {
357 2 : // Pass-through to the cond var. Note that internally the cond var implements
358 2 : // Wait as:
359 2 : //
360 2 : // t := notifyListAdd()
361 2 : // L.Unlock()
362 2 : // notifyListWait(t)
363 2 : // L.Lock()
364 2 : //
365 2 : // We've configured the cond var to call flusherReady.Unlock() which allows
366 2 : // us to check the !syncQueue.empty() condition without a danger of missing a
367 2 : // notification. Any call to flusherReady.Signal() after notifyListAdd() is
368 2 : // called will cause the subsequent notifyListWait() to return immediately.
369 2 : c.cond.Wait()
370 2 : }
371 :
372 2 : func (c *flusherCond) Lock() {
373 2 : c.mu.Lock()
374 2 : }
375 :
376 2 : func (c *flusherCond) Unlock() {
377 2 : c.mu.Unlock()
378 2 : if !c.q.empty() {
379 2 : // If the current goroutine is about to block on sync.Cond.Wait, this call
380 2 : // to Signal will prevent that. The comment in Wait above explains a bit
381 2 : // about what is going on here, but it is worth reiterating:
382 2 : //
383 2 : // flusherCond.Wait()
384 2 : // sync.Cond.Wait()
385 2 : // t := notifyListAdd()
386 2 : // flusherCond.Unlock() <-- we are here
387 2 : // notifyListWait(t)
388 2 : // flusherCond.Lock()
389 2 : //
390 2 : // The call to Signal here results in:
391 2 : //
392 2 : // sync.Cond.Signal()
393 2 : // notifyListNotifyOne()
394 2 : //
395 2 : // The call to notifyListNotifyOne() will prevent the call to
396 2 : // notifyListWait(t) from blocking.
397 2 : c.cond.Signal()
398 2 : }
399 : }
400 :
401 : type durationFunc func() time.Duration
402 :
403 : // syncTimer is an interface for timers, modeled on the closure callback mode
404 : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
405 : // by tests to mock out the timer functionality used to implement
406 : // min-sync-interval.
407 : type syncTimer interface {
408 : Reset(time.Duration) bool
409 : Stop() bool
410 : }
411 :
412 : // LogWriter writes records to an underlying io.Writer. In order to support WAL
413 : // file reuse, a LogWriter's records are tagged with the WAL's file
414 : // number. When reading a log file a record from a previous incarnation of the
415 : // file will return the error ErrInvalidLogNum.
416 : type LogWriter struct {
417 : // w is the underlying writer.
418 : w io.Writer
419 : // c is w as a closer.
420 : c io.Closer
421 : // s is w as a syncer.
422 : s syncer
423 : // logNum is the low 32-bits of the log's file number.
424 : logNum uint32
425 : // blockNum is the zero based block number for the current block.
426 : blockNum int64
427 : // err is any accumulated error. It originates in flusher.err, and is
428 : // updated to reflect flusher.err when a block is full and getting enqueued.
429 : // Therefore, there is a lag between when flusher.err has a non-nil error,
430 : // and when that non-nil error is reflected in LogWriter.err. On close, it
431 : // is set to errClosedWriter to inform accidental future calls to
432 : // SyncRecord*.
433 : err error
434 : // block is the current block being written. Protected by flusher.Mutex.
435 : block *block
436 : free struct {
437 : sync.Mutex
438 : blocks []*block
439 : }
440 :
441 : flusher struct {
442 : sync.Mutex
443 : // Flusher ready is a condition variable that is signalled when there are
444 : // blocks to flush, syncing has been requested, or the LogWriter has been
445 : // closed. For signalling of a sync, it is safe to call without holding
446 : // flusher.Mutex.
447 : ready flusherCond
448 : // Set to true when the flush loop should be closed.
449 : close bool
450 : // Closed when the flush loop has terminated.
451 : closed chan struct{}
452 : // Accumulated flush error.
453 : err error
454 : // minSyncInterval is the minimum duration between syncs.
455 : minSyncInterval durationFunc
456 : fsyncLatency prometheus.Histogram
457 : pending []*block
458 : // Pushing and popping from pendingSyncs does not require flusher mutex to
459 : // be held.
460 : pendingSyncs pendingSyncs
461 : metrics *LogWriterMetrics
462 : }
463 :
464 : // afterFunc is a hook to allow tests to mock out the timer functionality
465 : // used for min-sync-interval. In normal operation this points to
466 : // time.AfterFunc.
467 : afterFunc func(d time.Duration, f func()) syncTimer
468 :
469 : // Backing for both pendingSyncs implementations.
470 : pendingSyncsBackingQ pendingSyncsWithSyncQueue
471 : pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
472 :
473 : pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
474 : }
475 :
476 : // LogWriterConfig is a struct used for configuring new LogWriters
477 : type LogWriterConfig struct {
478 : WALMinSyncInterval durationFunc
479 : WALFsyncLatency prometheus.Histogram
480 : // QueueSemChan is an optional channel to pop from when popping from
481 : // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
482 : // the syncQueue from overflowing (which will cause a panic). All production
483 : // code ensures this is non-nil.
484 : QueueSemChan chan struct{}
485 :
486 : // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
487 : // as part of a WAL implementation that can failover between LogWriters.
488 : //
489 : // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
490 : // be used with a PendingSync parameter that is implemented by
491 : // PendingSyncIndex. When an index is synced (which implies all earlier
492 : // indices are also synced), this callback is invoked. The caller must not
493 : // hold any mutex when invoking this callback, since the lock ordering
494 : // requirement in this case is that any higher layer locks (in the wal
495 : // package) precede the lower layer locks (in the record package). These
496 : // callbacks are serialized since they are invoked from the flushLoop.
497 : ExternalSyncQueueCallback ExternalSyncQueueCallback
498 : }
499 :
500 : // ExternalSyncQueueCallback is to be run when a PendingSync has been
501 : // processed, either successfully or with an error.
502 : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
503 :
504 : // initialAllocatedBlocksCap is the initial capacity of the various slices
505 : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
506 : // than this threshold allows.
507 : const initialAllocatedBlocksCap = 32
508 :
509 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
510 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
511 : // within a LogWriter's own internal free list `w.free.blocks`.
512 : var blockPool = sync.Pool{
513 2 : New: func() any { return &block{} },
514 : }
515 :
516 : // NewLogWriter returns a new LogWriter.
517 : //
518 : // The io.Writer may also be used as an io.Closer and syncer. No other methods
519 : // will be called on the writer.
520 : func NewLogWriter(
521 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
522 2 : ) *LogWriter {
523 2 : c, _ := w.(io.Closer)
524 2 : s, _ := w.(syncer)
525 2 : r := &LogWriter{
526 2 : w: w,
527 2 : c: c,
528 2 : s: s,
529 2 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
530 2 : // we are very unlikely to reach a file number of 4 billion and b) the log
531 2 : // number is used as a validation check and using only the low 32-bits is
532 2 : // sufficient for that purpose.
533 2 : logNum: uint32(logNum),
534 2 : afterFunc: func(d time.Duration, f func()) syncTimer {
535 0 : return time.AfterFunc(d, f)
536 0 : },
537 : }
538 2 : m := &LogWriterMetrics{}
539 2 : if logWriterConfig.ExternalSyncQueueCallback != nil {
540 1 : r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
541 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
542 2 : } else {
543 2 : r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
544 2 : syncQueueLen: &m.SyncQueueLen,
545 2 : queueSemChan: logWriterConfig.QueueSemChan,
546 2 : }
547 2 : r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
548 2 : }
549 :
550 2 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
551 2 : r.block = blockPool.Get().(*block)
552 2 : r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
553 2 : r.flusher.closed = make(chan struct{})
554 2 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
555 2 : r.flusher.metrics = m
556 2 :
557 2 : f := &r.flusher
558 2 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
559 2 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
560 2 :
561 2 : go func() {
562 2 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
563 2 : }()
564 2 : return r
565 : }
566 :
567 2 : func (w *LogWriter) flushLoop(context.Context) {
568 2 : f := &w.flusher
569 2 : f.Lock()
570 2 :
571 2 : // Initialize idleStartTime to when the loop starts.
572 2 : idleStartTime := time.Now()
573 2 : var syncTimer syncTimer
574 2 : defer func() {
575 2 : // Capture the idle duration between the last piece of work and when the
576 2 : // loop terminated.
577 2 : f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
578 2 : if syncTimer != nil {
579 1 : syncTimer.Stop()
580 1 : }
581 2 : close(f.closed)
582 2 : f.Unlock()
583 : }()
584 :
585 : // The flush loop performs flushing of full and partial data blocks to the
586 : // underlying writer (LogWriter.w), syncing of the writer, and notification
587 : // to sync requests that they have completed.
588 : //
589 : // - flusher.ready is a condition variable that is signalled when there is
590 : // work to do. Full blocks are contained in flusher.pending. The current
591 : // partial block is in LogWriter.block. And sync operations are held in
592 : // flusher.syncQ.
593 : //
594 : // - The decision to sync is determined by whether there are any sync
595 : // requests present in flusher.syncQ and whether enough time has elapsed
596 : // since the last sync. If not enough time has elapsed since the last sync,
597 : // flusher.syncQ.blocked will be set to 1. If syncing is blocked,
598 : // syncQueue.empty() will return true and syncQueue.load() will return 0,0
599 : // (i.e. an empty list).
600 : //
601 : // - flusher.syncQ.blocked is cleared by a timer that is initialized when
602 : // blocked is set to 1. When blocked is 1, no syncing will take place, but
603 : // flushing will continue to be performed. The on/off toggle for syncing
604 : // does not need to be carefully synchronized with the rest of processing
605 : // -- all we need to ensure is that after any transition to blocked=1 there
606 : // is eventually a transition to blocked=0. syncTimer performs this
607 : // transition. Note that any change to min-sync-interval will not take
608 : // effect until the previous timer elapses.
609 : //
610 : // - Picking up the syncing work to perform requires coordination with
611 : // picking up the flushing work. Specifically, flushing work is queued
612 : // before syncing work. The guarantee of this code is that when a sync is
613 : // requested, any previously queued flush work will be synced. This
614 : // motivates reading the syncing work (f.syncQ.load()) before picking up
615 : // the flush work (w.block.written.Load()).
616 :
617 : // The list of full blocks that need to be written. This is copied from
618 : // f.pending on every loop iteration, though the number of elements is
619 : // usually small (most frequently 1). In the case of the WAL LogWriter, the
620 : // number of blocks is bounded by the size of the WAL's corresponding
621 : // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
622 : // this works out to at most 2048 elements if the entirety of the memtable's
623 : // contents are queued.
624 2 : pending := make([]*block, 0, cap(f.pending))
625 2 : for {
626 2 : for {
627 2 : // Grab the portion of the current block that requires flushing. Note that
628 2 : // the current block can be added to the pending blocks list after we release
629 2 : // the flusher lock, but it won't be part of pending.
630 2 : written := w.block.written.Load()
631 2 : if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
632 2 : break
633 : }
634 2 : if f.close {
635 2 : // If the writer is closed, pretend the sync timer fired immediately so
636 2 : // that we can process any queued sync requests.
637 2 : f.pendingSyncs.clearBlocked()
638 2 : if !f.pendingSyncs.empty() {
639 1 : break
640 : }
641 2 : return
642 : }
643 2 : f.ready.Wait()
644 2 : continue
645 : }
646 : // Found work to do, so no longer idle.
647 : //
648 : // NB: it is safe to read pending before loading from the syncQ since
649 : // mutations to pending require the w.flusher mutex, which is held here.
650 : // There is no risk that someone will concurrently add to pending, so the
651 : // following sequence, which would pick up a syncQ entry without the
652 : // corresponding data, is impossible:
653 : //
654 : // Thread enqueueing This thread
655 : // 1. read pending
656 : // 2. add block to pending
657 : // 3. add to syncQ
658 : // 4. read syncQ
659 2 : workStartTime := time.Now()
660 2 : idleDuration := workStartTime.Sub(idleStartTime)
661 2 : pending = append(pending[:0], f.pending...)
662 2 : f.pending = f.pending[:0]
663 2 : f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
664 2 :
665 2 : // Grab the list of sync waiters. Note that syncQueue.load() will return
666 2 : // 0,0 while we're waiting for the min-sync-interval to expire. This
667 2 : // allows flushing to proceed even if we're not ready to sync.
668 2 : snap := f.pendingSyncs.snapshotForPop()
669 2 :
670 2 : // Grab the portion of the current block that requires flushing. Note that
671 2 : // the current block can be added to the pending blocks list after we
672 2 : // release the flusher lock, but it won't be part of pending. This has to
673 2 : // be ordered after we get the list of sync waiters from syncQ in order to
674 2 : // prevent a race where a waiter adds itself to syncQ, but this thread
675 2 : // picks up the entry in syncQ and not the buffered data.
676 2 : written := w.block.written.Load()
677 2 : data := w.block.buf[w.block.flushed:written]
678 2 : w.block.flushed = written
679 2 :
680 2 : fErr := f.err
681 2 : f.Unlock()
682 2 : // If flusher has an error, we propagate it to waiters. Note in spite of
683 2 : // error we consume the pending list above to free blocks for writers.
684 2 : if fErr != nil {
685 1 : // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
686 1 : // called f.Unlock() above. We will acquire the lock again below.
687 1 : f.pendingSyncs.pop(snap, fErr)
688 1 : // Update the idleStartTime if work could not be done, so that we don't
689 1 : // include the duration we tried to do work as idle. We don't bother
690 1 : // with the rest of the accounting, which means we will undercount.
691 1 : idleStartTime = time.Now()
692 1 : f.Lock()
693 1 : continue
694 : }
695 2 : synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
696 2 : f.Lock()
697 2 : if synced && f.fsyncLatency != nil {
698 2 : f.fsyncLatency.Observe(float64(syncLatency))
699 2 : }
700 2 : f.err = err
701 2 : if f.err != nil {
702 1 : f.pendingSyncs.clearBlocked()
703 1 : // Update the idleStartTime if work could not be done, so that we don't
704 1 : // include the duration we tried to do work as idle. We don't bother
705 1 : // with the rest of the accounting, which means we will undercount.
706 1 : idleStartTime = time.Now()
707 1 : continue
708 : }
709 :
710 2 : if synced && f.minSyncInterval != nil {
711 1 : // A sync was performed. Make sure we've waited for the min sync
712 1 : // interval before syncing again.
713 1 : if min := f.minSyncInterval(); min > 0 {
714 1 : f.pendingSyncs.setBlocked()
715 1 : if syncTimer == nil {
716 1 : syncTimer = w.afterFunc(min, func() {
717 1 : f.pendingSyncs.clearBlocked()
718 1 : f.ready.Signal()
719 1 : })
720 1 : } else {
721 1 : syncTimer.Reset(min)
722 1 : }
723 : }
724 : }
725 : // Finished work, and started idling.
726 2 : idleStartTime = time.Now()
727 2 : workDuration := idleStartTime.Sub(workStartTime)
728 2 : f.metrics.WriteThroughput.Bytes += bytesWritten
729 2 : f.metrics.WriteThroughput.WorkDuration += workDuration
730 2 : f.metrics.WriteThroughput.IdleDuration += idleDuration
731 : }
732 : }
733 :
734 : func (w *LogWriter) flushPending(
735 : data []byte, pending []*block, snap pendingSyncsSnapshot,
736 2 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
737 2 : defer func() {
738 2 : // Translate panics into errors. The errors will cause flushLoop to shut
739 2 : // down, but allows us to do so in a controlled way and avoid swallowing
740 2 : // the stack that created the panic if panic'ing itself hits a panic
741 2 : // (e.g. unlock of unlocked mutex).
742 2 : if r := recover(); r != nil {
743 0 : err = errors.Newf("%v", r)
744 0 : }
745 : }()
746 :
747 2 : for _, b := range pending {
748 1 : bytesWritten += blockSize - int64(b.flushed)
749 1 : if err = w.flushBlock(b); err != nil {
750 0 : break
751 : }
752 : }
753 2 : if n := len(data); err == nil && n > 0 {
754 2 : bytesWritten += int64(n)
755 2 : _, err = w.w.Write(data)
756 2 : }
757 :
758 2 : synced = !snap.empty()
759 2 : if synced {
760 2 : if err == nil && w.s != nil {
761 2 : syncLatency, err = w.syncWithLatency()
762 2 : } else {
763 1 : synced = false
764 1 : }
765 2 : f := &w.flusher
766 2 : if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
767 0 : return synced, syncLatency, bytesWritten, firstError(err, popErr)
768 0 : }
769 : }
770 :
771 2 : return synced, syncLatency, bytesWritten, err
772 : }
773 :
774 2 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
775 2 : start := time.Now()
776 2 : err := w.s.Sync()
777 2 : syncLatency := time.Since(start)
778 2 : return syncLatency, err
779 2 : }
780 :
781 1 : func (w *LogWriter) flushBlock(b *block) error {
782 1 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
783 0 : return err
784 0 : }
785 1 : b.written.Store(0)
786 1 : b.flushed = 0
787 1 : w.free.Lock()
788 1 : w.free.blocks = append(w.free.blocks, b)
789 1 : w.free.Unlock()
790 1 : return nil
791 : }
792 :
793 : // queueBlock queues the current block for writing to the underlying writer,
794 : // allocates a new block and reserves space for the next header.
795 1 : func (w *LogWriter) queueBlock() {
796 1 : // Allocate a new block, blocking until one is available. We do this first
797 1 : // because w.block is protected by w.flusher.Mutex.
798 1 : w.free.Lock()
799 1 : if len(w.free.blocks) == 0 {
800 1 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
801 1 : }
802 1 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
803 1 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
804 1 : w.free.Unlock()
805 1 :
806 1 : f := &w.flusher
807 1 : f.Lock()
808 1 : f.pending = append(f.pending, w.block)
809 1 : w.block = nextBlock
810 1 : f.ready.Signal()
811 1 : w.err = w.flusher.err
812 1 : f.Unlock()
813 1 :
814 1 : w.blockNum++
815 : }
816 :
817 : // Close flushes and syncs any unwritten data and closes the writer.
818 : // Where required, external synchronisation is provided by commitPipeline.mu.
819 2 : func (w *LogWriter) Close() error {
820 2 : return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
821 2 : }
822 :
823 : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
824 : // lastQueuedRecord, that the caller will be notified about when synced.
825 1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
826 1 : return w.closeInternal(lastQueuedRecord)
827 1 : }
828 :
829 2 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
830 2 : f := &w.flusher
831 2 :
832 2 : // Emit an EOF trailer signifying the end of this log. This helps readers
833 2 : // differentiate between a corrupted entry in the middle of a log from
834 2 : // garbage at the tail from a recycled log file.
835 2 : w.emitEOFTrailer()
836 2 :
837 2 : // Signal the flush loop to close.
838 2 : f.Lock()
839 2 : f.close = true
840 2 : f.ready.Signal()
841 2 : f.Unlock()
842 2 :
843 2 : // Wait for the flush loop to close. The flush loop will not close until all
844 2 : // pending data has been written or an error occurs.
845 2 : <-f.closed
846 2 :
847 2 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
848 2 : // last buffered data only if it was requested via syncQ, so we need to sync
849 2 : // here to ensure that all the data is synced.
850 2 : err := w.flusher.err
851 2 : var syncLatency time.Duration
852 2 : if err == nil && w.s != nil {
853 2 : syncLatency, err = w.syncWithLatency()
854 2 : }
855 2 : f.Lock()
856 2 : if err == nil && f.fsyncLatency != nil {
857 2 : f.fsyncLatency.Observe(float64(syncLatency))
858 2 : }
859 2 : free := w.free.blocks
860 2 : f.Unlock()
861 2 :
862 2 : // NB: the caller of closeInternal may not care about a non-nil cerr below
863 2 : // if all queued writes have been successfully written and synced.
864 2 : if lastQueuedRecord.Index != NoSyncIndex {
865 1 : w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
866 1 : }
867 2 : if w.c != nil {
868 2 : cerr := w.c.Close()
869 2 : w.c = nil
870 2 : err = firstError(err, cerr)
871 2 : }
872 :
873 2 : for _, b := range free {
874 1 : b.flushed = 0
875 1 : b.written.Store(0)
876 1 : blockPool.Put(b)
877 1 : }
878 :
879 2 : w.err = errClosedWriter
880 2 : return err
881 : }
882 :
883 : // firstError returns the first non-nil error of err0 and err1, or nil if both
884 : // are nil.
885 2 : func firstError(err0, err1 error) error {
886 2 : if err0 != nil {
887 1 : return err0
888 1 : }
889 2 : return err1
890 : }
891 :
892 : // WriteRecord writes a complete record. Returns the offset just past the end
893 : // of the record.
894 : // External synchronisation provided by commitPipeline.mu.
895 1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
896 1 : logSize, err := w.SyncRecord(p, nil, nil)
897 1 : return logSize, err
898 1 : }
899 :
900 : // SyncRecord writes a complete record. If wg != nil the record will be
901 : // asynchronously persisted to the underlying writer and done will be called on
902 : // the wait group upon completion. Returns the offset just past the end of the
903 : // record.
904 : // External synchronisation provided by commitPipeline.mu.
905 : func (w *LogWriter) SyncRecord(
906 : p []byte, wg *sync.WaitGroup, err *error,
907 2 : ) (logSize int64, err2 error) {
908 2 : w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
909 2 : wg: wg,
910 2 : err: err,
911 2 : }
912 2 : return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
913 2 : }
914 :
915 : // SyncRecordGeneralized is a version of SyncRecord that accepts a
916 : // PendingSync.
917 2 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
918 2 : if w.err != nil {
919 0 : return -1, w.err
920 0 : }
921 :
922 : // The `i == 0` condition ensures we handle empty records. Such records can
923 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
924 : // MANIFEST is currently written using Writer, it is good to support the same
925 : // semantics with LogWriter.
926 2 : for i := 0; i == 0 || len(p) > 0; i++ {
927 2 : p = w.emitFragment(i, p)
928 2 : }
929 :
930 2 : if ps.syncRequested() {
931 2 : // If we've been asked to persist the record, add the WaitGroup to the sync
932 2 : // queue and signal the flushLoop. Note that flushLoop will write partial
933 2 : // blocks to the file if syncing has been requested. The contract is that
934 2 : // any record written to the LogWriter to this point will be flushed to the
935 2 : // OS and synced to disk.
936 2 : f := &w.flusher
937 2 : f.pendingSyncs.push(ps)
938 2 : f.ready.Signal()
939 2 : }
940 :
941 2 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
942 2 : // Note that we don't return w.err here as a concurrent call to Close would
943 2 : // race with our read. That's ok because the only error we could be seeing is
944 2 : // one to syncing for which the caller can receive notification of by passing
945 2 : // in a non-nil err argument.
946 2 : return offset, nil
947 : }
948 :
949 : // Size returns the current size of the file.
950 : // External synchronisation provided by commitPipeline.mu.
951 2 : func (w *LogWriter) Size() int64 {
952 2 : return w.blockNum*blockSize + int64(w.block.written.Load())
953 2 : }
954 :
955 2 : func (w *LogWriter) emitEOFTrailer() {
956 2 : // Write a recyclable chunk header with a different log number. Readers
957 2 : // will treat the header as EOF when the log number does not match.
958 2 : b := w.block
959 2 : i := b.written.Load()
960 2 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
961 2 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
962 2 : b.buf[i+6] = recyclableFullChunkType
963 2 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
964 2 : b.written.Store(i + int32(recyclableHeaderSize))
965 2 : }
966 :
967 2 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
968 2 : b := w.block
969 2 : i := b.written.Load()
970 2 : first := n == 0
971 2 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
972 2 :
973 2 : if last {
974 2 : if first {
975 2 : b.buf[i+6] = recyclableFullChunkType
976 2 : } else {
977 1 : b.buf[i+6] = recyclableLastChunkType
978 1 : }
979 1 : } else {
980 1 : if first {
981 1 : b.buf[i+6] = recyclableFirstChunkType
982 1 : } else {
983 1 : b.buf[i+6] = recyclableMiddleChunkType
984 1 : }
985 : }
986 :
987 2 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
988 2 :
989 2 : r := copy(b.buf[i+recyclableHeaderSize:], p)
990 2 : j := i + int32(recyclableHeaderSize+r)
991 2 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
992 2 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
993 2 : b.written.Store(j)
994 2 :
995 2 : if blockSize-b.written.Load() < recyclableHeaderSize {
996 1 : // There is no room for another fragment in the block, so fill the
997 1 : // remaining bytes with zeros and queue the block for flushing.
998 1 : clear(b.buf[b.written.Load():])
999 1 : w.queueBlock()
1000 1 : }
1001 2 : return p[r:]
1002 : }
1003 :
1004 : // Metrics must typically be called after Close, since the callee will no
1005 : // longer modify the returned LogWriterMetrics. It is also current if there is
1006 : // nothing left to flush in the flush loop, but that is an implementation
1007 : // detail that callers should not rely on.
1008 2 : func (w *LogWriter) Metrics() LogWriterMetrics {
1009 2 : w.flusher.Lock()
1010 2 : defer w.flusher.Unlock()
1011 2 : m := *w.flusher.metrics
1012 2 : return m
1013 2 : }
1014 :
1015 : // LogWriterMetrics contains misc metrics for the log writer.
1016 : type LogWriterMetrics struct {
1017 : WriteThroughput base.ThroughputMetric
1018 : PendingBufferLen base.GaugeSampleMetric
1019 : SyncQueueLen base.GaugeSampleMetric
1020 : }
1021 :
1022 : // Merge merges metrics from x. Requires that x is non-nil.
1023 2 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
1024 2 : m.WriteThroughput.Merge(x.WriteThroughput)
1025 2 : m.PendingBufferLen.Merge(x.PendingBufferLen)
1026 2 : m.SyncQueueLen.Merge(x.SyncQueueLen)
1027 2 : return nil
1028 2 : }
|