LCOV - code coverage report
Current view: top level - pebble/record - log_writer.go (source / functions) Hit Total Coverage
Test: 2024-02-28 08:15Z 70c13977 - meta test only.lcov Lines: 385 540 71.3 %
Date: 2024-02-28 08:16:39 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package record
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "io"
      11             :         "runtime/pprof"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/crc"
      19             :         "github.com/prometheus/client_golang/prometheus"
      20             : )
      21             : 
      22             : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
      23             : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
      24             : 
      25             : type block struct {
      26             :         // buf[:written] has already been filled with fragments. Updated atomically.
      27             :         written atomic.Int32
      28             :         // buf[:flushed] has already been flushed to w.
      29             :         flushed int32
      30             :         buf     [blockSize]byte
      31             : }
      32             : 
      33             : type flusher interface {
      34             :         Flush() error
      35             : }
      36             : 
      37             : type syncer interface {
      38             :         Sync() error
      39             : }
      40             : 
      41             : const (
      42             :         syncConcurrencyBits = 12
      43             : 
      44             :         // SyncConcurrency is the maximum number of concurrent sync operations that
      45             :         // can be performed. Note that a sync operation is initiated either by a call
      46             :         // to SyncRecord or by a call to Close. Exported as this value also limits
      47             :         // the commit concurrency in commitPipeline.
      48             :         SyncConcurrency = 1 << syncConcurrencyBits
      49             : )
      50             : 
      51             : type syncSlot struct {
      52             :         wg  *sync.WaitGroup
      53             :         err *error
      54             : }
      55             : 
      56             : // syncQueue is a lock-free fixed-size single-producer, single-consumer
      57             : // queue. The single-producer can push to the head, and the single-consumer can
      58             : // pop multiple values from the tail. Popping calls Done() on each of the
      59             : // available *sync.WaitGroup elements.
      60             : type syncQueue struct {
      61             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      62             :         // are indexes into slots modulo len(slots)-1.
      63             :         //
      64             :         // tail = index of oldest data in queue
      65             :         // head = index of next slot to fill
      66             :         //
      67             :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      68             :         // continues to own a slot outside this range until it nils the slot, at
      69             :         // which point ownership passes to the producer.
      70             :         //
      71             :         // The head index is stored in the most-significant bits so that we can
      72             :         // atomically add to it and the overflow is harmless.
      73             :         headTail atomic.Uint64
      74             : 
      75             :         // slots is a ring buffer of values stored in this queue. The size must be a
      76             :         // power of 2. A slot is in use until the tail index has moved beyond it.
      77             :         slots [SyncConcurrency]syncSlot
      78             : 
      79             :         // blocked is an atomic boolean which indicates whether syncing is currently
      80             :         // blocked or can proceed. It is used by the implementation of
      81             :         // min-sync-interval to block syncing until the min interval has passed.
      82             :         blocked atomic.Bool
      83             : }
      84             : 
      85             : const dequeueBits = 32
      86             : 
      87           1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
      88           1 :         const mask = 1<<dequeueBits - 1
      89           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      90           1 :         tail = uint32(ptrs & mask)
      91           1 :         return
      92           1 : }
      93             : 
      94           1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
      95           1 :         ptrs := q.headTail.Load()
      96           1 :         head, tail := q.unpack(ptrs)
      97           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      98           0 :                 panic("pebble: queue is full")
      99             :         }
     100             : 
     101           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
     102           1 :         slot.wg = wg
     103           1 :         slot.err = err
     104           1 : 
     105           1 :         // Increment head. This passes ownership of slot to dequeue and acts as a
     106           1 :         // store barrier for writing the slot.
     107           1 :         q.headTail.Add(1 << dequeueBits)
     108             : }
     109             : 
     110           0 : func (q *syncQueue) setBlocked() {
     111           0 :         q.blocked.Store(true)
     112           0 : }
     113             : 
     114           1 : func (q *syncQueue) clearBlocked() {
     115           1 :         q.blocked.Store(false)
     116           1 : }
     117             : 
     118           1 : func (q *syncQueue) empty() bool {
     119           1 :         head, tail, _ := q.load()
     120           1 :         return head == tail
     121           1 : }
     122             : 
     123             : // load returns the head, tail of the queue for what should be synced to the
     124             : // caller. It can return a head, tail of zero if syncing is blocked due to
     125             : // min-sync-interval. It additionally returns the real length of this queue,
     126             : // regardless of whether syncing is blocked.
     127           1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
     128           1 :         ptrs := q.headTail.Load()
     129           1 :         head, tail = q.unpack(ptrs)
     130           1 :         realLength = head - tail
     131           1 :         if q.blocked.Load() {
     132           0 :                 return 0, 0, realLength
     133           0 :         }
     134           1 :         return head, tail, realLength
     135             : }
     136             : 
     137             : // REQUIRES: queueSemChan is non-nil.
     138           1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
     139           1 :         if tail == head {
     140           0 :                 // Queue is empty.
     141           0 :                 return nil
     142           0 :         }
     143             : 
     144           1 :         for ; tail != head; tail++ {
     145           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     146           1 :                 wg := slot.wg
     147           1 :                 if wg == nil {
     148           0 :                         return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
     149           0 :                 }
     150           1 :                 *slot.err = err
     151           1 :                 slot.wg = nil
     152           1 :                 slot.err = nil
     153           1 :                 // We need to bump the tail count before releasing the queueSemChan
     154           1 :                 // semaphore as releasing the semaphore can cause a blocked goroutine to
     155           1 :                 // acquire the semaphore and enqueue before we've "freed" space in the
     156           1 :                 // queue.
     157           1 :                 q.headTail.Add(1)
     158           1 :                 wg.Done()
     159           1 :                 // Is always non-nil in production, unless using wal package for WAL
     160           1 :                 // failover.
     161           1 :                 if queueSemChan != nil {
     162           1 :                         <-queueSemChan
     163           1 :                 }
     164             :         }
     165             : 
     166           1 :         return nil
     167             : }
     168             : 
     169             : // pendingSyncs abstracts out the handling of pending sync requests. In
     170             : // standalone mode the implementation is a thin wrapper around syncQueue. In
     171             : // the mode where the LogWriter can be subject to failover, there is no queue
     172             : // kept in the LogWriter and the signaling to those waiting for sync is
     173             : // handled in the wal package.
     174             : //
     175             : // To avoid heap allocations due to the use of this interface, the parameters
     176             : // and return values follow some strict rules:
     177             : //   - The PendingSync parameter can be reused by the caller after push returns.
     178             : //     The implementation should be a pointer backed by a struct that is already
     179             : //     heap allocated, which the caller can reuse for the next push call.
     180             : //   - The pendingSyncSnapshot return value must be backed by the pendingSyncs
     181             : //     implementation, so calling snapshotForPop again will cause the previous
     182             : //     snapshot to be overwritten.
     183             : type pendingSyncs interface {
     184             :         push(PendingSync)
     185             :         setBlocked()
     186             :         clearBlocked()
     187             :         empty() bool
     188             :         snapshotForPop() pendingSyncsSnapshot
     189             :         pop(snap pendingSyncsSnapshot, err error) error
     190             : }
     191             : 
     192             : type pendingSyncsSnapshot interface {
     193             :         empty() bool
     194             : }
     195             : 
     196             : // PendingSync abstracts the sync specification for a record queued on the
     197             : // LogWriter. The only implementations are provided in this package since
     198             : // syncRequested is not exported.
     199             : type PendingSync interface {
     200             :         syncRequested() bool
     201             : }
     202             : 
     203             : // The implementation of pendingSyncs in standalone mode.
     204             : type pendingSyncsWithSyncQueue struct {
     205             :         syncQueue
     206             :         syncQueueLen    *base.GaugeSampleMetric
     207             :         snapshotBacking syncQueueSnapshot
     208             :         // See the comment for LogWriterConfig.QueueSemChan.
     209             :         queueSemChan chan struct{}
     210             : }
     211             : 
     212             : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
     213             : 
     214           1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
     215           1 :         ps2 := ps.(*pendingSyncForSyncQueue)
     216           1 :         q.syncQueue.push(ps2.wg, ps2.err)
     217           1 : }
     218             : 
     219           1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
     220           1 :         head, tail, realLength := q.syncQueue.load()
     221           1 :         q.snapshotBacking = syncQueueSnapshot{
     222           1 :                 head: head,
     223           1 :                 tail: tail,
     224           1 :         }
     225           1 :         q.syncQueueLen.AddSample(int64(realLength))
     226           1 :         return &q.snapshotBacking
     227           1 : }
     228             : 
     229           1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
     230           1 :         s := snap.(*syncQueueSnapshot)
     231           1 :         return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
     232           1 : }
     233             : 
     234             : // The implementation of pendingSyncsSnapshot in standalone mode.
     235             : type syncQueueSnapshot struct {
     236             :         head, tail uint32
     237             : }
     238             : 
     239           1 : func (s *syncQueueSnapshot) empty() bool {
     240           1 :         return s.head == s.tail
     241           1 : }
     242             : 
     243             : // The implementation of pendingSync in standalone mode.
     244             : type pendingSyncForSyncQueue struct {
     245             :         wg  *sync.WaitGroup
     246             :         err *error
     247             : }
     248             : 
     249           1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
     250           1 :         return ps.wg != nil
     251           1 : }
     252             : 
     253             : // The implementation of pendingSyncs in failover mode.
     254             : type pendingSyncsWithHighestSyncIndex struct {
     255             :         // The highest "index" queued that is requesting a sync. Initialized
     256             :         // to NoSyncIndex, and reset to NoSyncIndex after the sync.
     257             :         index           atomic.Int64
     258             :         snapshotBacking PendingSyncIndex
     259             :         // blocked is an atomic boolean which indicates whether syncing is currently
     260             :         // blocked or can proceed. It is used by the implementation of
     261             :         // min-sync-interval to block syncing until the min interval has passed.
     262             :         blocked                   atomic.Bool
     263             :         externalSyncQueueCallback ExternalSyncQueueCallback
     264             : }
     265             : 
     266             : // NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.
     267             : const NoSyncIndex = -1
     268             : 
     269             : func (si *pendingSyncsWithHighestSyncIndex) init(
     270             :         externalSyncQueueCallback ExternalSyncQueueCallback,
     271           0 : ) {
     272           0 :         si.index.Store(NoSyncIndex)
     273           0 :         si.externalSyncQueueCallback = externalSyncQueueCallback
     274           0 : }
     275             : 
     276           0 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
     277           0 :         ps2 := ps.(*PendingSyncIndex)
     278           0 :         si.index.Store(ps2.Index)
     279           0 : }
     280             : 
     281           0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
     282           0 :         si.blocked.Store(true)
     283           0 : }
     284             : 
     285           0 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
     286           0 :         si.blocked.Store(false)
     287           0 : }
     288             : 
     289           0 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
     290           0 :         return si.load() == NoSyncIndex
     291           0 : }
     292             : 
     293           0 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
     294           0 :         si.snapshotBacking = PendingSyncIndex{Index: si.load()}
     295           0 :         return &si.snapshotBacking
     296           0 : }
     297             : 
     298           0 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
     299           0 :         index := si.index.Load()
     300           0 :         if index != NoSyncIndex && si.blocked.Load() {
     301           0 :                 index = NoSyncIndex
     302           0 :         }
     303           0 :         return index
     304             : }
     305             : 
     306           0 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
     307           0 :         index := snap.(*PendingSyncIndex)
     308           0 :         if index.Index == NoSyncIndex {
     309           0 :                 return nil
     310           0 :         }
     311             :         // Set to NoSyncIndex if a higher index has not queued.
     312           0 :         si.index.CompareAndSwap(index.Index, NoSyncIndex)
     313           0 :         si.externalSyncQueueCallback(*index, err)
     314           0 :         return nil
     315             : }
     316             : 
     317             : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
     318             : type PendingSyncIndex struct {
     319             :         // Index is some state meaningful to the user of LogWriter. The LogWriter
     320             :         // itself only examines whether Index is equal to NoSyncIndex.
     321             :         Index int64
     322             : }
     323             : 
     324           0 : func (s *PendingSyncIndex) empty() bool {
     325           0 :         return s.Index == NoSyncIndex
     326           0 : }
     327             : 
     328           0 : func (s *PendingSyncIndex) syncRequested() bool {
     329           0 :         return s.Index != NoSyncIndex
     330           0 : }
     331             : 
     332             : // flusherCond is a specialized condition variable that allows its condition to
     333             : // change and readiness be signalled without holding its associated mutex. In
     334             : // particular, when a waiter is added to syncQueue atomically, this condition
     335             : // variable can be signalled without holding flusher.Mutex.
     336             : type flusherCond struct {
     337             :         mu   *sync.Mutex
     338             :         q    pendingSyncs
     339             :         cond sync.Cond
     340             : }
     341             : 
     342           1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
     343           1 :         c.mu = mu
     344           1 :         c.q = q
     345           1 :         // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
     346           1 :         // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
     347           1 :         // will be called and we can check the !syncQueue.empty() condition.
     348           1 :         c.cond.L = c
     349           1 : }
     350             : 
     351           1 : func (c *flusherCond) Signal() {
     352           1 :         // Pass-through to the cond var.
     353           1 :         c.cond.Signal()
     354           1 : }
     355             : 
     356           1 : func (c *flusherCond) Wait() {
     357           1 :         // Pass-through to the cond var. Note that internally the cond var implements
     358           1 :         // Wait as:
     359           1 :         //
     360           1 :         //   t := notifyListAdd()
     361           1 :         //   L.Unlock()
     362           1 :         //   notifyListWait(t)
     363           1 :         //   L.Lock()
     364           1 :         //
     365           1 :         // We've configured the cond var to call flusherReady.Unlock() which allows
     366           1 :         // us to check the !syncQueue.empty() condition without a danger of missing a
     367           1 :         // notification. Any call to flusherReady.Signal() after notifyListAdd() is
     368           1 :         // called will cause the subsequent notifyListWait() to return immediately.
     369           1 :         c.cond.Wait()
     370           1 : }
     371             : 
     372           1 : func (c *flusherCond) Lock() {
     373           1 :         c.mu.Lock()
     374           1 : }
     375             : 
     376           1 : func (c *flusherCond) Unlock() {
     377           1 :         c.mu.Unlock()
     378           1 :         if !c.q.empty() {
     379           1 :                 // If the current goroutine is about to block on sync.Cond.Wait, this call
     380           1 :                 // to Signal will prevent that. The comment in Wait above explains a bit
     381           1 :                 // about what is going on here, but it is worth reiterating:
     382           1 :                 //
     383           1 :                 //   flusherCond.Wait()
     384           1 :                 //     sync.Cond.Wait()
     385           1 :                 //       t := notifyListAdd()
     386           1 :                 //       flusherCond.Unlock()    <-- we are here
     387           1 :                 //       notifyListWait(t)
     388           1 :                 //       flusherCond.Lock()
     389           1 :                 //
     390           1 :                 // The call to Signal here results in:
     391           1 :                 //
     392           1 :                 //     sync.Cond.Signal()
     393           1 :                 //       notifyListNotifyOne()
     394           1 :                 //
     395           1 :                 // The call to notifyListNotifyOne() will prevent the call to
     396           1 :                 // notifyListWait(t) from blocking.
     397           1 :                 c.cond.Signal()
     398           1 :         }
     399             : }
     400             : 
     401             : type durationFunc func() time.Duration
     402             : 
     403             : // syncTimer is an interface for timers, modeled on the closure callback mode
     404             : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
     405             : // by tests to mock out the timer functionality used to implement
     406             : // min-sync-interval.
     407             : type syncTimer interface {
     408             :         Reset(time.Duration) bool
     409             :         Stop() bool
     410             : }
     411             : 
     412             : // LogWriter writes records to an underlying io.Writer. In order to support WAL
     413             : // file reuse, a LogWriter's records are tagged with the WAL's file
     414             : // number. When reading a log file a record from a previous incarnation of the
     415             : // file will return the error ErrInvalidLogNum.
     416             : type LogWriter struct {
     417             :         // w is the underlying writer.
     418             :         w io.Writer
     419             :         // c is w as a closer.
     420             :         c io.Closer
     421             :         // s is w as a syncer.
     422             :         s syncer
     423             :         // logNum is the low 32-bits of the log's file number.
     424             :         logNum uint32
     425             :         // blockNum is the zero based block number for the current block.
     426             :         blockNum int64
     427             :         // err is any accumulated error. It originates in flusher.err, and is
     428             :         // updated to reflect flusher.err when a block is full and getting enqueued.
     429             :         // Therefore, there is a lag between when flusher.err has a non-nil error,
     430             :         // and when that non-nil error is reflected in LogWriter.err. On close, it
     431             :         // is set to errClosedWriter to inform accidental future calls to
     432             :         // SyncRecord*.
     433             :         err error
     434             :         // block is the current block being written. Protected by flusher.Mutex.
     435             :         block *block
     436             :         free  struct {
     437             :                 sync.Mutex
     438             :                 blocks []*block
     439             :         }
     440             : 
     441             :         flusher struct {
     442             :                 sync.Mutex
     443             :                 // Flusher ready is a condition variable that is signalled when there are
     444             :                 // blocks to flush, syncing has been requested, or the LogWriter has been
     445             :                 // closed. For signalling of a sync, it is safe to call without holding
     446             :                 // flusher.Mutex.
     447             :                 ready flusherCond
     448             :                 // Set to true when the flush loop should be closed.
     449             :                 close bool
     450             :                 // Closed when the flush loop has terminated.
     451             :                 closed chan struct{}
     452             :                 // Accumulated flush error.
     453             :                 err error
     454             :                 // minSyncInterval is the minimum duration between syncs.
     455             :                 minSyncInterval durationFunc
     456             :                 fsyncLatency    prometheus.Histogram
     457             :                 pending         []*block
     458             :                 // Pushing and popping from pendingSyncs does not require flusher mutex to
     459             :                 // be held.
     460             :                 pendingSyncs pendingSyncs
     461             :                 metrics      *LogWriterMetrics
     462             :         }
     463             : 
     464             :         // afterFunc is a hook to allow tests to mock out the timer functionality
     465             :         // used for min-sync-interval. In normal operation this points to
     466             :         // time.AfterFunc.
     467             :         afterFunc func(d time.Duration, f func()) syncTimer
     468             : 
     469             :         // Backing for both pendingSyncs implementations.
     470             :         pendingSyncsBackingQ     pendingSyncsWithSyncQueue
     471             :         pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
     472             : 
     473             :         pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
     474             : }
     475             : 
     476             : // LogWriterConfig is a struct used for configuring new LogWriters
     477             : type LogWriterConfig struct {
     478             :         WALMinSyncInterval durationFunc
     479             :         WALFsyncLatency    prometheus.Histogram
     480             :         // QueueSemChan is an optional channel to pop from when popping from
     481             :         // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
     482             :         // the syncQueue from overflowing (which will cause a panic). All production
     483             :         // code ensures this is non-nil.
     484             :         QueueSemChan chan struct{}
     485             : 
     486             :         // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
     487             :         // as part of a WAL implementation that can failover between LogWriters.
     488             :         //
     489             :         // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
     490             :         // be used with a PendingSync parameter that is implemented by
     491             :         // PendingSyncIndex. When an index is synced (which implies all earlier
     492             :         // indices are also synced), this callback is invoked. The caller must not
     493             :         // hold any mutex when invoking this callback, since the lock ordering
     494             :         // requirement in this case is that any higher layer locks (in the wal
     495             :         // package) precede the lower layer locks (in the record package). These
     496             :         // callbacks are serialized since they are invoked from the flushLoop.
     497             :         ExternalSyncQueueCallback ExternalSyncQueueCallback
     498             : }
     499             : 
     500             : // ExternalSyncQueueCallback is to be run when a PendingSync has been
     501             : // processed, either successfully or with an error.
     502             : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
     503             : 
     504             : // initialAllocatedBlocksCap is the initial capacity of the various slices
     505             : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
     506             : // than this threshold allows.
     507             : const initialAllocatedBlocksCap = 32
     508             : 
     509             : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
     510             : // Pool when a LogWriter is closed. Before that, free blocks are maintained
     511             : // within a LogWriter's own internal free list `w.free.blocks`.
     512             : var blockPool = sync.Pool{
     513           1 :         New: func() any { return &block{} },
     514             : }
     515             : 
     516             : // NewLogWriter returns a new LogWriter.
     517             : //
     518             : // The io.Writer may also be used as an io.Closer and syncer. No other methods
     519             : // will be called on the writer.
     520             : func NewLogWriter(
     521             :         w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
     522           1 : ) *LogWriter {
     523           1 :         c, _ := w.(io.Closer)
     524           1 :         s, _ := w.(syncer)
     525           1 :         r := &LogWriter{
     526           1 :                 w: w,
     527           1 :                 c: c,
     528           1 :                 s: s,
     529           1 :                 // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
     530           1 :                 // we are very unlikely to reach a file number of 4 billion and b) the log
     531           1 :                 // number is used as a validation check and using only the low 32-bits is
     532           1 :                 // sufficient for that purpose.
     533           1 :                 logNum: uint32(logNum),
     534           1 :                 afterFunc: func(d time.Duration, f func()) syncTimer {
     535           0 :                         return time.AfterFunc(d, f)
     536           0 :                 },
     537             :         }
     538           1 :         m := &LogWriterMetrics{}
     539           1 :         if logWriterConfig.ExternalSyncQueueCallback != nil {
     540           0 :                 r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
     541           0 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
     542           1 :         } else {
     543           1 :                 r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
     544           1 :                         syncQueueLen: &m.SyncQueueLen,
     545           1 :                         queueSemChan: logWriterConfig.QueueSemChan,
     546           1 :                 }
     547           1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
     548           1 :         }
     549             : 
     550           1 :         r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
     551           1 :         r.block = blockPool.Get().(*block)
     552           1 :         r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
     553           1 :         r.flusher.closed = make(chan struct{})
     554           1 :         r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
     555           1 :         r.flusher.metrics = m
     556           1 : 
     557           1 :         f := &r.flusher
     558           1 :         f.minSyncInterval = logWriterConfig.WALMinSyncInterval
     559           1 :         f.fsyncLatency = logWriterConfig.WALFsyncLatency
     560           1 : 
     561           1 :         go func() {
     562           1 :                 pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
     563           1 :         }()
     564           1 :         return r
     565             : }
     566             : 
     567           1 : func (w *LogWriter) flushLoop(context.Context) {
     568           1 :         f := &w.flusher
     569           1 :         f.Lock()
     570           1 : 
     571           1 :         // Initialize idleStartTime to when the loop starts.
     572           1 :         idleStartTime := time.Now()
     573           1 :         var syncTimer syncTimer
     574           1 :         defer func() {
     575           1 :                 // Capture the idle duration between the last piece of work and when the
     576           1 :                 // loop terminated.
     577           1 :                 f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
     578           1 :                 if syncTimer != nil {
     579           0 :                         syncTimer.Stop()
     580           0 :                 }
     581           1 :                 close(f.closed)
     582           1 :                 f.Unlock()
     583             :         }()
     584             : 
     585             :         // The flush loop performs flushing of full and partial data blocks to the
     586             :         // underlying writer (LogWriter.w), syncing of the writer, and notification
     587             :         // to sync requests that they have completed.
     588             :         //
     589             :         // - flusher.ready is a condition variable that is signalled when there is
     590             :         //   work to do. Full blocks are contained in flusher.pending. The current
     591             :         //   partial block is in LogWriter.block. And sync operations are held in
     592             :         //   flusher.syncQ.
     593             :         //
     594             :         // - The decision to sync is determined by whether there are any sync
     595             :         //   requests present in flusher.syncQ and whether enough time has elapsed
     596             :         //   since the last sync. If not enough time has elapsed since the last sync,
     597             :         //   flusher.syncQ.blocked will be set to 1. If syncing is blocked,
     598             :         //   syncQueue.empty() will return true and syncQueue.load() will return 0,0
     599             :         //   (i.e. an empty list).
     600             :         //
     601             :         // - flusher.syncQ.blocked is cleared by a timer that is initialized when
     602             :         //   blocked is set to 1. When blocked is 1, no syncing will take place, but
     603             :         //   flushing will continue to be performed. The on/off toggle for syncing
     604             :         //   does not need to be carefully synchronized with the rest of processing
     605             :         //   -- all we need to ensure is that after any transition to blocked=1 there
     606             :         //   is eventually a transition to blocked=0. syncTimer performs this
     607             :         //   transition. Note that any change to min-sync-interval will not take
     608             :         //   effect until the previous timer elapses.
     609             :         //
     610             :         // - Picking up the syncing work to perform requires coordination with
     611             :         //   picking up the flushing work. Specifically, flushing work is queued
     612             :         //   before syncing work. The guarantee of this code is that when a sync is
     613             :         //   requested, any previously queued flush work will be synced. This
     614             :         //   motivates reading the syncing work (f.syncQ.load()) before picking up
     615             :         //   the flush work (w.block.written.Load()).
     616             : 
     617             :         // The list of full blocks that need to be written. This is copied from
     618             :         // f.pending on every loop iteration, though the number of elements is
     619             :         // usually small (most frequently 1). In the case of the WAL LogWriter, the
     620             :         // number of blocks is bounded by the size of the WAL's corresponding
     621             :         // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
     622             :         // this works out to at most 2048 elements if the entirety of the memtable's
     623             :         // contents are queued.
     624           1 :         pending := make([]*block, 0, cap(f.pending))
     625           1 :         for {
     626           1 :                 for {
     627           1 :                         // Grab the portion of the current block that requires flushing. Note that
     628           1 :                         // the current block can be added to the pending blocks list after we release
     629           1 :                         // the flusher lock, but it won't be part of pending.
     630           1 :                         written := w.block.written.Load()
     631           1 :                         if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
     632           1 :                                 break
     633             :                         }
     634           1 :                         if f.close {
     635           1 :                                 // If the writer is closed, pretend the sync timer fired immediately so
     636           1 :                                 // that we can process any queued sync requests.
     637           1 :                                 f.pendingSyncs.clearBlocked()
     638           1 :                                 if !f.pendingSyncs.empty() {
     639           0 :                                         break
     640             :                                 }
     641           1 :                                 return
     642             :                         }
     643           1 :                         f.ready.Wait()
     644           1 :                         continue
     645             :                 }
     646             :                 // Found work to do, so no longer idle.
     647             :                 //
     648             :                 // NB: it is safe to read pending before loading from the syncQ since
     649             :                 // mutations to pending require the w.flusher mutex, which is held here.
     650             :                 // There is no risk that someone will concurrently add to pending, so the
     651             :                 // following sequence, which would pick up a syncQ entry without the
     652             :                 // corresponding data, is impossible:
     653             :                 //
     654             :                 // Thread enqueueing       This thread
     655             :                 //                         1. read pending
     656             :                 // 2. add block to pending
     657             :                 // 3. add to syncQ
     658             :                 //                         4. read syncQ
     659           1 :                 workStartTime := time.Now()
     660           1 :                 idleDuration := workStartTime.Sub(idleStartTime)
     661           1 :                 pending = append(pending[:0], f.pending...)
     662           1 :                 f.pending = f.pending[:0]
     663           1 :                 f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
     664           1 : 
     665           1 :                 // Grab the list of sync waiters. Note that syncQueue.load() will return
     666           1 :                 // 0,0 while we're waiting for the min-sync-interval to expire. This
     667           1 :                 // allows flushing to proceed even if we're not ready to sync.
     668           1 :                 snap := f.pendingSyncs.snapshotForPop()
     669           1 : 
     670           1 :                 // Grab the portion of the current block that requires flushing. Note that
     671           1 :                 // the current block can be added to the pending blocks list after we
     672           1 :                 // release the flusher lock, but it won't be part of pending. This has to
     673           1 :                 // be ordered after we get the list of sync waiters from syncQ in order to
     674           1 :                 // prevent a race where a waiter adds itself to syncQ, but this thread
     675           1 :                 // picks up the entry in syncQ and not the buffered data.
     676           1 :                 written := w.block.written.Load()
     677           1 :                 data := w.block.buf[w.block.flushed:written]
     678           1 :                 w.block.flushed = written
     679           1 : 
     680           1 :                 fErr := f.err
     681           1 :                 f.Unlock()
     682           1 :                 // If flusher has an error, we propagate it to waiters. Note in spite of
     683           1 :                 // error we consume the pending list above to free blocks for writers.
     684           1 :                 if fErr != nil {
     685           0 :                         // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
     686           0 :                         // called f.Unlock() above. We will acquire the lock again below.
     687           0 :                         f.pendingSyncs.pop(snap, fErr)
     688           0 :                         // Update the idleStartTime if work could not be done, so that we don't
     689           0 :                         // include the duration we tried to do work as idle. We don't bother
     690           0 :                         // with the rest of the accounting, which means we will undercount.
     691           0 :                         idleStartTime = time.Now()
     692           0 :                         f.Lock()
     693           0 :                         continue
     694             :                 }
     695           1 :                 synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
     696           1 :                 f.Lock()
     697           1 :                 if synced && f.fsyncLatency != nil {
     698           1 :                         f.fsyncLatency.Observe(float64(syncLatency))
     699           1 :                 }
     700           1 :                 f.err = err
     701           1 :                 if f.err != nil {
     702           0 :                         f.pendingSyncs.clearBlocked()
     703           0 :                         // Update the idleStartTime if work could not be done, so that we don't
     704           0 :                         // include the duration we tried to do work as idle. We don't bother
     705           0 :                         // with the rest of the accounting, which means we will undercount.
     706           0 :                         idleStartTime = time.Now()
     707           0 :                         continue
     708             :                 }
     709             : 
     710           1 :                 if synced && f.minSyncInterval != nil {
     711           0 :                         // A sync was performed. Make sure we've waited for the min sync
     712           0 :                         // interval before syncing again.
     713           0 :                         if min := f.minSyncInterval(); min > 0 {
     714           0 :                                 f.pendingSyncs.setBlocked()
     715           0 :                                 if syncTimer == nil {
     716           0 :                                         syncTimer = w.afterFunc(min, func() {
     717           0 :                                                 f.pendingSyncs.clearBlocked()
     718           0 :                                                 f.ready.Signal()
     719           0 :                                         })
     720           0 :                                 } else {
     721           0 :                                         syncTimer.Reset(min)
     722           0 :                                 }
     723             :                         }
     724             :                 }
     725             :                 // Finished work, and started idling.
     726           1 :                 idleStartTime = time.Now()
     727           1 :                 workDuration := idleStartTime.Sub(workStartTime)
     728           1 :                 f.metrics.WriteThroughput.Bytes += bytesWritten
     729           1 :                 f.metrics.WriteThroughput.WorkDuration += workDuration
     730           1 :                 f.metrics.WriteThroughput.IdleDuration += idleDuration
     731             :         }
     732             : }
     733             : 
     734             : func (w *LogWriter) flushPending(
     735             :         data []byte, pending []*block, snap pendingSyncsSnapshot,
     736           1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
     737           1 :         defer func() {
     738           1 :                 // Translate panics into errors. The errors will cause flushLoop to shut
     739           1 :                 // down, but allows us to do so in a controlled way and avoid swallowing
     740           1 :                 // the stack that created the panic if panic'ing itself hits a panic
     741           1 :                 // (e.g. unlock of unlocked mutex).
     742           1 :                 if r := recover(); r != nil {
     743           0 :                         err = errors.Newf("%v", r)
     744           0 :                 }
     745             :         }()
     746             : 
     747           1 :         for _, b := range pending {
     748           0 :                 bytesWritten += blockSize - int64(b.flushed)
     749           0 :                 if err = w.flushBlock(b); err != nil {
     750           0 :                         break
     751             :                 }
     752             :         }
     753           1 :         if n := len(data); err == nil && n > 0 {
     754           1 :                 bytesWritten += int64(n)
     755           1 :                 _, err = w.w.Write(data)
     756           1 :         }
     757             : 
     758           1 :         synced = !snap.empty()
     759           1 :         if synced {
     760           1 :                 if err == nil && w.s != nil {
     761           1 :                         syncLatency, err = w.syncWithLatency()
     762           1 :                 } else {
     763           0 :                         synced = false
     764           0 :                 }
     765           1 :                 f := &w.flusher
     766           1 :                 if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
     767           0 :                         return synced, syncLatency, bytesWritten, firstError(err, popErr)
     768           0 :                 }
     769             :         }
     770             : 
     771           1 :         return synced, syncLatency, bytesWritten, err
     772             : }
     773             : 
     774           1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
     775           1 :         start := time.Now()
     776           1 :         err := w.s.Sync()
     777           1 :         syncLatency := time.Since(start)
     778           1 :         return syncLatency, err
     779           1 : }
     780             : 
     781           0 : func (w *LogWriter) flushBlock(b *block) error {
     782           0 :         if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
     783           0 :                 return err
     784           0 :         }
     785           0 :         b.written.Store(0)
     786           0 :         b.flushed = 0
     787           0 :         w.free.Lock()
     788           0 :         w.free.blocks = append(w.free.blocks, b)
     789           0 :         w.free.Unlock()
     790           0 :         return nil
     791             : }
     792             : 
     793             : // queueBlock queues the current block for writing to the underlying writer,
     794             : // allocates a new block and reserves space for the next header.
     795           0 : func (w *LogWriter) queueBlock() {
     796           0 :         // Allocate a new block, blocking until one is available. We do this first
     797           0 :         // because w.block is protected by w.flusher.Mutex.
     798           0 :         w.free.Lock()
     799           0 :         if len(w.free.blocks) == 0 {
     800           0 :                 w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
     801           0 :         }
     802           0 :         nextBlock := w.free.blocks[len(w.free.blocks)-1]
     803           0 :         w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
     804           0 :         w.free.Unlock()
     805           0 : 
     806           0 :         f := &w.flusher
     807           0 :         f.Lock()
     808           0 :         f.pending = append(f.pending, w.block)
     809           0 :         w.block = nextBlock
     810           0 :         f.ready.Signal()
     811           0 :         w.err = w.flusher.err
     812           0 :         f.Unlock()
     813           0 : 
     814           0 :         w.blockNum++
     815             : }
     816             : 
     817             : // Close flushes and syncs any unwritten data and closes the writer.
     818             : // Where required, external synchronisation is provided by commitPipeline.mu.
     819           1 : func (w *LogWriter) Close() error {
     820           1 :         return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
     821           1 : }
     822             : 
     823             : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
     824             : // lastQueuedRecord, that the caller will be notified about when synced.
     825           0 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
     826           0 :         return w.closeInternal(lastQueuedRecord)
     827           0 : }
     828             : 
     829           1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
     830           1 :         f := &w.flusher
     831           1 : 
     832           1 :         // Emit an EOF trailer signifying the end of this log. This helps readers
     833           1 :         // differentiate between a corrupted entry in the middle of a log from
     834           1 :         // garbage at the tail from a recycled log file.
     835           1 :         w.emitEOFTrailer()
     836           1 : 
     837           1 :         // Signal the flush loop to close.
     838           1 :         f.Lock()
     839           1 :         f.close = true
     840           1 :         f.ready.Signal()
     841           1 :         f.Unlock()
     842           1 : 
     843           1 :         // Wait for the flush loop to close. The flush loop will not close until all
     844           1 :         // pending data has been written or an error occurs.
     845           1 :         <-f.closed
     846           1 : 
     847           1 :         // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
     848           1 :         // last buffered data only if it was requested via syncQ, so we need to sync
     849           1 :         // here to ensure that all the data is synced.
     850           1 :         err := w.flusher.err
     851           1 :         var syncLatency time.Duration
     852           1 :         if err == nil && w.s != nil {
     853           1 :                 syncLatency, err = w.syncWithLatency()
     854           1 :         }
     855           1 :         f.Lock()
     856           1 :         if err == nil && f.fsyncLatency != nil {
     857           1 :                 f.fsyncLatency.Observe(float64(syncLatency))
     858           1 :         }
     859           1 :         free := w.free.blocks
     860           1 :         f.Unlock()
     861           1 : 
     862           1 :         // NB: the caller of closeInternal may not care about a non-nil cerr below
     863           1 :         // if all queued writes have been successfully written and synced.
     864           1 :         if lastQueuedRecord.Index != NoSyncIndex {
     865           0 :                 w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
     866           0 :         }
     867           1 :         if w.c != nil {
     868           1 :                 cerr := w.c.Close()
     869           1 :                 w.c = nil
     870           1 :                 err = firstError(err, cerr)
     871           1 :         }
     872             : 
     873           1 :         for _, b := range free {
     874           0 :                 b.flushed = 0
     875           0 :                 b.written.Store(0)
     876           0 :                 blockPool.Put(b)
     877           0 :         }
     878             : 
     879           1 :         w.err = errClosedWriter
     880           1 :         return err
     881             : }
     882             : 
     883             : // firstError returns the first non-nil error of err0 and err1, or nil if both
     884             : // are nil.
     885           1 : func firstError(err0, err1 error) error {
     886           1 :         if err0 != nil {
     887           0 :                 return err0
     888           0 :         }
     889           1 :         return err1
     890             : }
     891             : 
     892             : // WriteRecord writes a complete record. Returns the offset just past the end
     893             : // of the record.
     894             : // External synchronisation provided by commitPipeline.mu.
     895           0 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
     896           0 :         logSize, err := w.SyncRecord(p, nil, nil)
     897           0 :         return logSize, err
     898           0 : }
     899             : 
     900             : // SyncRecord writes a complete record. If wg != nil the record will be
     901             : // asynchronously persisted to the underlying writer and done will be called on
     902             : // the wait group upon completion. Returns the offset just past the end of the
     903             : // record.
     904             : // External synchronisation provided by commitPipeline.mu.
     905             : func (w *LogWriter) SyncRecord(
     906             :         p []byte, wg *sync.WaitGroup, err *error,
     907           1 : ) (logSize int64, err2 error) {
     908           1 :         w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
     909           1 :                 wg:  wg,
     910           1 :                 err: err,
     911           1 :         }
     912           1 :         return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
     913           1 : }
     914             : 
     915             : // SyncRecordGeneralized is a version of SyncRecord that accepts a
     916             : // PendingSync.
     917           1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
     918           1 :         if w.err != nil {
     919           0 :                 return -1, w.err
     920           0 :         }
     921             : 
     922             :         // The `i == 0` condition ensures we handle empty records. Such records can
     923             :         // possibly be generated for VersionEdits stored in the MANIFEST. While the
     924             :         // MANIFEST is currently written using Writer, it is good to support the same
     925             :         // semantics with LogWriter.
     926           1 :         for i := 0; i == 0 || len(p) > 0; i++ {
     927           1 :                 p = w.emitFragment(i, p)
     928           1 :         }
     929             : 
     930           1 :         if ps.syncRequested() {
     931           1 :                 // If we've been asked to persist the record, add the WaitGroup to the sync
     932           1 :                 // queue and signal the flushLoop. Note that flushLoop will write partial
     933           1 :                 // blocks to the file if syncing has been requested. The contract is that
     934           1 :                 // any record written to the LogWriter to this point will be flushed to the
     935           1 :                 // OS and synced to disk.
     936           1 :                 f := &w.flusher
     937           1 :                 f.pendingSyncs.push(ps)
     938           1 :                 f.ready.Signal()
     939           1 :         }
     940             : 
     941           1 :         offset := w.blockNum*blockSize + int64(w.block.written.Load())
     942           1 :         // Note that we don't return w.err here as a concurrent call to Close would
     943           1 :         // race with our read. That's ok because the only error we could be seeing is
     944           1 :         // one to syncing for which the caller can receive notification of by passing
     945           1 :         // in a non-nil err argument.
     946           1 :         return offset, nil
     947             : }
     948             : 
     949             : // Size returns the current size of the file.
     950             : // External synchronisation provided by commitPipeline.mu.
     951           1 : func (w *LogWriter) Size() int64 {
     952           1 :         return w.blockNum*blockSize + int64(w.block.written.Load())
     953           1 : }
     954             : 
     955           1 : func (w *LogWriter) emitEOFTrailer() {
     956           1 :         // Write a recyclable chunk header with a different log number.  Readers
     957           1 :         // will treat the header as EOF when the log number does not match.
     958           1 :         b := w.block
     959           1 :         i := b.written.Load()
     960           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
     961           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
     962           1 :         b.buf[i+6] = recyclableFullChunkType
     963           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
     964           1 :         b.written.Store(i + int32(recyclableHeaderSize))
     965           1 : }
     966             : 
     967           1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
     968           1 :         b := w.block
     969           1 :         i := b.written.Load()
     970           1 :         first := n == 0
     971           1 :         last := blockSize-i-recyclableHeaderSize >= int32(len(p))
     972           1 : 
     973           1 :         if last {
     974           1 :                 if first {
     975           1 :                         b.buf[i+6] = recyclableFullChunkType
     976           1 :                 } else {
     977           0 :                         b.buf[i+6] = recyclableLastChunkType
     978           0 :                 }
     979           0 :         } else {
     980           0 :                 if first {
     981           0 :                         b.buf[i+6] = recyclableFirstChunkType
     982           0 :                 } else {
     983           0 :                         b.buf[i+6] = recyclableMiddleChunkType
     984           0 :                 }
     985             :         }
     986             : 
     987           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
     988           1 : 
     989           1 :         r := copy(b.buf[i+recyclableHeaderSize:], p)
     990           1 :         j := i + int32(recyclableHeaderSize+r)
     991           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
     992           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
     993           1 :         b.written.Store(j)
     994           1 : 
     995           1 :         if blockSize-b.written.Load() < recyclableHeaderSize {
     996           0 :                 // There is no room for another fragment in the block, so fill the
     997           0 :                 // remaining bytes with zeros and queue the block for flushing.
     998           0 :                 clear(b.buf[b.written.Load():])
     999           0 :                 w.queueBlock()
    1000           0 :         }
    1001           1 :         return p[r:]
    1002             : }
    1003             : 
    1004             : // Metrics must typically be called after Close, since the callee will no
    1005             : // longer modify the returned LogWriterMetrics. It is also current if there is
    1006             : // nothing left to flush in the flush loop, but that is an implementation
    1007             : // detail that callers should not rely on.
    1008           1 : func (w *LogWriter) Metrics() LogWriterMetrics {
    1009           1 :         w.flusher.Lock()
    1010           1 :         defer w.flusher.Unlock()
    1011           1 :         m := *w.flusher.metrics
    1012           1 :         return m
    1013           1 : }
    1014             : 
    1015             : // LogWriterMetrics contains misc metrics for the log writer.
    1016             : type LogWriterMetrics struct {
    1017             :         WriteThroughput  base.ThroughputMetric
    1018             :         PendingBufferLen base.GaugeSampleMetric
    1019             :         SyncQueueLen     base.GaugeSampleMetric
    1020             : }
    1021             : 
    1022             : // Merge merges metrics from x. Requires that x is non-nil.
    1023           1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
    1024           1 :         m.WriteThroughput.Merge(x.WriteThroughput)
    1025           1 :         m.PendingBufferLen.Merge(x.PendingBufferLen)
    1026           1 :         m.SyncQueueLen.Merge(x.SyncQueueLen)
    1027           1 :         return nil
    1028           1 : }

Generated by: LCOV version 1.14