LCOV - code coverage report
Current view: top level - pebble/record - log_writer.go (source / functions) Hit Total Coverage
Test: 2024-11-28 08:17Z 21866e88 - tests only.lcov Lines: 526 540 97.4 %
Date: 2024-11-28 08:17:39 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package record
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "io"
      11             :         "runtime/pprof"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/crlib/crtime"
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/crc"
      20             :         "github.com/prometheus/client_golang/prometheus"
      21             : )
      22             : 
      23             : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
      24             : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
      25             : 
      26             : type block struct {
      27             :         // buf[:written] has already been filled with fragments. Updated atomically.
      28             :         written atomic.Int32
      29             :         // buf[:flushed] has already been flushed to w.
      30             :         flushed int32
      31             :         buf     [blockSize]byte
      32             : }
      33             : 
      34             : type flusher interface {
      35             :         Flush() error
      36             : }
      37             : 
      38             : type syncer interface {
      39             :         Sync() error
      40             : }
      41             : 
      42             : const (
      43             :         syncConcurrencyBits = 12
      44             : 
      45             :         // SyncConcurrency is the maximum number of concurrent sync operations that
      46             :         // can be performed. Note that a sync operation is initiated either by a call
      47             :         // to SyncRecord or by a call to Close. Exported as this value also limits
      48             :         // the commit concurrency in commitPipeline.
      49             :         SyncConcurrency = 1 << syncConcurrencyBits
      50             : )
      51             : 
      52             : type syncSlot struct {
      53             :         wg  *sync.WaitGroup
      54             :         err *error
      55             : }
      56             : 
      57             : // syncQueue is a lock-free fixed-size single-producer, single-consumer
      58             : // queue. The single-producer can push to the head, and the single-consumer can
      59             : // pop multiple values from the tail. Popping calls Done() on each of the
      60             : // available *sync.WaitGroup elements.
      61             : type syncQueue struct {
      62             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      63             :         // are indexes into slots modulo len(slots)-1.
      64             :         //
      65             :         // tail = index of oldest data in queue
      66             :         // head = index of next slot to fill
      67             :         //
      68             :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      69             :         // continues to own a slot outside this range until it nils the slot, at
      70             :         // which point ownership passes to the producer.
      71             :         //
      72             :         // The head index is stored in the most-significant bits so that we can
      73             :         // atomically add to it and the overflow is harmless.
      74             :         headTail atomic.Uint64
      75             : 
      76             :         // slots is a ring buffer of values stored in this queue. The size must be a
      77             :         // power of 2. A slot is in use until the tail index has moved beyond it.
      78             :         slots [SyncConcurrency]syncSlot
      79             : 
      80             :         // blocked is an atomic boolean which indicates whether syncing is currently
      81             :         // blocked or can proceed. It is used by the implementation of
      82             :         // min-sync-interval to block syncing until the min interval has passed.
      83             :         blocked atomic.Bool
      84             : }
      85             : 
      86             : const dequeueBits = 32
      87             : 
      88             : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
      89           1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
      90           1 :         const mask = 1<<dequeueBits - 1
      91           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      92           1 :         tail = uint32(ptrs & mask)
      93           1 :         return head, tail
      94           1 : }
      95             : 
      96           1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
      97           1 :         ptrs := q.headTail.Load()
      98           1 :         head, tail := q.unpack(ptrs)
      99           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
     100           0 :                 panic("pebble: queue is full")
     101             :         }
     102             : 
     103           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
     104           1 :         slot.wg = wg
     105           1 :         slot.err = err
     106           1 : 
     107           1 :         // Increment head. This passes ownership of slot to dequeue and acts as a
     108           1 :         // store barrier for writing the slot.
     109           1 :         q.headTail.Add(1 << dequeueBits)
     110             : }
     111             : 
     112           1 : func (q *syncQueue) setBlocked() {
     113           1 :         q.blocked.Store(true)
     114           1 : }
     115             : 
     116           1 : func (q *syncQueue) clearBlocked() {
     117           1 :         q.blocked.Store(false)
     118           1 : }
     119             : 
     120           1 : func (q *syncQueue) empty() bool {
     121           1 :         head, tail, _ := q.load()
     122           1 :         return head == tail
     123           1 : }
     124             : 
     125             : // load returns the head, tail of the queue for what should be synced to the
     126             : // caller. It can return a head, tail of zero if syncing is blocked due to
     127             : // min-sync-interval. It additionally returns the real length of this queue,
     128             : // regardless of whether syncing is blocked.
     129           1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
     130           1 :         ptrs := q.headTail.Load()
     131           1 :         head, tail = q.unpack(ptrs)
     132           1 :         realLength = head - tail
     133           1 :         if q.blocked.Load() {
     134           1 :                 return 0, 0, realLength
     135           1 :         }
     136           1 :         return head, tail, realLength
     137             : }
     138             : 
     139             : // REQUIRES: queueSemChan is non-nil.
     140           1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
     141           1 :         if tail == head {
     142           1 :                 // Queue is empty.
     143           1 :                 return nil
     144           1 :         }
     145             : 
     146           1 :         for ; tail != head; tail++ {
     147           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     148           1 :                 wg := slot.wg
     149           1 :                 if wg == nil {
     150           0 :                         return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
     151           0 :                 }
     152           1 :                 *slot.err = err
     153           1 :                 slot.wg = nil
     154           1 :                 slot.err = nil
     155           1 :                 // We need to bump the tail count before releasing the queueSemChan
     156           1 :                 // semaphore as releasing the semaphore can cause a blocked goroutine to
     157           1 :                 // acquire the semaphore and enqueue before we've "freed" space in the
     158           1 :                 // queue.
     159           1 :                 q.headTail.Add(1)
     160           1 :                 wg.Done()
     161           1 :                 // Is always non-nil in production, unless using wal package for WAL
     162           1 :                 // failover.
     163           1 :                 if queueSemChan != nil {
     164           1 :                         <-queueSemChan
     165           1 :                 }
     166             :         }
     167             : 
     168           1 :         return nil
     169             : }
     170             : 
     171             : // pendingSyncs abstracts out the handling of pending sync requests. In
     172             : // standalone mode the implementation is a thin wrapper around syncQueue. In
     173             : // the mode where the LogWriter can be subject to failover, there is no queue
     174             : // kept in the LogWriter and the signaling to those waiting for sync is
     175             : // handled in the wal package.
     176             : //
     177             : // To avoid heap allocations due to the use of this interface, the parameters
     178             : // and return values follow some strict rules:
     179             : //   - The PendingSync parameter can be reused by the caller after push returns.
     180             : //     The implementation should be a pointer backed by a struct that is already
     181             : //     heap allocated, which the caller can reuse for the next push call.
     182             : //   - The pendingSyncSnapshot return value must be backed by the pendingSyncs
     183             : //     implementation, so calling snapshotForPop again will cause the previous
     184             : //     snapshot to be overwritten.
     185             : type pendingSyncs interface {
     186             :         push(PendingSync)
     187             :         setBlocked()
     188             :         clearBlocked()
     189             :         empty() bool
     190             :         snapshotForPop() pendingSyncsSnapshot
     191             :         pop(snap pendingSyncsSnapshot, err error) error
     192             : }
     193             : 
     194             : type pendingSyncsSnapshot interface {
     195             :         empty() bool
     196             : }
     197             : 
     198             : // PendingSync abstracts the sync specification for a record queued on the
     199             : // LogWriter. The only implementations are provided in this package since
     200             : // syncRequested is not exported.
     201             : type PendingSync interface {
     202             :         syncRequested() bool
     203             : }
     204             : 
     205             : // The implementation of pendingSyncs in standalone mode.
     206             : type pendingSyncsWithSyncQueue struct {
     207             :         syncQueue
     208             :         syncQueueLen    *base.GaugeSampleMetric
     209             :         snapshotBacking syncQueueSnapshot
     210             :         // See the comment for LogWriterConfig.QueueSemChan.
     211             :         queueSemChan chan struct{}
     212             : }
     213             : 
     214             : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
     215             : 
     216           1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
     217           1 :         ps2 := ps.(*pendingSyncForSyncQueue)
     218           1 :         q.syncQueue.push(ps2.wg, ps2.err)
     219           1 : }
     220             : 
     221           1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
     222           1 :         head, tail, realLength := q.syncQueue.load()
     223           1 :         q.snapshotBacking = syncQueueSnapshot{
     224           1 :                 head: head,
     225           1 :                 tail: tail,
     226           1 :         }
     227           1 :         q.syncQueueLen.AddSample(int64(realLength))
     228           1 :         return &q.snapshotBacking
     229           1 : }
     230             : 
     231           1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
     232           1 :         s := snap.(*syncQueueSnapshot)
     233           1 :         return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
     234           1 : }
     235             : 
     236             : // The implementation of pendingSyncsSnapshot in standalone mode.
     237             : type syncQueueSnapshot struct {
     238             :         head, tail uint32
     239             : }
     240             : 
     241           1 : func (s *syncQueueSnapshot) empty() bool {
     242           1 :         return s.head == s.tail
     243           1 : }
     244             : 
     245             : // The implementation of pendingSync in standalone mode.
     246             : type pendingSyncForSyncQueue struct {
     247             :         wg  *sync.WaitGroup
     248             :         err *error
     249             : }
     250             : 
     251           1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
     252           1 :         return ps.wg != nil
     253           1 : }
     254             : 
     255             : // The implementation of pendingSyncs in failover mode.
     256             : type pendingSyncsWithHighestSyncIndex struct {
     257             :         // The highest "index" queued that is requesting a sync. Initialized
     258             :         // to NoSyncIndex, and reset to NoSyncIndex after the sync.
     259             :         index           atomic.Int64
     260             :         snapshotBacking PendingSyncIndex
     261             :         // blocked is an atomic boolean which indicates whether syncing is currently
     262             :         // blocked or can proceed. It is used by the implementation of
     263             :         // min-sync-interval to block syncing until the min interval has passed.
     264             :         blocked                   atomic.Bool
     265             :         externalSyncQueueCallback ExternalSyncQueueCallback
     266             : }
     267             : 
     268             : // NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.
     269             : const NoSyncIndex = -1
     270             : 
     271             : func (si *pendingSyncsWithHighestSyncIndex) init(
     272             :         externalSyncQueueCallback ExternalSyncQueueCallback,
     273           1 : ) {
     274           1 :         si.index.Store(NoSyncIndex)
     275           1 :         si.externalSyncQueueCallback = externalSyncQueueCallback
     276           1 : }
     277             : 
     278           1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
     279           1 :         ps2 := ps.(*PendingSyncIndex)
     280           1 :         si.index.Store(ps2.Index)
     281           1 : }
     282             : 
     283           1 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
     284           1 :         si.blocked.Store(true)
     285           1 : }
     286             : 
     287           1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
     288           1 :         si.blocked.Store(false)
     289           1 : }
     290             : 
     291           1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
     292           1 :         return si.load() == NoSyncIndex
     293           1 : }
     294             : 
     295           1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
     296           1 :         si.snapshotBacking = PendingSyncIndex{Index: si.load()}
     297           1 :         return &si.snapshotBacking
     298           1 : }
     299             : 
     300           1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
     301           1 :         index := si.index.Load()
     302           1 :         if index != NoSyncIndex && si.blocked.Load() {
     303           1 :                 index = NoSyncIndex
     304           1 :         }
     305           1 :         return index
     306             : }
     307             : 
     308           1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
     309           1 :         index := snap.(*PendingSyncIndex)
     310           1 :         if index.Index == NoSyncIndex {
     311           1 :                 return nil
     312           1 :         }
     313             :         // Set to NoSyncIndex if a higher index has not queued.
     314           1 :         si.index.CompareAndSwap(index.Index, NoSyncIndex)
     315           1 :         si.externalSyncQueueCallback(*index, err)
     316           1 :         return nil
     317             : }
     318             : 
     319             : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
     320             : type PendingSyncIndex struct {
     321             :         // Index is some state meaningful to the user of LogWriter. The LogWriter
     322             :         // itself only examines whether Index is equal to NoSyncIndex.
     323             :         Index int64
     324             : }
     325             : 
     326           1 : func (s *PendingSyncIndex) empty() bool {
     327           1 :         return s.Index == NoSyncIndex
     328           1 : }
     329             : 
     330           1 : func (s *PendingSyncIndex) syncRequested() bool {
     331           1 :         return s.Index != NoSyncIndex
     332           1 : }
     333             : 
     334             : // flusherCond is a specialized condition variable that allows its condition to
     335             : // change and readiness be signalled without holding its associated mutex. In
     336             : // particular, when a waiter is added to syncQueue atomically, this condition
     337             : // variable can be signalled without holding flusher.Mutex.
     338             : type flusherCond struct {
     339             :         mu   *sync.Mutex
     340             :         q    pendingSyncs
     341             :         cond sync.Cond
     342             : }
     343             : 
     344           1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
     345           1 :         c.mu = mu
     346           1 :         c.q = q
     347           1 :         // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
     348           1 :         // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
     349           1 :         // will be called and we can check the !syncQueue.empty() condition.
     350           1 :         c.cond.L = c
     351           1 : }
     352             : 
     353           1 : func (c *flusherCond) Signal() {
     354           1 :         // Pass-through to the cond var.
     355           1 :         c.cond.Signal()
     356           1 : }
     357             : 
     358           1 : func (c *flusherCond) Wait() {
     359           1 :         // Pass-through to the cond var. Note that internally the cond var implements
     360           1 :         // Wait as:
     361           1 :         //
     362           1 :         //   t := notifyListAdd()
     363           1 :         //   L.Unlock()
     364           1 :         //   notifyListWait(t)
     365           1 :         //   L.Lock()
     366           1 :         //
     367           1 :         // We've configured the cond var to call flusherReady.Unlock() which allows
     368           1 :         // us to check the !syncQueue.empty() condition without a danger of missing a
     369           1 :         // notification. Any call to flusherReady.Signal() after notifyListAdd() is
     370           1 :         // called will cause the subsequent notifyListWait() to return immediately.
     371           1 :         c.cond.Wait()
     372           1 : }
     373             : 
     374           1 : func (c *flusherCond) Lock() {
     375           1 :         c.mu.Lock()
     376           1 : }
     377             : 
     378           1 : func (c *flusherCond) Unlock() {
     379           1 :         c.mu.Unlock()
     380           1 :         if !c.q.empty() {
     381           1 :                 // If the current goroutine is about to block on sync.Cond.Wait, this call
     382           1 :                 // to Signal will prevent that. The comment in Wait above explains a bit
     383           1 :                 // about what is going on here, but it is worth reiterating:
     384           1 :                 //
     385           1 :                 //   flusherCond.Wait()
     386           1 :                 //     sync.Cond.Wait()
     387           1 :                 //       t := notifyListAdd()
     388           1 :                 //       flusherCond.Unlock()    <-- we are here
     389           1 :                 //       notifyListWait(t)
     390           1 :                 //       flusherCond.Lock()
     391           1 :                 //
     392           1 :                 // The call to Signal here results in:
     393           1 :                 //
     394           1 :                 //     sync.Cond.Signal()
     395           1 :                 //       notifyListNotifyOne()
     396           1 :                 //
     397           1 :                 // The call to notifyListNotifyOne() will prevent the call to
     398           1 :                 // notifyListWait(t) from blocking.
     399           1 :                 c.cond.Signal()
     400           1 :         }
     401             : }
     402             : 
     403             : type durationFunc func() time.Duration
     404             : 
     405             : // syncTimer is an interface for timers, modeled on the closure callback mode
     406             : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
     407             : // by tests to mock out the timer functionality used to implement
     408             : // min-sync-interval.
     409             : type syncTimer interface {
     410             :         Reset(time.Duration) bool
     411             :         Stop() bool
     412             : }
     413             : 
     414             : // LogWriter writes records to an underlying io.Writer. In order to support WAL
     415             : // file reuse, a LogWriter's records are tagged with the WAL's file
     416             : // number. When reading a log file a record from a previous incarnation of the
     417             : // file will return the error ErrInvalidLogNum.
     418             : type LogWriter struct {
     419             :         // w is the underlying writer.
     420             :         w io.Writer
     421             :         // c is w as a closer.
     422             :         c io.Closer
     423             :         // s is w as a syncer.
     424             :         s syncer
     425             :         // logNum is the low 32-bits of the log's file number.
     426             :         logNum uint32
     427             :         // blockNum is the zero based block number for the current block.
     428             :         blockNum int64
     429             :         // err is any accumulated error. It originates in flusher.err, and is
     430             :         // updated to reflect flusher.err when a block is full and getting enqueued.
     431             :         // Therefore, there is a lag between when flusher.err has a non-nil error,
     432             :         // and when that non-nil error is reflected in LogWriter.err. On close, it
     433             :         // is set to errClosedWriter to inform accidental future calls to
     434             :         // SyncRecord*.
     435             :         err error
     436             :         // block is the current block being written. Protected by flusher.Mutex.
     437             :         block *block
     438             :         free  struct {
     439             :                 sync.Mutex
     440             :                 blocks []*block
     441             :         }
     442             : 
     443             :         flusher struct {
     444             :                 sync.Mutex
     445             :                 // Flusher ready is a condition variable that is signalled when there are
     446             :                 // blocks to flush, syncing has been requested, or the LogWriter has been
     447             :                 // closed. For signalling of a sync, it is safe to call without holding
     448             :                 // flusher.Mutex.
     449             :                 ready flusherCond
     450             :                 // Set to true when the flush loop should be closed.
     451             :                 close bool
     452             :                 // Closed when the flush loop has terminated.
     453             :                 closed chan struct{}
     454             :                 // Accumulated flush error.
     455             :                 err error
     456             :                 // minSyncInterval is the minimum duration between syncs.
     457             :                 minSyncInterval durationFunc
     458             :                 fsyncLatency    prometheus.Histogram
     459             :                 pending         []*block
     460             :                 // Pushing and popping from pendingSyncs does not require flusher mutex to
     461             :                 // be held.
     462             :                 pendingSyncs pendingSyncs
     463             :                 metrics      *LogWriterMetrics
     464             :         }
     465             : 
     466             :         // afterFunc is a hook to allow tests to mock out the timer functionality
     467             :         // used for min-sync-interval. In normal operation this points to
     468             :         // time.AfterFunc.
     469             :         afterFunc func(d time.Duration, f func()) syncTimer
     470             : 
     471             :         // Backing for both pendingSyncs implementations.
     472             :         pendingSyncsBackingQ     pendingSyncsWithSyncQueue
     473             :         pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
     474             : 
     475             :         pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
     476             : }
     477             : 
     478             : // LogWriterConfig is a struct used for configuring new LogWriters
     479             : type LogWriterConfig struct {
     480             :         WALMinSyncInterval durationFunc
     481             :         WALFsyncLatency    prometheus.Histogram
     482             :         // QueueSemChan is an optional channel to pop from when popping from
     483             :         // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
     484             :         // the syncQueue from overflowing (which will cause a panic). All production
     485             :         // code ensures this is non-nil.
     486             :         QueueSemChan chan struct{}
     487             : 
     488             :         // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
     489             :         // as part of a WAL implementation that can failover between LogWriters.
     490             :         //
     491             :         // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
     492             :         // be used with a PendingSync parameter that is implemented by
     493             :         // PendingSyncIndex. When an index is synced (which implies all earlier
     494             :         // indices are also synced), this callback is invoked. The caller must not
     495             :         // hold any mutex when invoking this callback, since the lock ordering
     496             :         // requirement in this case is that any higher layer locks (in the wal
     497             :         // package) precede the lower layer locks (in the record package). These
     498             :         // callbacks are serialized since they are invoked from the flushLoop.
     499             :         ExternalSyncQueueCallback ExternalSyncQueueCallback
     500             : }
     501             : 
     502             : // ExternalSyncQueueCallback is to be run when a PendingSync has been
     503             : // processed, either successfully or with an error.
     504             : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
     505             : 
     506             : // initialAllocatedBlocksCap is the initial capacity of the various slices
     507             : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
     508             : // than this threshold allows.
     509             : const initialAllocatedBlocksCap = 32
     510             : 
     511             : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
     512             : // Pool when a LogWriter is closed. Before that, free blocks are maintained
     513             : // within a LogWriter's own internal free list `w.free.blocks`.
     514             : var blockPool = sync.Pool{
     515           1 :         New: func() any { return &block{} },
     516             : }
     517             : 
     518             : // NewLogWriter returns a new LogWriter.
     519             : //
     520             : // The io.Writer may also be used as an io.Closer and syncer. No other methods
     521             : // will be called on the writer.
     522             : func NewLogWriter(
     523             :         w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
     524           1 : ) *LogWriter {
     525           1 :         c, _ := w.(io.Closer)
     526           1 :         s, _ := w.(syncer)
     527           1 :         r := &LogWriter{
     528           1 :                 w: w,
     529           1 :                 c: c,
     530           1 :                 s: s,
     531           1 :                 // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
     532           1 :                 // we are very unlikely to reach a file number of 4 billion and b) the log
     533           1 :                 // number is used as a validation check and using only the low 32-bits is
     534           1 :                 // sufficient for that purpose.
     535           1 :                 logNum: uint32(logNum),
     536           1 :                 afterFunc: func(d time.Duration, f func()) syncTimer {
     537           0 :                         return time.AfterFunc(d, f)
     538           0 :                 },
     539             :         }
     540           1 :         m := &LogWriterMetrics{}
     541           1 :         if logWriterConfig.ExternalSyncQueueCallback != nil {
     542           1 :                 r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
     543           1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
     544           1 :         } else {
     545           1 :                 r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
     546           1 :                         syncQueueLen: &m.SyncQueueLen,
     547           1 :                         queueSemChan: logWriterConfig.QueueSemChan,
     548           1 :                 }
     549           1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
     550           1 :         }
     551             : 
     552           1 :         r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
     553           1 :         r.block = blockPool.Get().(*block)
     554           1 :         r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
     555           1 :         r.flusher.closed = make(chan struct{})
     556           1 :         r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
     557           1 :         r.flusher.metrics = m
     558           1 : 
     559           1 :         f := &r.flusher
     560           1 :         f.minSyncInterval = logWriterConfig.WALMinSyncInterval
     561           1 :         f.fsyncLatency = logWriterConfig.WALFsyncLatency
     562           1 : 
     563           1 :         go func() {
     564           1 :                 pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
     565           1 :         }()
     566           1 :         return r
     567             : }
     568             : 
     569           1 : func (w *LogWriter) flushLoop(context.Context) {
     570           1 :         f := &w.flusher
     571           1 :         f.Lock()
     572           1 : 
     573           1 :         // Initialize idleStartTime to when the loop starts.
     574           1 :         idleStartTime := crtime.NowMono()
     575           1 :         var syncTimer syncTimer
     576           1 :         defer func() {
     577           1 :                 // Capture the idle duration between the last piece of work and when the
     578           1 :                 // loop terminated.
     579           1 :                 f.metrics.WriteThroughput.IdleDuration += idleStartTime.Elapsed()
     580           1 :                 if syncTimer != nil {
     581           1 :                         syncTimer.Stop()
     582           1 :                 }
     583           1 :                 close(f.closed)
     584           1 :                 f.Unlock()
     585             :         }()
     586             : 
     587             :         // The flush loop performs flushing of full and partial data blocks to the
     588             :         // underlying writer (LogWriter.w), syncing of the writer, and notification
     589             :         // to sync requests that they have completed.
     590             :         //
     591             :         // - flusher.ready is a condition variable that is signalled when there is
     592             :         //   work to do. Full blocks are contained in flusher.pending. The current
     593             :         //   partial block is in LogWriter.block. And sync operations are held in
     594             :         //   flusher.syncQ.
     595             :         //
     596             :         // - The decision to sync is determined by whether there are any sync
     597             :         //   requests present in flusher.syncQ and whether enough time has elapsed
     598             :         //   since the last sync. If not enough time has elapsed since the last sync,
     599             :         //   flusher.syncQ.blocked will be set to 1. If syncing is blocked,
     600             :         //   syncQueue.empty() will return true and syncQueue.load() will return 0,0
     601             :         //   (i.e. an empty list).
     602             :         //
     603             :         // - flusher.syncQ.blocked is cleared by a timer that is initialized when
     604             :         //   blocked is set to 1. When blocked is 1, no syncing will take place, but
     605             :         //   flushing will continue to be performed. The on/off toggle for syncing
     606             :         //   does not need to be carefully synchronized with the rest of processing
     607             :         //   -- all we need to ensure is that after any transition to blocked=1 there
     608             :         //   is eventually a transition to blocked=0. syncTimer performs this
     609             :         //   transition. Note that any change to min-sync-interval will not take
     610             :         //   effect until the previous timer elapses.
     611             :         //
     612             :         // - Picking up the syncing work to perform requires coordination with
     613             :         //   picking up the flushing work. Specifically, flushing work is queued
     614             :         //   before syncing work. The guarantee of this code is that when a sync is
     615             :         //   requested, any previously queued flush work will be synced. This
     616             :         //   motivates reading the syncing work (f.syncQ.load()) before picking up
     617             :         //   the flush work (w.block.written.Load()).
     618             : 
     619             :         // The list of full blocks that need to be written. This is copied from
     620             :         // f.pending on every loop iteration, though the number of elements is
     621             :         // usually small (most frequently 1). In the case of the WAL LogWriter, the
     622             :         // number of blocks is bounded by the size of the WAL's corresponding
     623             :         // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
     624             :         // this works out to at most 2048 elements if the entirety of the memtable's
     625             :         // contents are queued.
     626           1 :         pending := make([]*block, 0, cap(f.pending))
     627           1 :         for {
     628           1 :                 for {
     629           1 :                         // Grab the portion of the current block that requires flushing. Note that
     630           1 :                         // the current block can be added to the pending blocks list after we release
     631           1 :                         // the flusher lock, but it won't be part of pending.
     632           1 :                         written := w.block.written.Load()
     633           1 :                         if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
     634           1 :                                 break
     635             :                         }
     636           1 :                         if f.close {
     637           1 :                                 // If the writer is closed, pretend the sync timer fired immediately so
     638           1 :                                 // that we can process any queued sync requests.
     639           1 :                                 f.pendingSyncs.clearBlocked()
     640           1 :                                 if !f.pendingSyncs.empty() {
     641           1 :                                         break
     642             :                                 }
     643           1 :                                 return
     644             :                         }
     645           1 :                         f.ready.Wait()
     646           1 :                         continue
     647             :                 }
     648             :                 // Found work to do, so no longer idle.
     649             :                 //
     650             :                 // NB: it is safe to read pending before loading from the syncQ since
     651             :                 // mutations to pending require the w.flusher mutex, which is held here.
     652             :                 // There is no risk that someone will concurrently add to pending, so the
     653             :                 // following sequence, which would pick up a syncQ entry without the
     654             :                 // corresponding data, is impossible:
     655             :                 //
     656             :                 // Thread enqueueing       This thread
     657             :                 //                         1. read pending
     658             :                 // 2. add block to pending
     659             :                 // 3. add to syncQ
     660             :                 //                         4. read syncQ
     661           1 :                 workStartTime := crtime.NowMono()
     662           1 :                 idleDuration := workStartTime.Sub(idleStartTime)
     663           1 :                 pending = append(pending[:0], f.pending...)
     664           1 :                 f.pending = f.pending[:0]
     665           1 :                 f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
     666           1 : 
     667           1 :                 // Grab the list of sync waiters. Note that syncQueue.load() will return
     668           1 :                 // 0,0 while we're waiting for the min-sync-interval to expire. This
     669           1 :                 // allows flushing to proceed even if we're not ready to sync.
     670           1 :                 snap := f.pendingSyncs.snapshotForPop()
     671           1 : 
     672           1 :                 // Grab the portion of the current block that requires flushing. Note that
     673           1 :                 // the current block can be added to the pending blocks list after we
     674           1 :                 // release the flusher lock, but it won't be part of pending. This has to
     675           1 :                 // be ordered after we get the list of sync waiters from syncQ in order to
     676           1 :                 // prevent a race where a waiter adds itself to syncQ, but this thread
     677           1 :                 // picks up the entry in syncQ and not the buffered data.
     678           1 :                 written := w.block.written.Load()
     679           1 :                 data := w.block.buf[w.block.flushed:written]
     680           1 :                 w.block.flushed = written
     681           1 : 
     682           1 :                 fErr := f.err
     683           1 :                 f.Unlock()
     684           1 :                 // If flusher has an error, we propagate it to waiters. Note in spite of
     685           1 :                 // error we consume the pending list above to free blocks for writers.
     686           1 :                 if fErr != nil {
     687           1 :                         // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
     688           1 :                         // called f.Unlock() above. We will acquire the lock again below.
     689           1 :                         f.pendingSyncs.pop(snap, fErr)
     690           1 :                         // Update the idleStartTime if work could not be done, so that we don't
     691           1 :                         // include the duration we tried to do work as idle. We don't bother
     692           1 :                         // with the rest of the accounting, which means we will undercount.
     693           1 :                         idleStartTime = crtime.NowMono()
     694           1 :                         f.Lock()
     695           1 :                         continue
     696             :                 }
     697           1 :                 synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
     698           1 :                 f.Lock()
     699           1 :                 if synced && f.fsyncLatency != nil {
     700           1 :                         f.fsyncLatency.Observe(float64(syncLatency))
     701           1 :                 }
     702           1 :                 f.err = err
     703           1 :                 if f.err != nil {
     704           1 :                         f.pendingSyncs.clearBlocked()
     705           1 :                         // Update the idleStartTime if work could not be done, so that we don't
     706           1 :                         // include the duration we tried to do work as idle. We don't bother
     707           1 :                         // with the rest of the accounting, which means we will undercount.
     708           1 :                         idleStartTime = crtime.NowMono()
     709           1 :                         continue
     710             :                 }
     711             : 
     712           1 :                 if synced && f.minSyncInterval != nil {
     713           1 :                         // A sync was performed. Make sure we've waited for the min sync
     714           1 :                         // interval before syncing again.
     715           1 :                         if min := f.minSyncInterval(); min > 0 {
     716           1 :                                 f.pendingSyncs.setBlocked()
     717           1 :                                 if syncTimer == nil {
     718           1 :                                         syncTimer = w.afterFunc(min, func() {
     719           1 :                                                 f.pendingSyncs.clearBlocked()
     720           1 :                                                 f.ready.Signal()
     721           1 :                                         })
     722           1 :                                 } else {
     723           1 :                                         syncTimer.Reset(min)
     724           1 :                                 }
     725             :                         }
     726             :                 }
     727             :                 // Finished work, and started idling.
     728           1 :                 idleStartTime = crtime.NowMono()
     729           1 :                 workDuration := idleStartTime.Sub(workStartTime)
     730           1 :                 f.metrics.WriteThroughput.Bytes += bytesWritten
     731           1 :                 f.metrics.WriteThroughput.WorkDuration += workDuration
     732           1 :                 f.metrics.WriteThroughput.IdleDuration += idleDuration
     733             :         }
     734             : }
     735             : 
     736             : func (w *LogWriter) flushPending(
     737             :         data []byte, pending []*block, snap pendingSyncsSnapshot,
     738           1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
     739           1 :         defer func() {
     740           1 :                 // Translate panics into errors. The errors will cause flushLoop to shut
     741           1 :                 // down, but allows us to do so in a controlled way and avoid swallowing
     742           1 :                 // the stack that created the panic if panic'ing itself hits a panic
     743           1 :                 // (e.g. unlock of unlocked mutex).
     744           1 :                 if r := recover(); r != nil {
     745           0 :                         err = errors.Newf("%v", r)
     746           0 :                 }
     747             :         }()
     748             : 
     749           1 :         for _, b := range pending {
     750           1 :                 bytesWritten += blockSize - int64(b.flushed)
     751           1 :                 if err = w.flushBlock(b); err != nil {
     752           0 :                         break
     753             :                 }
     754             :         }
     755           1 :         if n := len(data); err == nil && n > 0 {
     756           1 :                 bytesWritten += int64(n)
     757           1 :                 _, err = w.w.Write(data)
     758           1 :         }
     759             : 
     760           1 :         synced = !snap.empty()
     761           1 :         if synced {
     762           1 :                 if err == nil && w.s != nil {
     763           1 :                         syncLatency, err = w.syncWithLatency()
     764           1 :                 } else {
     765           1 :                         synced = false
     766           1 :                 }
     767           1 :                 f := &w.flusher
     768           1 :                 if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
     769           0 :                         return synced, syncLatency, bytesWritten, firstError(err, popErr)
     770           0 :                 }
     771             :         }
     772             : 
     773           1 :         return synced, syncLatency, bytesWritten, err
     774             : }
     775             : 
     776           1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
     777           1 :         start := crtime.NowMono()
     778           1 :         err := w.s.Sync()
     779           1 :         syncLatency := start.Elapsed()
     780           1 :         return syncLatency, err
     781           1 : }
     782             : 
     783           1 : func (w *LogWriter) flushBlock(b *block) error {
     784           1 :         if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
     785           0 :                 return err
     786           0 :         }
     787           1 :         b.written.Store(0)
     788           1 :         b.flushed = 0
     789           1 :         w.free.Lock()
     790           1 :         w.free.blocks = append(w.free.blocks, b)
     791           1 :         w.free.Unlock()
     792           1 :         return nil
     793             : }
     794             : 
     795             : // queueBlock queues the current block for writing to the underlying writer,
     796             : // allocates a new block and reserves space for the next header.
     797           1 : func (w *LogWriter) queueBlock() {
     798           1 :         // Allocate a new block, blocking until one is available. We do this first
     799           1 :         // because w.block is protected by w.flusher.Mutex.
     800           1 :         w.free.Lock()
     801           1 :         if len(w.free.blocks) == 0 {
     802           1 :                 w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
     803           1 :         }
     804           1 :         nextBlock := w.free.blocks[len(w.free.blocks)-1]
     805           1 :         w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
     806           1 :         w.free.Unlock()
     807           1 : 
     808           1 :         f := &w.flusher
     809           1 :         f.Lock()
     810           1 :         f.pending = append(f.pending, w.block)
     811           1 :         w.block = nextBlock
     812           1 :         f.ready.Signal()
     813           1 :         w.err = w.flusher.err
     814           1 :         f.Unlock()
     815           1 : 
     816           1 :         w.blockNum++
     817             : }
     818             : 
     819             : // Close flushes and syncs any unwritten data and closes the writer.
     820             : // Where required, external synchronisation is provided by commitPipeline.mu.
     821           1 : func (w *LogWriter) Close() error {
     822           1 :         return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
     823           1 : }
     824             : 
     825             : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
     826             : // lastQueuedRecord, that the caller will be notified about when synced.
     827           1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
     828           1 :         return w.closeInternal(lastQueuedRecord)
     829           1 : }
     830             : 
     831           1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
     832           1 :         f := &w.flusher
     833           1 : 
     834           1 :         // Emit an EOF trailer signifying the end of this log. This helps readers
     835           1 :         // differentiate between a corrupted entry in the middle of a log from
     836           1 :         // garbage at the tail from a recycled log file.
     837           1 :         w.emitEOFTrailer()
     838           1 : 
     839           1 :         // Signal the flush loop to close.
     840           1 :         f.Lock()
     841           1 :         f.close = true
     842           1 :         f.ready.Signal()
     843           1 :         f.Unlock()
     844           1 : 
     845           1 :         // Wait for the flush loop to close. The flush loop will not close until all
     846           1 :         // pending data has been written or an error occurs.
     847           1 :         <-f.closed
     848           1 : 
     849           1 :         // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
     850           1 :         // last buffered data only if it was requested via syncQ, so we need to sync
     851           1 :         // here to ensure that all the data is synced.
     852           1 :         err := w.flusher.err
     853           1 :         var syncLatency time.Duration
     854           1 :         if err == nil && w.s != nil {
     855           1 :                 syncLatency, err = w.syncWithLatency()
     856           1 :         }
     857           1 :         f.Lock()
     858           1 :         if err == nil && f.fsyncLatency != nil {
     859           1 :                 f.fsyncLatency.Observe(float64(syncLatency))
     860           1 :         }
     861           1 :         free := w.free.blocks
     862           1 :         f.Unlock()
     863           1 : 
     864           1 :         // NB: the caller of closeInternal may not care about a non-nil cerr below
     865           1 :         // if all queued writes have been successfully written and synced.
     866           1 :         if lastQueuedRecord.Index != NoSyncIndex {
     867           1 :                 w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
     868           1 :         }
     869           1 :         if w.c != nil {
     870           1 :                 cerr := w.c.Close()
     871           1 :                 w.c = nil
     872           1 :                 err = firstError(err, cerr)
     873           1 :         }
     874             : 
     875           1 :         for _, b := range free {
     876           1 :                 b.flushed = 0
     877           1 :                 b.written.Store(0)
     878           1 :                 blockPool.Put(b)
     879           1 :         }
     880             : 
     881           1 :         w.err = errClosedWriter
     882           1 :         return err
     883             : }
     884             : 
     885             : // firstError returns the first non-nil error of err0 and err1, or nil if both
     886             : // are nil.
     887           1 : func firstError(err0, err1 error) error {
     888           1 :         if err0 != nil {
     889           1 :                 return err0
     890           1 :         }
     891           1 :         return err1
     892             : }
     893             : 
     894             : // WriteRecord writes a complete record. Returns the offset just past the end
     895             : // of the record.
     896             : // External synchronisation provided by commitPipeline.mu.
     897           1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
     898           1 :         logSize, err := w.SyncRecord(p, nil, nil)
     899           1 :         return logSize, err
     900           1 : }
     901             : 
     902             : // SyncRecord writes a complete record. If wg != nil the record will be
     903             : // asynchronously persisted to the underlying writer and done will be called on
     904             : // the wait group upon completion. Returns the offset just past the end of the
     905             : // record.
     906             : // External synchronisation provided by commitPipeline.mu.
     907             : func (w *LogWriter) SyncRecord(
     908             :         p []byte, wg *sync.WaitGroup, err *error,
     909           1 : ) (logSize int64, err2 error) {
     910           1 :         w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
     911           1 :                 wg:  wg,
     912           1 :                 err: err,
     913           1 :         }
     914           1 :         return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
     915           1 : }
     916             : 
     917             : // SyncRecordGeneralized is a version of SyncRecord that accepts a
     918             : // PendingSync.
     919           1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
     920           1 :         if w.err != nil {
     921           0 :                 return -1, w.err
     922           0 :         }
     923             : 
     924             :         // The `i == 0` condition ensures we handle empty records. Such records can
     925             :         // possibly be generated for VersionEdits stored in the MANIFEST. While the
     926             :         // MANIFEST is currently written using Writer, it is good to support the same
     927             :         // semantics with LogWriter.
     928           1 :         for i := 0; i == 0 || len(p) > 0; i++ {
     929           1 :                 p = w.emitFragment(i, p)
     930           1 :         }
     931             : 
     932           1 :         if ps.syncRequested() {
     933           1 :                 // If we've been asked to persist the record, add the WaitGroup to the sync
     934           1 :                 // queue and signal the flushLoop. Note that flushLoop will write partial
     935           1 :                 // blocks to the file if syncing has been requested. The contract is that
     936           1 :                 // any record written to the LogWriter to this point will be flushed to the
     937           1 :                 // OS and synced to disk.
     938           1 :                 f := &w.flusher
     939           1 :                 f.pendingSyncs.push(ps)
     940           1 :                 f.ready.Signal()
     941           1 :         }
     942             : 
     943           1 :         offset := w.blockNum*blockSize + int64(w.block.written.Load())
     944           1 :         // Note that we don't return w.err here as a concurrent call to Close would
     945           1 :         // race with our read. That's ok because the only error we could be seeing is
     946           1 :         // one to syncing for which the caller can receive notification of by passing
     947           1 :         // in a non-nil err argument.
     948           1 :         return offset, nil
     949             : }
     950             : 
     951             : // Size returns the current size of the file.
     952             : // External synchronisation provided by commitPipeline.mu.
     953           1 : func (w *LogWriter) Size() int64 {
     954           1 :         return w.blockNum*blockSize + int64(w.block.written.Load())
     955           1 : }
     956             : 
     957           1 : func (w *LogWriter) emitEOFTrailer() {
     958           1 :         // Write a recyclable chunk header with a different log number.  Readers
     959           1 :         // will treat the header as EOF when the log number does not match.
     960           1 :         b := w.block
     961           1 :         i := b.written.Load()
     962           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
     963           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
     964           1 :         b.buf[i+6] = recyclableFullChunkType
     965           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
     966           1 :         b.written.Store(i + int32(recyclableHeaderSize))
     967           1 : }
     968             : 
     969           1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
     970           1 :         b := w.block
     971           1 :         i := b.written.Load()
     972           1 :         first := n == 0
     973           1 :         last := blockSize-i-recyclableHeaderSize >= int32(len(p))
     974           1 : 
     975           1 :         if last {
     976           1 :                 if first {
     977           1 :                         b.buf[i+6] = recyclableFullChunkType
     978           1 :                 } else {
     979           1 :                         b.buf[i+6] = recyclableLastChunkType
     980           1 :                 }
     981           1 :         } else {
     982           1 :                 if first {
     983           1 :                         b.buf[i+6] = recyclableFirstChunkType
     984           1 :                 } else {
     985           1 :                         b.buf[i+6] = recyclableMiddleChunkType
     986           1 :                 }
     987             :         }
     988             : 
     989           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
     990           1 : 
     991           1 :         r := copy(b.buf[i+recyclableHeaderSize:], p)
     992           1 :         j := i + int32(recyclableHeaderSize+r)
     993           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
     994           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
     995           1 :         b.written.Store(j)
     996           1 : 
     997           1 :         if blockSize-b.written.Load() < recyclableHeaderSize {
     998           1 :                 // There is no room for another fragment in the block, so fill the
     999           1 :                 // remaining bytes with zeros and queue the block for flushing.
    1000           1 :                 clear(b.buf[b.written.Load():])
    1001           1 :                 w.queueBlock()
    1002           1 :         }
    1003           1 :         return p[r:]
    1004             : }
    1005             : 
    1006             : // Metrics must typically be called after Close, since the callee will no
    1007             : // longer modify the returned LogWriterMetrics. It is also current if there is
    1008             : // nothing left to flush in the flush loop, but that is an implementation
    1009             : // detail that callers should not rely on.
    1010           1 : func (w *LogWriter) Metrics() LogWriterMetrics {
    1011           1 :         w.flusher.Lock()
    1012           1 :         defer w.flusher.Unlock()
    1013           1 :         m := *w.flusher.metrics
    1014           1 :         return m
    1015           1 : }
    1016             : 
    1017             : // LogWriterMetrics contains misc metrics for the log writer.
    1018             : type LogWriterMetrics struct {
    1019             :         WriteThroughput  base.ThroughputMetric
    1020             :         PendingBufferLen base.GaugeSampleMetric
    1021             :         SyncQueueLen     base.GaugeSampleMetric
    1022             : }
    1023             : 
    1024             : // Merge merges metrics from x. Requires that x is non-nil.
    1025           1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
    1026           1 :         m.WriteThroughput.Merge(x.WriteThroughput)
    1027           1 :         m.PendingBufferLen.Merge(x.PendingBufferLen)
    1028           1 :         m.SyncQueueLen.Merge(x.SyncQueueLen)
    1029           1 :         return nil
    1030           1 : }

Generated by: LCOV version 1.14