Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/crlib/crtime"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/crc"
20 : "github.com/prometheus/client_golang/prometheus"
21 : )
22 :
23 : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
24 : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
25 :
26 : type block struct {
27 : // buf[:written] has already been filled with fragments. Updated atomically.
28 : written atomic.Int32
29 : // buf[:flushed] has already been flushed to w.
30 : flushed int32
31 : buf [blockSize]byte
32 : }
33 :
34 : type flusher interface {
35 : Flush() error
36 : }
37 :
38 : type syncer interface {
39 : Sync() error
40 : }
41 :
42 : const (
43 : syncConcurrencyBits = 12
44 :
45 : // SyncConcurrency is the maximum number of concurrent sync operations that
46 : // can be performed. Note that a sync operation is initiated either by a call
47 : // to SyncRecord or by a call to Close. Exported as this value also limits
48 : // the commit concurrency in commitPipeline.
49 : SyncConcurrency = 1 << syncConcurrencyBits
50 : )
51 :
52 : type syncSlot struct {
53 : wg *sync.WaitGroup
54 : err *error
55 : }
56 :
57 : // syncQueue is a lock-free fixed-size single-producer, single-consumer
58 : // queue. The single-producer can push to the head, and the single-consumer can
59 : // pop multiple values from the tail. Popping calls Done() on each of the
60 : // available *sync.WaitGroup elements.
61 : type syncQueue struct {
62 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
63 : // are indexes into slots, taken modulo len(slots) (masked with len(slots)-1).
64 : //
65 : // tail = index of oldest data in queue
66 : // head = index of next slot to fill
67 : //
68 : // Slots in the range [tail, head) are owned by consumers. A consumer
69 : // continues to own a slot outside this range until it nils the slot, at
70 : // which point ownership passes to the producer.
71 : //
72 : // The head index is stored in the most-significant bits so that we can
73 : // atomically add to it and the overflow is harmless.
74 : headTail atomic.Uint64
75 :
76 : // slots is a ring buffer of values stored in this queue. The size must be a
77 : // power of 2. A slot is in use until the tail index has moved beyond it.
78 : slots [SyncConcurrency]syncSlot
79 :
80 : // blocked is an atomic boolean which indicates whether syncing is currently
81 : // blocked or can proceed. It is used by the implementation of
82 : // min-sync-interval to block syncing until the min interval has passed.
83 : blocked atomic.Bool
84 : }
85 :
86 : const dequeueBits = 32
87 :
88 : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
89 1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
90 1 : const mask = 1<<dequeueBits - 1
91 1 : head = uint32((ptrs >> dequeueBits) & mask)
92 1 : tail = uint32(ptrs & mask)
93 1 : return head, tail
94 1 : }
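
The packing scheme can be exercised in isolation. Below is a minimal, self-contained sketch (not part of this file; the pack helper is illustrative) that mirrors unpack and shows the inverse operation:

package main

import "fmt"

const dequeueBits = 32

// pack combines a 32-bit head and tail into a single 64-bit word, with the
// head in the most-significant bits, matching the layout of syncQueue.headTail.
func pack(head, tail uint32) uint64 {
	return uint64(head)<<dequeueBits | uint64(tail)
}

// unpack mirrors syncQueue.unpack above.
func unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return head, tail
}

func main() {
	ptrs := pack(7, 3)
	head, tail := unpack(ptrs)
	fmt.Println(head, tail) // 7 3
	// The queue holds head-tail entries; with SyncConcurrency = 1<<12 the
	// ring buffer has 4096 slots.
	fmt.Println(head - tail) // 4
}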
95 :
96 1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
97 1 : ptrs := q.headTail.Load()
98 1 : head, tail := q.unpack(ptrs)
99 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
100 0 : panic("pebble: queue is full")
101 : }
102 :
103 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
104 1 : slot.wg = wg
105 1 : slot.err = err
106 1 :
107 1 : // Increment head. This passes ownership of slot to dequeue and acts as a
108 1 : // store barrier for writing the slot.
109 1 : q.headTail.Add(1 << dequeueBits)
110 : }
111 :
112 0 : func (q *syncQueue) setBlocked() {
113 0 : q.blocked.Store(true)
114 0 : }
115 :
116 1 : func (q *syncQueue) clearBlocked() {
117 1 : q.blocked.Store(false)
118 1 : }
119 :
120 1 : func (q *syncQueue) empty() bool {
121 1 : head, tail, _ := q.load()
122 1 : return head == tail
123 1 : }
124 :
125 : // load returns the head and tail of the queue, indicating what the caller
126 : // should sync. It returns a head and tail of zero if syncing is blocked due
127 : // to min-sync-interval. It additionally returns the real length of this
128 : // queue, regardless of whether syncing is blocked.
129 1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
130 1 : ptrs := q.headTail.Load()
131 1 : head, tail = q.unpack(ptrs)
132 1 : realLength = head - tail
133 1 : if q.blocked.Load() {
134 0 : return 0, 0, realLength
135 0 : }
136 1 : return head, tail, realLength
137 : }
138 :
139 : // queueSemChan is non-nil in production; it may be nil when the wal package handles WAL failover.
140 1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
141 1 : if tail == head {
142 0 : // Queue is empty.
143 0 : return nil
144 0 : }
145 :
146 1 : for ; tail != head; tail++ {
147 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
148 1 : wg := slot.wg
149 1 : if wg == nil {
150 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
151 0 : }
152 1 : *slot.err = err
153 1 : slot.wg = nil
154 1 : slot.err = nil
155 1 : // We need to bump the tail count before releasing the queueSemChan
156 1 : // semaphore as releasing the semaphore can cause a blocked goroutine to
157 1 : // acquire the semaphore and enqueue before we've "freed" space in the
158 1 : // queue.
159 1 : q.headTail.Add(1)
160 1 : wg.Done()
161 1 : // Is always non-nil in production, unless using wal package for WAL
162 1 : // failover.
163 1 : if queueSemChan != nil {
164 1 : <-queueSemChan
165 1 : }
166 : }
167 :
168 1 : return nil
169 : }
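
The push/pop protocol above hands the flusher a *sync.WaitGroup and an *error per request: pop publishes the sync result and then releases the waiter. A standalone sketch of that handoff (hypothetical types, not this package's API):

package main

import (
	"fmt"
	"sync"
)

// waiter bundles the two pointers a sync request hands to the queue: a
// WaitGroup the flusher completes, and an error slot the flusher fills in.
type waiter struct {
	wg  sync.WaitGroup
	err error
}

// complete plays the role of syncQueue.pop for a single slot: record the
// result, then release the waiter.
func complete(w *waiter, err error) {
	w.err = err // must happen before Done, so the waiter sees it
	w.wg.Done()
}

func main() {
	w := &waiter{}
	w.wg.Add(1) // the producer "pushes" the waiter

	go complete(w, nil) // the consumer "pops" it after syncing

	w.wg.Wait() // the waiter blocks until the sync result is published
	fmt.Println("sync error:", w.err)
}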
170 :
171 : // pendingSyncs abstracts out the handling of pending sync requests. In
172 : // standalone mode the implementation is a thin wrapper around syncQueue. In
173 : // the mode where the LogWriter can be subject to failover, there is no queue
174 : // kept in the LogWriter and the signaling to those waiting for sync is
175 : // handled in the wal package.
176 : //
177 : // To avoid heap allocations due to the use of this interface, the parameters
178 : // and return values follow some strict rules:
179 : // - The PendingSync parameter can be reused by the caller after push returns.
180 : // The implementation should be a pointer backed by a struct that is already
181 : // heap allocated, which the caller can reuse for the next push call.
182 : // - The pendingSyncSnapshot return value must be backed by the pendingSyncs
183 : // implementation, so calling snapshotForPop again will cause the previous
184 : // snapshot to be overwritten.
185 : type pendingSyncs interface {
186 : push(PendingSync)
187 : setBlocked()
188 : clearBlocked()
189 : empty() bool
190 : snapshotForPop() pendingSyncsSnapshot
191 : pop(snap pendingSyncsSnapshot, err error) error
192 : }
193 :
194 : type pendingSyncsSnapshot interface {
195 : empty() bool
196 : }
197 :
198 : // PendingSync abstracts the sync specification for a record queued on the
199 : // LogWriter. The only implementations are provided in this package since
200 : // syncRequested is not exported.
201 : type PendingSync interface {
202 : syncRequested() bool
203 : }
204 :
205 : // The implementation of pendingSyncs in standalone mode.
206 : type pendingSyncsWithSyncQueue struct {
207 : syncQueue
208 : syncQueueLen *base.GaugeSampleMetric
209 : snapshotBacking syncQueueSnapshot
210 : // See the comment for LogWriterConfig.QueueSemChan.
211 : queueSemChan chan struct{}
212 : }
213 :
214 : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
215 :
216 1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
217 1 : ps2 := ps.(*pendingSyncForSyncQueue)
218 1 : q.syncQueue.push(ps2.wg, ps2.err)
219 1 : }
220 :
221 1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
222 1 : head, tail, realLength := q.syncQueue.load()
223 1 : q.snapshotBacking = syncQueueSnapshot{
224 1 : head: head,
225 1 : tail: tail,
226 1 : }
227 1 : q.syncQueueLen.AddSample(int64(realLength))
228 1 : return &q.snapshotBacking
229 1 : }
230 :
231 1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
232 1 : s := snap.(*syncQueueSnapshot)
233 1 : return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
234 1 : }
235 :
236 : // The implementation of pendingSyncsSnapshot in standalone mode.
237 : type syncQueueSnapshot struct {
238 : head, tail uint32
239 : }
240 :
241 1 : func (s *syncQueueSnapshot) empty() bool {
242 1 : return s.head == s.tail
243 1 : }
244 :
245 : // The implementation of PendingSync in standalone mode.
246 : type pendingSyncForSyncQueue struct {
247 : wg *sync.WaitGroup
248 : err *error
249 : }
250 :
251 1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
252 1 : return ps.wg != nil
253 1 : }
254 :
255 : // The implementation of pendingSyncs in failover mode.
256 : type pendingSyncsWithHighestSyncIndex struct {
257 : // The highest "index" queued that is requesting a sync. Initialized
258 : // to NoSyncIndex, and reset to NoSyncIndex after the sync.
259 : index atomic.Int64
260 : snapshotBacking PendingSyncIndex
261 : // blocked is an atomic boolean which indicates whether syncing is currently
262 : // blocked or can proceed. It is used by the implementation of
263 : // min-sync-interval to block syncing until the min interval has passed.
264 : blocked atomic.Bool
265 : externalSyncQueueCallback ExternalSyncQueueCallback
266 : }
267 :
268 : // NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.
269 : const NoSyncIndex = -1
270 :
271 : func (si *pendingSyncsWithHighestSyncIndex) init(
272 : externalSyncQueueCallback ExternalSyncQueueCallback,
273 1 : ) {
274 1 : si.index.Store(NoSyncIndex)
275 1 : si.externalSyncQueueCallback = externalSyncQueueCallback
276 1 : }
277 :
278 1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
279 1 : ps2 := ps.(*PendingSyncIndex)
280 1 : si.index.Store(ps2.Index)
281 1 : }
282 :
283 0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
284 0 : si.blocked.Store(true)
285 0 : }
286 :
287 1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
288 1 : si.blocked.Store(false)
289 1 : }
290 :
291 1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
292 1 : return si.load() == NoSyncIndex
293 1 : }
294 :
295 1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
296 1 : si.snapshotBacking = PendingSyncIndex{Index: si.load()}
297 1 : return &si.snapshotBacking
298 1 : }
299 :
300 1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
301 1 : index := si.index.Load()
302 1 : if index != NoSyncIndex && si.blocked.Load() {
303 0 : index = NoSyncIndex
304 0 : }
305 1 : return index
306 : }
307 :
308 1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
309 1 : index := snap.(*PendingSyncIndex)
310 1 : if index.Index == NoSyncIndex {
311 0 : return nil
312 0 : }
313 : // Set to NoSyncIndex if a higher index has not been queued.
314 1 : si.index.CompareAndSwap(index.Index, NoSyncIndex)
315 1 : si.externalSyncQueueCallback(*index, err)
316 1 : return nil
317 : }
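
The CompareAndSwap in pop resets the index only if no newer sync request was queued between the snapshot and the pop. A standalone sketch of that reset pattern (values are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

const noIndex = -1

func main() {
	var highest atomic.Int64
	highest.Store(noIndex)

	highest.Store(5)       // a sync for index 5 is queued
	snap := highest.Load() // the flusher snapshots 5 before syncing

	highest.Store(7) // a newer request arrives while the sync is in flight

	// The reset only succeeds if the value is still the snapshot; here it is
	// 7, so the newer request is preserved for the next flush iteration.
	swapped := highest.CompareAndSwap(snap, noIndex)
	fmt.Println(swapped, highest.Load()) // false 7
}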
318 :
319 : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
320 : type PendingSyncIndex struct {
321 : // Index is some state meaningful to the user of LogWriter. The LogWriter
322 : // itself only examines whether Index is equal to NoSyncIndex.
323 : Index int64
324 : }
325 :
326 1 : func (s *PendingSyncIndex) empty() bool {
327 1 : return s.Index == NoSyncIndex
328 1 : }
329 :
330 1 : func (s *PendingSyncIndex) syncRequested() bool {
331 1 : return s.Index != NoSyncIndex
332 1 : }
333 :
334 : // flusherCond is a specialized condition variable that allows its condition to
335 : // change and readiness to be signalled without holding its associated mutex. In
336 : // particular, when a waiter is added to syncQueue atomically, this condition
337 : // variable can be signalled without holding flusher.Mutex.
338 : type flusherCond struct {
339 : mu *sync.Mutex
340 : q pendingSyncs
341 : cond sync.Cond
342 : }
343 :
344 1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
345 1 : c.mu = mu
346 1 : c.q = q
347 1 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
348 1 : // points to flusherCond so that when cond.L.Unlock is called, flusherCond.Unlock
349 1 : // will be called and we can check the !syncQueue.empty() condition.
350 1 : c.cond.L = c
351 1 : }
352 :
353 1 : func (c *flusherCond) Signal() {
354 1 : // Pass-through to the cond var.
355 1 : c.cond.Signal()
356 1 : }
357 :
358 1 : func (c *flusherCond) Wait() {
359 1 : // Pass-through to the cond var. Note that internally the cond var implements
360 1 : // Wait as:
361 1 : //
362 1 : // t := notifyListAdd()
363 1 : // L.Unlock()
364 1 : // notifyListWait(t)
365 1 : // L.Lock()
366 1 : //
367 1 : // We've configured the cond var to call flusherReady.Unlock() which allows
368 1 : // us to check the !syncQueue.empty() condition without a danger of missing a
369 1 : // notification. Any call to flusherReady.Signal() after notifyListAdd() is
370 1 : // called will cause the subsequent notifyListWait() to return immediately.
371 1 : c.cond.Wait()
372 1 : }
373 :
374 1 : func (c *flusherCond) Lock() {
375 1 : c.mu.Lock()
376 1 : }
377 :
378 1 : func (c *flusherCond) Unlock() {
379 1 : c.mu.Unlock()
380 1 : if !c.q.empty() {
381 1 : // If the current goroutine is about to block on sync.Cond.Wait, this call
382 1 : // to Signal will prevent that. The comment in Wait above explains a bit
383 1 : // about what is going on here, but it is worth reiterating:
384 1 : //
385 1 : // flusherCond.Wait()
386 1 : // sync.Cond.Wait()
387 1 : // t := notifyListAdd()
388 1 : // flusherCond.Unlock() <-- we are here
389 1 : // notifyListWait(t)
390 1 : // flusherCond.Lock()
391 1 : //
392 1 : // The call to Signal here results in:
393 1 : //
394 1 : // sync.Cond.Signal()
395 1 : // notifyListNotifyOne()
396 1 : //
397 1 : // The call to notifyListNotifyOne() will prevent the call to
398 1 : // notifyListWait(t) from blocking.
399 1 : c.cond.Signal()
400 1 : }
401 : }
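
The pattern above, making the condition variable's Locker re-check the condition and Signal from Unlock, can be reduced to a standalone sketch (hypothetical type, not this package's API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// signalOnUnlock is a Locker that, like flusherCond, signals its condition
// variable from Unlock whenever work is pending, so a waiter that is about to
// block in cond.Wait cannot miss a notification.
type signalOnUnlock struct {
	mu      sync.Mutex
	cond    sync.Cond
	pending atomic.Bool
}

func (c *signalOnUnlock) Lock() { c.mu.Lock() }

func (c *signalOnUnlock) Unlock() {
	c.mu.Unlock()
	if c.pending.Load() {
		c.cond.Signal()
	}
}

func main() {
	c := &signalOnUnlock{}
	c.cond.L = c // circular on purpose, as in flusherCond.init

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.Lock()
		for !c.pending.Load() {
			c.cond.Wait() // Wait unlocks via c.Unlock, which re-checks pending
		}
		c.Unlock()
		fmt.Println("saw pending work")
	}()

	// Publish work without holding the mutex; either the Unlock inside Wait
	// or this explicit Signal wakes the waiter, so no wake-up is lost.
	c.pending.Store(true)
	c.cond.Signal()
	wg.Wait()
}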
402 :
403 : type durationFunc func() time.Duration
404 :
405 : // syncTimer is an interface for timers, modeled on the closure callback mode
406 : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
407 : // by tests to mock out the timer functionality used to implement
408 : // min-sync-interval.
409 : type syncTimer interface {
410 : Reset(time.Duration) bool
411 : Stop() bool
412 : }
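
Tests can substitute a hand-driven implementation via LogWriter.afterFunc. A minimal sketch of such a mock (hypothetical; the real test mock may differ):

package main

import (
	"fmt"
	"time"
)

// syncTimer mirrors the interface above.
type syncTimer interface {
	Reset(time.Duration) bool
	Stop() bool
}

// manualTimer is a hand-driven syncTimer a test might install: the test calls
// fire() to simulate the timer elapsing instead of waiting for real time.
type manualTimer struct {
	f     func()
	armed bool
}

func (t *manualTimer) Reset(time.Duration) bool { was := t.armed; t.armed = true; return was }
func (t *manualTimer) Stop() bool               { was := t.armed; t.armed = false; return was }

func (t *manualTimer) fire() {
	if t.armed {
		t.armed = false
		t.f()
	}
}

func main() {
	var fired bool
	var t syncTimer = &manualTimer{f: func() { fired = true }}
	t.Reset(time.Millisecond)
	t.(*manualTimer).fire()
	fmt.Println(fired, t.Stop()) // true false
}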
413 :
414 : // LogWriter writes records to an underlying io.Writer. In order to support WAL
415 : // file reuse, a LogWriter's records are tagged with the WAL's file
416 : // number. When reading a log file, a record from a previous incarnation of the
417 : // file will return the error ErrInvalidLogNum.
418 : type LogWriter struct {
419 : // w is the underlying writer.
420 : w io.Writer
421 : // c is w as a closer.
422 : c io.Closer
423 : // s is w as a syncer.
424 : s syncer
425 : // logNum is the low 32-bits of the log's file number.
426 : logNum uint32
427 : // blockNum is the zero-based block number for the current block.
428 : blockNum int64
429 : // err is any accumulated error. It originates in flusher.err, and is
430 : // updated to reflect flusher.err when a block is full and getting enqueued.
431 : // Therefore, there is a lag between when flusher.err has a non-nil error,
432 : // and when that non-nil error is reflected in LogWriter.err. On close, it
433 : // is set to errClosedWriter so that accidental future calls to
434 : // SyncRecord* fail with that error.
435 : err error
436 : // block is the current block being written. Protected by flusher.Mutex.
437 : block *block
438 : free struct {
439 : sync.Mutex
440 : blocks []*block
441 : }
442 :
443 : flusher struct {
444 : sync.Mutex
445 : // ready is a condition variable that is signalled when there are blocks to
446 : // flush, syncing has been requested, or the LogWriter has been closed. For
447 : // signalling a sync, it is safe to call Signal without holding
448 : // flusher.Mutex.
449 : ready flusherCond
450 : // Set to true when the flush loop should be closed.
451 : close bool
452 : // Closed when the flush loop has terminated.
453 : closed chan struct{}
454 : // Accumulated flush error.
455 : err error
456 : // minSyncInterval is the minimum duration between syncs.
457 : minSyncInterval durationFunc
458 : fsyncLatency prometheus.Histogram
459 : pending []*block
460 : // Pushing and popping from pendingSyncs does not require the flusher mutex
461 : // to be held.
462 : pendingSyncs pendingSyncs
463 : metrics *LogWriterMetrics
464 : }
465 :
466 : // afterFunc is a hook to allow tests to mock out the timer functionality
467 : // used for min-sync-interval. In normal operation this points to
468 : // time.AfterFunc.
469 : afterFunc func(d time.Duration, f func()) syncTimer
470 :
471 : // Backing for both pendingSyncs implementations.
472 : pendingSyncsBackingQ pendingSyncsWithSyncQueue
473 : pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
474 :
475 : pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
476 :
477 : // syncedOffset is the offset in the log that is durably synced after a
478 : // flush. This member is used to write the WAL Sync chunk format's "Offset"
479 : // field in the header.
480 : syncedOffset atomic.Uint64
481 :
482 : // emitFragment is set when the LogWriter is constructed, depending on which
483 : // FormatMajorVersion is in use. It writes the WAL Sync chunk format if the
484 : // FormatMajorVersion is greater than or equal to FormatWALSyncChunks, and
485 : // the recyclable chunk format otherwise.
486 : emitFragment func(n int, p []byte) (remainingP []byte)
487 : }
488 :
489 : // LogWriterConfig is a struct used for configuring new LogWriters.
490 : type LogWriterConfig struct {
491 : WALMinSyncInterval durationFunc
492 : WALFsyncLatency prometheus.Histogram
493 : // QueueSemChan is an optional channel to pop from when popping from
494 : // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
495 : // the syncQueue from overflowing (which will cause a panic). All production
496 : // code ensures this is non-nil.
497 : QueueSemChan chan struct{}
498 :
499 : // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
500 : // as part of a WAL implementation that can failover between LogWriters.
501 : //
502 : // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
503 : // be used with a PendingSync parameter that is implemented by
504 : // PendingSyncIndex. When an index is synced (which implies all earlier
505 : // indices are also synced), this callback is invoked. The caller must not
506 : // hold any mutex when invoking this callback, since the lock ordering
507 : // requirement in this case is that any higher layer locks (in the wal
508 : // package) precede the lower layer locks (in the record package). These
509 : // callbacks are serialized since they are invoked from the flushLoop.
510 : ExternalSyncQueueCallback ExternalSyncQueueCallback
511 :
512 : // WriteWALSyncOffsets determines whether to write WAL sync chunk offsets.
513 : // The format major version can change (ratchet) at runtime, so this must be
514 : // a function rather than a static bool to ensure we use the latest format version.
515 : WriteWALSyncOffsets func() bool
516 : }
517 :
518 : // ExternalSyncQueueCallback is to be run when a PendingSync has been
519 : // processed, either successfully or with an error.
520 : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
521 :
522 : // initialAllocatedBlocksCap is the initial capacity of the various slices
523 : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
524 : // than this threshold allows.
525 : const initialAllocatedBlocksCap = 32
526 :
527 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
528 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
529 : // within a LogWriter's own internal free list `w.free.blocks`.
530 : var blockPool = sync.Pool{
531 1 : New: func() any { return &block{} },
532 : }
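
Blocks are recycled through two tiers: a per-writer free list guarded by a mutex, and the process-wide pool that is only touched on close. A standalone sketch of that pattern (hypothetical names):

package main

import (
	"fmt"
	"sync"
)

type buffer struct{ data [32 << 10]byte }

// pool is the process-wide tier, shared by all writers.
var pool = sync.Pool{New: func() any { return &buffer{} }}

// writer keeps its own free list and only returns buffers to the shared pool
// when it is closed, mirroring LogWriter.free.blocks and blockPool.
type writer struct {
	mu   sync.Mutex
	free []*buffer
}

func (w *writer) get() *buffer {
	w.mu.Lock()
	defer w.mu.Unlock()
	if n := len(w.free); n > 0 {
		b := w.free[n-1]
		w.free = w.free[:n-1]
		return b
	}
	return pool.Get().(*buffer)
}

func (w *writer) put(b *buffer) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.free = append(w.free, b)
}

func (w *writer) close() {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, b := range w.free {
		pool.Put(b)
	}
	w.free = nil
}

func main() {
	w := &writer{}
	b := w.get() // comes from the shared pool the first time
	w.put(b)     // goes back to the writer-local free list
	w.close()    // the local free list drains into the shared pool
	fmt.Println("recycled one block")
}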
533 :
534 : // NewLogWriter returns a new LogWriter.
535 : //
536 : // The io.Writer may also be used as an io.Closer and syncer. No other methods
537 : // will be called on the writer.
538 : func NewLogWriter(
539 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
540 1 : ) *LogWriter {
541 1 : c, _ := w.(io.Closer)
542 1 : s, _ := w.(syncer)
543 1 : r := &LogWriter{
544 1 : w: w,
545 1 : c: c,
546 1 : s: s,
547 1 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
548 1 : // we are very unlikely to reach a file number of 4 billion and b) the log
549 1 : // number is used as a validation check and using only the low 32-bits is
550 1 : // sufficient for that purpose.
551 1 : logNum: uint32(logNum),
552 1 : afterFunc: func(d time.Duration, f func()) syncTimer {
553 0 : return time.AfterFunc(d, f)
554 0 : },
555 : }
556 :
557 1 : if logWriterConfig.WriteWALSyncOffsets() {
558 1 : r.emitFragment = r.emitFragmentSyncOffsets
559 1 : } else {
560 1 : r.emitFragment = r.emitFragmentRecyclable
561 1 : }
562 :
563 1 : m := &LogWriterMetrics{}
564 1 : if logWriterConfig.ExternalSyncQueueCallback != nil {
565 1 : r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
566 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
567 1 : } else {
568 1 : r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
569 1 : syncQueueLen: &m.SyncQueueLen,
570 1 : queueSemChan: logWriterConfig.QueueSemChan,
571 1 : }
572 1 : r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
573 1 : }
574 :
575 1 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
576 1 : r.block = blockPool.Get().(*block)
577 1 : r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
578 1 : r.flusher.closed = make(chan struct{})
579 1 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
580 1 : r.flusher.metrics = m
581 1 :
582 1 : f := &r.flusher
583 1 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
584 1 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
585 1 :
586 1 : go func() {
587 1 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
588 1 : }()
589 1 : return r
590 : }
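
A minimal usage sketch in standalone mode, assuming it lives in a test file within this package (it additionally needs an "os" import); the helper name and values are illustrative, not an API of this package:

// exampleLogWriter writes one record, waits for it to be synced, and closes
// the writer. The QueueSemChan token is acquired before queueing the sync,
// mirroring what the commit pipeline does in production.
func exampleLogWriter(f *os.File) error {
	queueSemChan := make(chan struct{}, SyncConcurrency)
	w := NewLogWriter(f, base.DiskFileNum(1), LogWriterConfig{
		QueueSemChan:        queueSemChan,
		WriteWALSyncOffsets: func() bool { return false },
	})

	var wg sync.WaitGroup
	var syncErr error
	wg.Add(1)
	queueSemChan <- struct{}{} // reserve a sync slot before pushing the request
	if _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr); err != nil {
		return err
	}
	wg.Wait() // released by the flush loop once the record is synced
	if syncErr != nil {
		return syncErr
	}
	return w.Close()
}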
591 :
592 1 : func (w *LogWriter) flushLoop(context.Context) {
593 1 : f := &w.flusher
594 1 : f.Lock()
595 1 :
596 1 : // Initialize idleStartTime to when the loop starts.
597 1 : idleStartTime := crtime.NowMono()
598 1 : var syncTimer syncTimer
599 1 : defer func() {
600 1 : // Capture the idle duration between the last piece of work and when the
601 1 : // loop terminated.
602 1 : f.metrics.WriteThroughput.IdleDuration += idleStartTime.Elapsed()
603 1 : if syncTimer != nil {
604 0 : syncTimer.Stop()
605 0 : }
606 1 : close(f.closed)
607 1 : f.Unlock()
608 : }()
609 :
610 : // writtenOffset is the amount of data that has been written
611 : // but not necessarily synced. This is used to update the LogWriter's
612 : // syncedOffset after a sync.
613 1 : var writtenOffset uint64 = 0
614 1 :
615 1 : // The flush loop performs flushing of full and partial data blocks to the
616 1 : // underlying writer (LogWriter.w), syncing of the writer, and notification
617 1 : // to sync requests that they have completed.
618 1 : //
619 1 : // - flusher.ready is a condition variable that is signalled when there is
620 1 : // work to do. Full blocks are contained in flusher.pending. The current
621 1 : // partial block is in LogWriter.block. And sync operations are held in
622 1 : // flusher.syncQ.
623 1 : //
624 1 : // - The decision to sync is determined by whether there are any sync
625 1 : // requests present in flusher.syncQ and whether enough time has elapsed
626 1 : // since the last sync. If not enough time has elapsed since the last sync,
627 1 : // flusher.syncQ.blocked will be set to 1. If syncing is blocked,
628 1 : // syncQueue.empty() will return true and syncQueue.load() will return 0,0
629 1 : // (i.e. an empty list).
630 1 : //
631 1 : // - flusher.syncQ.blocked is cleared by a timer that is initialized when
632 1 : // blocked is set to 1. When blocked is 1, no syncing will take place, but
633 1 : // flushing will continue to be performed. The on/off toggle for syncing
634 1 : // does not need to be carefully synchronized with the rest of processing
635 1 : // -- all we need to ensure is that after any transition to blocked=1 there
636 1 : // is eventually a transition to blocked=0. syncTimer performs this
637 1 : // transition. Note that any change to min-sync-interval will not take
638 1 : // effect until the previous timer elapses.
639 1 : //
640 1 : // - Picking up the syncing work to perform requires coordination with
641 1 : // picking up the flushing work. Specifically, flushing work is queued
642 1 : // before syncing work. The guarantee of this code is that when a sync is
643 1 : // requested, any previously queued flush work will be synced. This
644 1 : // motivates reading the syncing work (f.syncQ.load()) before picking up
645 1 : // the flush work (w.block.written.Load()).
646 1 :
647 1 : // The list of full blocks that need to be written. This is copied from
648 1 : // f.pending on every loop iteration, though the number of elements is
649 1 : // usually small (most frequently 1). In the case of the WAL LogWriter, the
650 1 : // number of blocks is bounded by the size of the WAL's corresponding
651 1 : // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
652 1 : // this works out to at most 2048 elements if the entirety of the memtable's
653 1 : // contents are queued.
654 1 : pending := make([]*block, 0, cap(f.pending))
655 1 : for {
656 1 : for {
657 1 : // Grab the portion of the current block that requires flushing. Note that
658 1 : // the current block can be added to the pending blocks list after we release
659 1 : // the flusher lock, but it won't be part of pending.
660 1 : written := w.block.written.Load()
661 1 : if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
662 1 : break
663 : }
664 1 : if f.close {
665 1 : // If the writer is closed, pretend the sync timer fired immediately so
666 1 : // that we can process any queued sync requests.
667 1 : f.pendingSyncs.clearBlocked()
668 1 : if !f.pendingSyncs.empty() {
669 0 : break
670 : }
671 1 : return
672 : }
673 1 : f.ready.Wait()
674 1 : continue
675 : }
676 : // Found work to do, so no longer idle.
677 : //
678 : // NB: it is safe to read pending before loading from the syncQ since
679 : // mutations to pending require the w.flusher mutex, which is held here.
680 : // There is no risk that someone will concurrently add to pending, so the
681 : // following sequence, which would pick up a syncQ entry without the
682 : // corresponding data, is impossible:
683 : //
684 : // Thread enqueueing This thread
685 : // 1. read pending
686 : // 2. add block to pending
687 : // 3. add to syncQ
688 : // 4. read syncQ
689 1 : workStartTime := crtime.NowMono()
690 1 : idleDuration := workStartTime.Sub(idleStartTime)
691 1 : pending = append(pending[:0], f.pending...)
692 1 : f.pending = f.pending[:0]
693 1 : f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
694 1 :
695 1 : // Grab the list of sync waiters. Note that syncQueue.load() will return
696 1 : // 0,0 while we're waiting for the min-sync-interval to expire. This
697 1 : // allows flushing to proceed even if we're not ready to sync.
698 1 : snap := f.pendingSyncs.snapshotForPop()
699 1 :
700 1 : // Grab the portion of the current block that requires flushing. Note that
701 1 : // the current block can be added to the pending blocks list after we
702 1 : // release the flusher lock, but it won't be part of pending. This has to
703 1 : // be ordered after we get the list of sync waiters from syncQ in order to
704 1 : // prevent a race where a waiter adds itself to syncQ, but this thread
705 1 : // picks up the entry in syncQ and not the buffered data.
706 1 : written := w.block.written.Load()
707 1 : data := w.block.buf[w.block.flushed:written]
708 1 : w.block.flushed = written
709 1 :
710 1 : fErr := f.err
711 1 : f.Unlock()
712 1 : // If the flusher has an error, we propagate it to waiters. Note that in spite
713 1 : // of the error we consumed the pending list above to free blocks for writers.
714 1 : if fErr != nil {
715 0 : // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
716 0 : // called f.Unlock() above. We will acquire the lock again below.
717 0 : _ = f.pendingSyncs.pop(snap, fErr)
718 0 : // Update the idleStartTime if work could not be done, so that we don't
719 0 : // include the duration we tried to do work as idle. We don't bother
720 0 : // with the rest of the accounting, which means we will undercount.
721 0 : idleStartTime = crtime.NowMono()
722 0 : f.Lock()
723 0 : continue
724 : }
725 1 : writtenOffset += uint64(len(data))
726 1 : synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
727 1 : f.Lock()
728 1 : if synced && f.fsyncLatency != nil {
729 1 : w.syncedOffset.Store(writtenOffset)
730 1 : f.fsyncLatency.Observe(float64(syncLatency))
731 1 : }
732 1 : f.err = err
733 1 : if f.err != nil {
734 0 : f.pendingSyncs.clearBlocked()
735 0 : // Update the idleStartTime if work could not be done, so that we don't
736 0 : // include the duration we tried to do work as idle. We don't bother
737 0 : // with the rest of the accounting, which means we will undercount.
738 0 : idleStartTime = crtime.NowMono()
739 0 : continue
740 : }
741 :
742 1 : if synced && f.minSyncInterval != nil {
743 0 : // A sync was performed. Make sure we've waited for the min sync
744 0 : // interval before syncing again.
745 0 : if min := f.minSyncInterval(); min > 0 {
746 0 : f.pendingSyncs.setBlocked()
747 0 : if syncTimer == nil {
748 0 : syncTimer = w.afterFunc(min, func() {
749 0 : f.pendingSyncs.clearBlocked()
750 0 : f.ready.Signal()
751 0 : })
752 0 : } else {
753 0 : syncTimer.Reset(min)
754 0 : }
755 : }
756 : }
757 : // Finished work, and started idling.
758 1 : idleStartTime = crtime.NowMono()
759 1 : workDuration := idleStartTime.Sub(workStartTime)
760 1 : f.metrics.WriteThroughput.Bytes += bytesWritten
761 1 : f.metrics.WriteThroughput.WorkDuration += workDuration
762 1 : f.metrics.WriteThroughput.IdleDuration += idleDuration
763 : }
764 : }
765 :
766 : func (w *LogWriter) flushPending(
767 : data []byte, pending []*block, snap pendingSyncsSnapshot,
768 1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
769 1 : defer func() {
770 1 : // Translate panics into errors. The error will cause flushLoop to shut
771 1 : // down, but this allows us to do so in a controlled way and avoids
772 1 : // swallowing the stack that created the panic if panicking itself hits a
773 1 : // panic (e.g. unlock of unlocked mutex).
774 1 : if r := recover(); r != nil {
775 0 : err = errors.Newf("%v", r)
776 0 : }
777 : }()
778 :
779 1 : for _, b := range pending {
780 0 : bytesWritten += blockSize - int64(b.flushed)
781 0 : if err = w.flushBlock(b); err != nil {
782 0 : break
783 : }
784 : }
785 1 : if n := len(data); err == nil && n > 0 {
786 1 : bytesWritten += int64(n)
787 1 : _, err = w.w.Write(data)
788 1 : }
789 :
790 1 : synced = !snap.empty()
791 1 : if synced {
792 1 : if err == nil && w.s != nil {
793 1 : syncLatency, err = w.syncWithLatency()
794 1 : } else {
795 0 : synced = false
796 0 : }
797 1 : f := &w.flusher
798 1 : if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
799 0 : return synced, syncLatency, bytesWritten, firstError(err, popErr)
800 0 : }
801 : }
802 :
803 1 : return synced, syncLatency, bytesWritten, err
804 : }
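
The deferred recover above converts a panic into a returned error so the flush loop can shut down cleanly. The idiom in isolation (using the standard library for brevity; the real code uses errors.Newf):

package main

import "fmt"

// doWork converts any panic raised inside it into a returned error, the same
// shape as the deferred recover in flushPending above.
func doWork() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	panic("unlock of unlocked mutex") // stands in for an unexpected failure
}

func main() {
	fmt.Println(doWork()) // unlock of unlocked mutex
}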
805 :
806 1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
807 1 : start := crtime.NowMono()
808 1 : err := w.s.Sync()
809 1 : syncLatency := start.Elapsed()
810 1 : return syncLatency, err
811 1 : }
812 :
813 0 : func (w *LogWriter) flushBlock(b *block) error {
814 0 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
815 0 : return err
816 0 : }
817 0 : b.written.Store(0)
818 0 : b.flushed = 0
819 0 : w.free.Lock()
820 0 : w.free.blocks = append(w.free.blocks, b)
821 0 : w.free.Unlock()
822 0 : return nil
823 : }
824 :
825 : // queueBlock queues the current block for writing to the underlying writer,
826 : // allocates a new block and reserves space for the next header.
827 0 : func (w *LogWriter) queueBlock() {
828 0 : // Allocate a new block, blocking until one is available. We do this first
829 0 : // because w.block is protected by w.flusher.Mutex.
830 0 : w.free.Lock()
831 0 : if len(w.free.blocks) == 0 {
832 0 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
833 0 : }
834 0 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
835 0 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
836 0 : w.free.Unlock()
837 0 :
838 0 : f := &w.flusher
839 0 : f.Lock()
840 0 : f.pending = append(f.pending, w.block)
841 0 : w.block = nextBlock
842 0 : f.ready.Signal()
843 0 : w.err = w.flusher.err
844 0 : f.Unlock()
845 0 :
846 0 : w.blockNum++
847 : }
848 :
849 : // Close flushes and syncs any unwritten data and closes the writer.
850 : // Where required, external synchronisation is provided by commitPipeline.mu.
851 1 : func (w *LogWriter) Close() error {
852 1 : return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
853 1 : }
854 :
855 : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
856 : // lastQueuedRecord that the caller will be notified about once it is synced.
857 1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
858 1 : return w.closeInternal(lastQueuedRecord)
859 1 : }
860 :
861 1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
862 1 : f := &w.flusher
863 1 :
864 1 : // Emit an EOF trailer signifying the end of this log. This helps readers
865 1 : // differentiate a corrupted entry in the middle of a log from garbage at
866 1 : // the tail left over from a recycled log file.
867 1 : w.emitEOFTrailer()
868 1 :
869 1 : // Signal the flush loop to close.
870 1 : f.Lock()
871 1 : f.close = true
872 1 : f.ready.Signal()
873 1 : f.Unlock()
874 1 :
875 1 : // Wait for the flush loop to close. The flush loop will not close until all
876 1 : // pending data has been written or an error occurs.
877 1 : <-f.closed
878 1 :
879 1 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
880 1 : // last buffered data only if it was requested via syncQ, so we need to sync
881 1 : // here to ensure that all the data is synced.
882 1 : err := w.flusher.err
883 1 : var syncLatency time.Duration
884 1 : if err == nil && w.s != nil {
885 1 : syncLatency, err = w.syncWithLatency()
886 1 : }
887 1 : f.Lock()
888 1 : if err == nil && f.fsyncLatency != nil {
889 1 : f.fsyncLatency.Observe(float64(syncLatency))
890 1 : }
891 1 : free := w.free.blocks
892 1 : f.Unlock()
893 1 :
894 1 : // NB: the caller of closeInternal may not care about a non-nil cerr below
895 1 : // if all queued writes have been successfully written and synced.
896 1 : if lastQueuedRecord.Index != NoSyncIndex {
897 1 : w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
898 1 : }
899 1 : if w.c != nil {
900 1 : cerr := w.c.Close()
901 1 : w.c = nil
902 1 : err = firstError(err, cerr)
903 1 : }
904 :
905 1 : for _, b := range free {
906 0 : b.flushed = 0
907 0 : b.written.Store(0)
908 0 : blockPool.Put(b)
909 0 : }
910 :
911 1 : w.err = errClosedWriter
912 1 : return err
913 : }
914 :
915 : // firstError returns the first non-nil error of err0 and err1, or nil if both
916 : // are nil.
917 1 : func firstError(err0, err1 error) error {
918 1 : if err0 != nil {
919 0 : return err0
920 0 : }
921 1 : return err1
922 : }
923 :
924 : // WriteRecord writes a complete record. Returns the offset just past the end
925 : // of the record.
926 : // External synchronisation provided by commitPipeline.mu.
927 1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
928 1 : logSize, err := w.SyncRecord(p, nil, nil)
929 1 : return logSize, err
930 1 : }
931 :
932 : // SyncRecord writes a complete record. If wg != nil the record will be
933 : // asynchronously persisted to the underlying writer and Done will be called on
934 : // the wait group upon completion. Returns the offset just past the end of the
935 : // record.
936 : // External synchronisation provided by commitPipeline.mu.
937 : func (w *LogWriter) SyncRecord(
938 : p []byte, wg *sync.WaitGroup, err *error,
939 1 : ) (logSize int64, err2 error) {
940 1 : w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
941 1 : wg: wg,
942 1 : err: err,
943 1 : }
944 1 : return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
945 1 : }
946 :
947 : // SyncRecordGeneralized is a version of SyncRecord that accepts a
948 : // PendingSync.
949 1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
950 1 : if w.err != nil {
951 0 : return -1, w.err
952 0 : }
953 :
954 : // The `i == 0` condition ensures we handle empty records. Such records can
955 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
956 : // MANIFEST is currently written using Writer, it is good to support the same
957 : // semantics with LogWriter.
958 1 : for i := 0; i == 0 || len(p) > 0; i++ {
959 1 : p = w.emitFragment(i, p)
960 1 : }
961 :
962 1 : if ps.syncRequested() {
963 1 : // If we've been asked to persist the record, add the WaitGroup to the sync
964 1 : // queue and signal the flushLoop. Note that flushLoop will write partial
965 1 : // blocks to the file if syncing has been requested. The contract is that
966 1 : // any record written to the LogWriter up to this point will be flushed to the
967 1 : // OS and synced to disk.
968 1 : f := &w.flusher
969 1 : f.pendingSyncs.push(ps)
970 1 : f.ready.Signal()
971 1 : }
972 :
973 1 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
974 1 : // Note that we don't return w.err here as a concurrent call to Close would
975 1 : // race with our read. That's ok because the only error we could be seeing is
976 1 : // one related to syncing, which the caller can be notified of by passing
977 1 : // in a non-nil err argument.
978 1 : return offset, nil
979 : }
980 :
981 : // Size returns the current size of the file.
982 : // External synchronisation provided by commitPipeline.mu.
983 1 : func (w *LogWriter) Size() int64 {
984 1 : return w.blockNum*blockSize + int64(w.block.written.Load())
985 1 : }
986 :
987 : // emitEOFTrailer writes a special recyclable chunk header to signal EOF.
988 : // The reason this function writes the recyclable chunk header, instead of
989 : // handling both the recyclable and WAL sync chunk formats as emitFragment
990 : // does, is that there is no reason to add 8 additional bytes to the EOF
991 : // trailer for the SyncedOffset, as it would be zeroed out anyway.
992 1 : func (w *LogWriter) emitEOFTrailer() {
993 1 : // Write a recyclable chunk header with a different log number. Readers
994 1 : // will treat the header as EOF when the log number does not match.
995 1 : b := w.block
996 1 : i := b.written.Load()
997 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
998 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
999 1 : b.buf[i+6] = recyclableFullChunkEncoding
1000 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
1001 1 : b.written.Store(i + int32(recyclableHeaderSize))
1002 1 : }
1003 :
1004 1 : func (w *LogWriter) emitFragmentRecyclable(n int, p []byte) (remainingP []byte) {
1005 1 : b := w.block
1006 1 : i := b.written.Load()
1007 1 : first := n == 0
1008 1 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
1009 1 :
1010 1 : if last {
1011 1 : if first {
1012 1 : b.buf[i+6] = recyclableFullChunkEncoding
1013 1 : } else {
1014 0 : b.buf[i+6] = recyclableLastChunkEncoding
1015 0 : }
1016 0 : } else {
1017 0 : if first {
1018 0 : b.buf[i+6] = recyclableFirstChunkEncoding
1019 0 : } else {
1020 0 : b.buf[i+6] = recyclableMiddleChunkEncoding
1021 0 : }
1022 : }
1023 :
1024 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
1025 1 :
1026 1 : r := copy(b.buf[i+recyclableHeaderSize:], p)
1027 1 : j := i + int32(recyclableHeaderSize+r)
1028 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
1029 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
1030 1 : b.written.Store(j)
1031 1 :
1032 1 : if blockSize-b.written.Load() < recyclableHeaderSize {
1033 0 : // There is no room for another fragment in the block, so fill the
1034 0 : // remaining bytes with zeros and queue the block for flushing.
1035 0 : clear(b.buf[b.written.Load():])
1036 0 : w.queueBlock()
1037 0 : }
1038 1 : return p[r:]
1039 : }
1040 :
1041 1 : func (w *LogWriter) emitFragmentSyncOffsets(n int, p []byte) (remainingP []byte) {
1042 1 : b := w.block
1043 1 : i := b.written.Load()
1044 1 : first := n == 0
1045 1 : last := blockSize-i-walSyncHeaderSize >= int32(len(p))
1046 1 :
1047 1 : if last {
1048 1 : if first {
1049 1 : b.buf[i+6] = walSyncFullChunkEncoding
1050 1 : } else {
1051 0 : b.buf[i+6] = walSyncLastChunkEncoding
1052 0 : }
1053 0 : } else {
1054 0 : if first {
1055 0 : b.buf[i+6] = walSyncFirstChunkEncoding
1056 0 : } else {
1057 0 : b.buf[i+6] = walSyncMiddleChunkEncoding
1058 0 : }
1059 : }
1060 :
1061 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
1062 1 : binary.LittleEndian.PutUint64(b.buf[i+11:i+19], w.syncedOffset.Load())
1063 1 :
1064 1 : r := copy(b.buf[i+walSyncHeaderSize:], p)
1065 1 : j := i + int32(walSyncHeaderSize+r)
1066 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
1067 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
1068 1 : b.written.Store(j)
1069 1 :
1070 1 : if blockSize-b.written.Load() < walSyncHeaderSize {
1071 0 : // There is no room for another fragment in the block, so fill the
1072 0 : // remaining bytes with zeros and queue the block for flushing.
1073 0 : clear(b.buf[b.written.Load():])
1074 0 : w.queueBlock()
1075 0 : }
1076 1 : return p[r:]
1077 : }
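
Both fragment writers lay out the header the same way; the WAL sync variant appends an 8-byte synced offset after the log number, which is why the header grows from 11 to 19 bytes. A standalone sketch of the layout, with sizes inferred from the code above (constant names are illustrative):

package main

import (
	"encoding/binary"
	"fmt"
)

// Header layout, as written by emitFragmentRecyclable/emitFragmentSyncOffsets:
//   [0:4)   CRC of buf[6 : headerLen+payloadLen)
//   [4:6)   payload size
//   [6]     chunk type (first/middle/last/full)
//   [7:11)  low 32 bits of the log number
//   [11:19) synced offset (WAL sync chunks only)
const (
	recyclableHdrLen = 11
	walSyncHdrLen    = 19
)

func main() {
	payload := []byte("payload")
	buf := make([]byte, walSyncHdrLen+len(payload))

	binary.LittleEndian.PutUint16(buf[4:6], uint16(len(payload)))
	buf[6] = 0x1 // some chunk-type byte; the real encodings live elsewhere in this package
	binary.LittleEndian.PutUint32(buf[7:11], 42)     // log number
	binary.LittleEndian.PutUint64(buf[11:19], 1<<20) // synced offset
	copy(buf[walSyncHdrLen:], payload)
	// The CRC covers everything from the type byte through the payload.
	// (The real code uses the pebble-internal crc package; omitted here.)

	fmt.Println(binary.LittleEndian.Uint16(buf[4:6]))   // 7
	fmt.Println(binary.LittleEndian.Uint32(buf[7:11]))  // 42
	fmt.Println(binary.LittleEndian.Uint64(buf[11:19])) // 1048576
	fmt.Println(recyclableHdrLen, walSyncHdrLen)        // 11 19
}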
1078 :
1079 : // Metrics should typically be called after Close, since the LogWriter will no
1080 : // longer modify the returned LogWriterMetrics. The result is also current if
1081 : // there is nothing left to flush in the flush loop, but that is an
1082 : // implementation detail that callers should not rely on.
1083 1 : func (w *LogWriter) Metrics() LogWriterMetrics {
1084 1 : w.flusher.Lock()
1085 1 : defer w.flusher.Unlock()
1086 1 : m := *w.flusher.metrics
1087 1 : return m
1088 1 : }
1089 :
1090 : // LogWriterMetrics contains misc metrics for the log writer.
1091 : type LogWriterMetrics struct {
1092 : WriteThroughput base.ThroughputMetric
1093 : PendingBufferLen base.GaugeSampleMetric
1094 : SyncQueueLen base.GaugeSampleMetric
1095 : }
1096 :
1097 : // Merge merges metrics from x. Requires that x is non-nil.
1098 1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
1099 1 : m.WriteThroughput.Merge(x.WriteThroughput)
1100 1 : m.PendingBufferLen.Merge(x.PendingBufferLen)
1101 1 : m.SyncQueueLen.Merge(x.SyncQueueLen)
1102 1 : return nil
1103 1 : }
|