LCOV - code coverage report
Current view: top level - pebble/record - log_writer.go (source / functions) Hit Total Coverage
Test: 2024-09-20 08:16Z aba5868b - meta test only.lcov Lines: 426 540 78.9 %
Date: 2024-09-20 08:17:27 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package record
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "io"
      11             :         "runtime/pprof"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/crc"
      19             :         "github.com/prometheus/client_golang/prometheus"
      20             : )
      21             : 
      22             : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
      23             : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
      24             : 
      25             : type block struct {
      26             :         // buf[:written] has already been filled with fragments. Updated atomically.
      27             :         written atomic.Int32
      28             :         // buf[:flushed] has already been flushed to w.
      29             :         flushed int32
      30             :         buf     [blockSize]byte
      31             : }
      32             : 
      33             : type flusher interface {
      34             :         Flush() error
      35             : }
      36             : 
      37             : type syncer interface {
      38             :         Sync() error
      39             : }
      40             : 
      41             : const (
      42             :         syncConcurrencyBits = 12
      43             : 
      44             :         // SyncConcurrency is the maximum number of concurrent sync operations that
      45             :         // can be performed. Note that a sync operation is initiated either by a call
      46             :         // to SyncRecord or by a call to Close. Exported as this value also limits
      47             :         // the commit concurrency in commitPipeline.
      48             :         SyncConcurrency = 1 << syncConcurrencyBits
      49             : )
      50             : 
      51             : type syncSlot struct {
      52             :         wg  *sync.WaitGroup
      53             :         err *error
      54             : }
      55             : 
      56             : // syncQueue is a lock-free fixed-size single-producer, single-consumer
      57             : // queue. The single-producer can push to the head, and the single-consumer can
      58             : // pop multiple values from the tail. Popping calls Done() on each of the
      59             : // available *sync.WaitGroup elements.
      60             : type syncQueue struct {
      61             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      62             :         // are indexes into slots modulo len(slots) (masked with len(slots)-1).
      63             :         //
      64             :         // tail = index of oldest data in queue
      65             :         // head = index of next slot to fill
      66             :         //
      67             :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      68             :         // continues to own a slot outside this range until it nils the slot, at
      69             :         // which point ownership passes to the producer.
      70             :         //
      71             :         // The head index is stored in the most-significant bits so that we can
      72             :         // atomically add to it and the overflow is harmless.
      73             :         headTail atomic.Uint64
      74             : 
      75             :         // slots is a ring buffer of values stored in this queue. The size must be a
      76             :         // power of 2. A slot is in use until the tail index has moved beyond it.
      77             :         slots [SyncConcurrency]syncSlot
      78             : 
      79             :         // blocked is an atomic boolean which indicates whether syncing is currently
      80             :         // blocked or can proceed. It is used by the implementation of
      81             :         // min-sync-interval to block syncing until the min interval has passed.
      82             :         blocked atomic.Bool
      83             : }
      84             : 
      85             : const dequeueBits = 32
      86             : 
      87             : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
      88           1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
      89           1 :         const mask = 1<<dequeueBits - 1
      90           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      91           1 :         tail = uint32(ptrs & mask)
      92           1 :         return head, tail
      93           1 : }
      94             : 
      95           1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
      96           1 :         ptrs := q.headTail.Load()
      97           1 :         head, tail := q.unpack(ptrs)
      98           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      99           0 :                 panic("pebble: queue is full")
     100             :         }
     101             : 
     102           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
     103           1 :         slot.wg = wg
     104           1 :         slot.err = err
     105           1 : 
     106           1 :         // Increment head. This passes ownership of slot to the consumer (pop) and acts as a
     107           1 :         // store barrier for writing the slot.
     108           1 :         q.headTail.Add(1 << dequeueBits)
     109             : }
     110             : 
     111           0 : func (q *syncQueue) setBlocked() {
     112           0 :         q.blocked.Store(true)
     113           0 : }
     114             : 
     115           1 : func (q *syncQueue) clearBlocked() {
     116           1 :         q.blocked.Store(false)
     117           1 : }
     118             : 
     119           1 : func (q *syncQueue) empty() bool {
     120           1 :         head, tail, _ := q.load()
     121           1 :         return head == tail
     122           1 : }
     123             : 
     124             : // load returns the head, tail of the queue for what should be synced to the
     125             : // caller. It can return a head, tail of zero if syncing is blocked due to
     126             : // min-sync-interval. It additionally returns the real length of this queue,
     127             : // regardless of whether syncing is blocked.
     128           1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
     129           1 :         ptrs := q.headTail.Load()
     130           1 :         head, tail = q.unpack(ptrs)
     131           1 :         realLength = head - tail
     132           1 :         if q.blocked.Load() {
     133           0 :                 return 0, 0, realLength
     134           0 :         }
     135           1 :         return head, tail, realLength
     136             : }
     137             : 
     138             : // queueSemChan may be nil; when non-nil, one element is received from it per popped slot.
     139           1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
     140           1 :         if tail == head {
     141           0 :                 // Queue is empty.
     142           0 :                 return nil
     143           0 :         }
     144             : 
     145           1 :         for ; tail != head; tail++ {
     146           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     147           1 :                 wg := slot.wg
     148           1 :                 if wg == nil {
     149           0 :                         return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
     150           0 :                 }
     151           1 :                 *slot.err = err
     152           1 :                 slot.wg = nil
     153           1 :                 slot.err = nil
     154           1 :                 // We need to bump the tail count before releasing the queueSemChan
     155           1 :                 // semaphore as releasing the semaphore can cause a blocked goroutine to
     156           1 :                 // acquire the semaphore and enqueue before we've "freed" space in the
     157           1 :                 // queue.
     158           1 :                 q.headTail.Add(1)
     159           1 :                 wg.Done()
     160           1 :                 // Is always non-nil in production, unless using wal package for WAL
     161           1 :                 // failover.
     162           1 :                 if queueSemChan != nil {
     163           1 :                         <-queueSemChan
     164           1 :                 }
     165             :         }
     166             : 
     167           1 :         return nil
     168             : }
     169             : 
     170             : // pendingSyncs abstracts out the handling of pending sync requests. In
     171             : // standalone mode the implementation is a thin wrapper around syncQueue. In
     172             : // the mode where the LogWriter can be subject to failover, there is no queue
     173             : // kept in the LogWriter and the signaling to those waiting for sync is
     174             : // handled in the wal package.
     175             : //
     176             : // To avoid heap allocations due to the use of this interface, the parameters
     177             : // and return values follow some strict rules:
     178             : //   - The PendingSync parameter can be reused by the caller after push returns.
     179             : //     The implementation should be a pointer backed by a struct that is already
     180             : //     heap allocated, which the caller can reuse for the next push call.
     181             : //   - The pendingSyncsSnapshot return value must be backed by the pendingSyncs
     182             : //     implementation, so calling snapshotForPop again will cause the previous
     183             : //     snapshot to be overwritten.
     184             : type pendingSyncs interface {
     185             :         push(PendingSync)
     186             :         setBlocked()
     187             :         clearBlocked()
     188             :         empty() bool
     189             :         snapshotForPop() pendingSyncsSnapshot
     190             :         pop(snap pendingSyncsSnapshot, err error) error
     191             : }
     192             : 
     193             : type pendingSyncsSnapshot interface {
     194             :         empty() bool
     195             : }
     196             : 
     197             : // PendingSync abstracts the sync specification for a record queued on the
     198             : // LogWriter. The only implementations are provided in this package since
     199             : // syncRequested is not exported.
     200             : type PendingSync interface {
     201             :         syncRequested() bool
     202             : }
     203             : 
     204             : // The implementation of pendingSyncs in standalone mode.
     205             : type pendingSyncsWithSyncQueue struct {
     206             :         syncQueue
     207             :         syncQueueLen    *base.GaugeSampleMetric
     208             :         snapshotBacking syncQueueSnapshot
     209             :         // See the comment for LogWriterConfig.QueueSemChan.
     210             :         queueSemChan chan struct{}
     211             : }
     212             : 
     213             : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
     214             : 
     215           1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
     216           1 :         ps2 := ps.(*pendingSyncForSyncQueue)
     217           1 :         q.syncQueue.push(ps2.wg, ps2.err)
     218           1 : }
     219             : 
     220           1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
     221           1 :         head, tail, realLength := q.syncQueue.load()
     222           1 :         q.snapshotBacking = syncQueueSnapshot{
     223           1 :                 head: head,
     224           1 :                 tail: tail,
     225           1 :         }
     226           1 :         q.syncQueueLen.AddSample(int64(realLength))
     227           1 :         return &q.snapshotBacking
     228           1 : }
     229             : 
     230           1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
     231           1 :         s := snap.(*syncQueueSnapshot)
     232           1 :         return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
     233           1 : }
     234             : 
     235             : // The implementation of pendingSyncsSnapshot in standalone mode.
     236             : type syncQueueSnapshot struct {
     237             :         head, tail uint32
     238             : }
     239             : 
     240           1 : func (s *syncQueueSnapshot) empty() bool {
     241           1 :         return s.head == s.tail
     242           1 : }
     243             : 
     244             : // The implementation of PendingSync in standalone mode.
     245             : type pendingSyncForSyncQueue struct {
     246             :         wg  *sync.WaitGroup
     247             :         err *error
     248             : }
     249             : 
     250           1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
     251           1 :         return ps.wg != nil
     252           1 : }
     253             : 
     254             : // The implementation of pendingSyncs in failover mode.
     255             : type pendingSyncsWithHighestSyncIndex struct {
     256             :         // The highest "index" queued that is requesting a sync. Initialized
     257             :         // to NoSyncIndex, and reset to NoSyncIndex after the sync.
     258             :         index           atomic.Int64
     259             :         snapshotBacking PendingSyncIndex
     260             :         // blocked is an atomic boolean which indicates whether syncing is currently
     261             :         // blocked or can proceed. It is used by the implementation of
     262             :         // min-sync-interval to block syncing until the min interval has passed.
     263             :         blocked                   atomic.Bool
     264             :         externalSyncQueueCallback ExternalSyncQueueCallback
     265             : }
     266             : 
     267             : // NoSyncIndex is the value of PendingSyncIndex.Index when a sync is not requested.
     268             : const NoSyncIndex = -1
     269             : 
     270             : func (si *pendingSyncsWithHighestSyncIndex) init(
     271             :         externalSyncQueueCallback ExternalSyncQueueCallback,
     272           1 : ) {
     273           1 :         si.index.Store(NoSyncIndex)
     274           1 :         si.externalSyncQueueCallback = externalSyncQueueCallback
     275           1 : }
     276             : 
     277           1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
     278           1 :         ps2 := ps.(*PendingSyncIndex)
     279           1 :         si.index.Store(ps2.Index)
     280           1 : }
     281             : 
     282           0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
     283           0 :         si.blocked.Store(true)
     284           0 : }
     285             : 
     286           1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
     287           1 :         si.blocked.Store(false)
     288           1 : }
     289             : 
     290           1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
     291           1 :         return si.load() == NoSyncIndex
     292           1 : }
     293             : 
     294           1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
     295           1 :         si.snapshotBacking = PendingSyncIndex{Index: si.load()}
     296           1 :         return &si.snapshotBacking
     297           1 : }
     298             : 
     299           1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
     300           1 :         index := si.index.Load()
     301           1 :         if index != NoSyncIndex && si.blocked.Load() {
     302           0 :                 index = NoSyncIndex
     303           0 :         }
     304           1 :         return index
     305             : }
     306             : 
     307           1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
     308           1 :         index := snap.(*PendingSyncIndex)
     309           1 :         if index.Index == NoSyncIndex {
     310           0 :                 return nil
     311           0 :         }
     312             :         // Reset to NoSyncIndex unless a different index has been queued since the snapshot.
     313           1 :         si.index.CompareAndSwap(index.Index, NoSyncIndex)
     314           1 :         si.externalSyncQueueCallback(*index, err)
     315           1 :         return nil
     316             : }
     317             : 
     318             : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
     319             : type PendingSyncIndex struct {
     320             :         // Index is some state meaningful to the user of LogWriter. The LogWriter
     321             :         // itself only examines whether Index is equal to NoSyncIndex.
     322             :         Index int64
     323             : }
     324             : 
     325           1 : func (s *PendingSyncIndex) empty() bool {
     326           1 :         return s.Index == NoSyncIndex
     327           1 : }
     328             : 
     329           1 : func (s *PendingSyncIndex) syncRequested() bool {
     330           1 :         return s.Index != NoSyncIndex
     331           1 : }
     332             : 
     333             : // flusherCond is a specialized condition variable that allows its condition to
     334             : // change and readiness be signalled without holding its associated mutex. In
     335             : // particular, when a waiter is added to syncQueue atomically, this condition
     336             : // variable can be signalled without holding flusher.Mutex.
     337             : type flusherCond struct {
     338             :         mu   *sync.Mutex
     339             :         q    pendingSyncs
     340             :         cond sync.Cond
     341             : }
     342             : 
     343           1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
     344           1 :         c.mu = mu
     345           1 :         c.q = q
     346           1 :         // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
     347           1 :         // points to flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
     348           1 :         // will be called and we can check the !syncQueue.empty() condition.
     349           1 :         c.cond.L = c
     350           1 : }
     351             : 
     352           1 : func (c *flusherCond) Signal() {
     353           1 :         // Pass-through to the cond var.
     354           1 :         c.cond.Signal()
     355           1 : }
     356             : 
     357           1 : func (c *flusherCond) Wait() {
     358           1 :         // Pass-through to the cond var. Note that internally the cond var implements
     359           1 :         // Wait as:
     360           1 :         //
     361           1 :         //   t := notifyListAdd()
     362           1 :         //   L.Unlock()
     363           1 :         //   notifyListWait(t)
     364           1 :         //   L.Lock()
     365           1 :         //
     366           1 :         // We've configured the cond var to call flusherCond.Unlock() which allows
     367           1 :         // us to check the !c.q.empty() condition without a danger of missing a
     368           1 :         // notification. Any call to flusherCond.Signal() after notifyListAdd() is
     369           1 :         // called will cause the subsequent notifyListWait() to return immediately.
     370           1 :         c.cond.Wait()
     371           1 : }
     372             : 
     373           1 : func (c *flusherCond) Lock() {
     374           1 :         c.mu.Lock()
     375           1 : }
     376             : 
     377           1 : func (c *flusherCond) Unlock() {
     378           1 :         c.mu.Unlock()
     379           1 :         if !c.q.empty() {
     380           1 :                 // If the current goroutine is about to block on sync.Cond.Wait, this call
     381           1 :                 // to Signal will prevent that. The comment in Wait above explains a bit
     382           1 :                 // about what is going on here, but it is worth reiterating:
     383           1 :                 //
     384           1 :                 //   flusherCond.Wait()
     385           1 :                 //     sync.Cond.Wait()
     386           1 :                 //       t := notifyListAdd()
     387           1 :                 //       flusherCond.Unlock()    <-- we are here
     388           1 :                 //       notifyListWait(t)
     389           1 :                 //       flusherCond.Lock()
     390           1 :                 //
     391           1 :                 // The call to Signal here results in:
     392           1 :                 //
     393           1 :                 //     sync.Cond.Signal()
     394           1 :                 //       notifyListNotifyOne()
     395           1 :                 //
     396           1 :                 // The call to notifyListNotifyOne() will prevent the call to
     397           1 :                 // notifyListWait(t) from blocking.
     398           1 :                 c.cond.Signal()
     399           1 :         }
     400             : }
     401             : 
     402             : type durationFunc func() time.Duration
     403             : 
     404             : // syncTimer is an interface for timers, modeled on the closure callback mode
     405             : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
     406             : // by tests to mock out the timer functionality used to implement
     407             : // min-sync-interval.
     408             : type syncTimer interface {
     409             :         Reset(time.Duration) bool
     410             :         Stop() bool
     411             : }
     412             : 
     413             : // LogWriter writes records to an underlying io.Writer. In order to support WAL
     414             : // file reuse, a LogWriter's records are tagged with the WAL's file
     415             : // number. When reading a log file a record from a previous incarnation of the
     416             : // file will return the error ErrInvalidLogNum.
     417             : type LogWriter struct {
     418             :         // w is the underlying writer.
     419             :         w io.Writer
     420             :         // c is w as a closer.
     421             :         c io.Closer
     422             :         // s is w as a syncer.
     423             :         s syncer
     424             :         // logNum is the low 32-bits of the log's file number.
     425             :         logNum uint32
     426             :         // blockNum is the zero based block number for the current block.
     427             :         blockNum int64
     428             :         // err is any accumulated error. It originates in flusher.err, and is
     429             :         // updated to reflect flusher.err when a block is full and getting enqueued.
     430             :         // Therefore, there is a lag between when flusher.err has a non-nil error,
     431             :         // and when that non-nil error is reflected in LogWriter.err. On close, it
     432             :         // is set to errClosedWriter to inform accidental future calls to
     433             :         // SyncRecord*.
     434             :         err error
     435             :         // block is the current block being written. Protected by flusher.Mutex.
     436             :         block *block
     437             :         free  struct {
     438             :                 sync.Mutex
     439             :                 blocks []*block
     440             :         }
     441             : 
     442             :         flusher struct {
     443             :                 sync.Mutex
     444             :                 // Flusher ready is a condition variable that is signalled when there are
     445             :                 // blocks to flush, syncing has been requested, or the LogWriter has been
     446             :                 // closed. For signalling of a sync, it is safe to call without holding
     447             :                 // flusher.Mutex.
     448             :                 ready flusherCond
     449             :                 // Set to true when the flush loop should be closed.
     450             :                 close bool
     451             :                 // Closed when the flush loop has terminated.
     452             :                 closed chan struct{}
     453             :                 // Accumulated flush error.
     454             :                 err error
     455             :                 // minSyncInterval is the minimum duration between syncs.
     456             :                 minSyncInterval durationFunc
     457             :                 fsyncLatency    prometheus.Histogram
     458             :                 pending         []*block
     459             :                 // Pushing and popping from pendingSyncs does not require flusher mutex to
     460             :                 // be held.
     461             :                 pendingSyncs pendingSyncs
     462             :                 metrics      *LogWriterMetrics
     463             :         }
     464             : 
     465             :         // afterFunc is a hook to allow tests to mock out the timer functionality
     466             :         // used for min-sync-interval. In normal operation this points to
     467             :         // time.AfterFunc.
     468             :         afterFunc func(d time.Duration, f func()) syncTimer
     469             : 
     470             :         // Backing for both pendingSyncs implementations.
     471             :         pendingSyncsBackingQ     pendingSyncsWithSyncQueue
     472             :         pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
     473             : 
     474             :         pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
     475             : }
     476             : 
     477             : // LogWriterConfig is a struct used for configuring new LogWriters.
     478             : type LogWriterConfig struct {
     479             :         WALMinSyncInterval durationFunc
     480             :         WALFsyncLatency    prometheus.Histogram
     481             :         // QueueSemChan is an optional channel to pop from when popping from
     482             :         // LogWriter.flusher.pendingSyncs. It functions as a semaphore that prevents
     483             :         // the syncQueue from overflowing (which will cause a panic). All production
     484             :         // code ensures this is non-nil.
     485             :         QueueSemChan chan struct{}
     486             : 
     487             :         // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
     488             :         // as part of a WAL implementation that can failover between LogWriters.
     489             :         //
     490             :         // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
     491             :         // be used with a PendingSync parameter that is implemented by
     492             :         // PendingSyncIndex. When an index is synced (which implies all earlier
     493             :         // indices are also synced), this callback is invoked. The caller must not
     494             :         // hold any mutex when invoking this callback, since the lock ordering
     495             :         // requirement in this case is that any higher layer locks (in the wal
     496             :         // package) precede the lower layer locks (in the record package). These
     497             :         // callbacks are serialized since they are invoked from the flushLoop.
     498             :         ExternalSyncQueueCallback ExternalSyncQueueCallback
     499             : }
     500             : 
     501             : // ExternalSyncQueueCallback is to be run when a PendingSync has been
     502             : // processed, either successfully or with an error.
     503             : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
     504             : 
     505             : // initialAllocatedBlocksCap is the initial capacity of the various slices
     506             : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
     507             : // than this threshold allows.
     508             : const initialAllocatedBlocksCap = 32
     509             : 
     510             : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
     511             : // Pool when a LogWriter is closed. Before that, free blocks are maintained
     512             : // within a LogWriter's own internal free list `w.free.blocks`.
     513             : var blockPool = sync.Pool{
     514           1 :         New: func() any { return &block{} },
     515             : }
     516             : 
     517             : // NewLogWriter returns a new LogWriter.
     518             : //
     519             : // The io.Writer may also be used as an io.Closer and syncer. No other methods
     520             : // will be called on the writer.
     521             : func NewLogWriter(
     522             :         w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
     523           1 : ) *LogWriter {
     524           1 :         c, _ := w.(io.Closer)
     525           1 :         s, _ := w.(syncer)
     526           1 :         r := &LogWriter{
     527           1 :                 w: w,
     528           1 :                 c: c,
     529           1 :                 s: s,
     530           1 :                 // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
     531           1 :                 // we are very unlikely to reach a file number of 4 billion and b) the log
     532           1 :                 // number is used as a validation check and using only the low 32-bits is
     533           1 :                 // sufficient for that purpose.
     534           1 :                 logNum: uint32(logNum),
     535           1 :                 afterFunc: func(d time.Duration, f func()) syncTimer {
     536           0 :                         return time.AfterFunc(d, f)
     537           0 :                 },
     538             :         }
     539           1 :         m := &LogWriterMetrics{}
     540           1 :         if logWriterConfig.ExternalSyncQueueCallback != nil {
     541           1 :                 r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
     542           1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
     543           1 :         } else {
     544           1 :                 r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
     545           1 :                         syncQueueLen: &m.SyncQueueLen,
     546           1 :                         queueSemChan: logWriterConfig.QueueSemChan,
     547           1 :                 }
     548           1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
     549           1 :         }
     550             : 
     551           1 :         r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
     552           1 :         r.block = blockPool.Get().(*block)
     553           1 :         r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
     554           1 :         r.flusher.closed = make(chan struct{})
     555           1 :         r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
     556           1 :         r.flusher.metrics = m
     557           1 : 
     558           1 :         f := &r.flusher
     559           1 :         f.minSyncInterval = logWriterConfig.WALMinSyncInterval
     560           1 :         f.fsyncLatency = logWriterConfig.WALFsyncLatency
     561           1 : 
     562           1 :         go func() {
     563           1 :                 pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
     564           1 :         }()
     565           1 :         return r
     566             : }
     567             : 
// flushLoop is the body of the background goroutine started by NewLogWriter.
// It repeatedly waits on f.ready for work (queued full blocks, unflushed
// bytes in the current block, or pending sync requests), writes that data via
// flushPending, and records throughput metrics.
//
// The flusher mutex is held for the whole loop except while flushPending runs
// and while pending syncs are popped on the error path. Once f.err is set, the
// loop no longer writes: later iterations only drain the queues and report
// the error to sync waiters. The loop returns, closing f.closed, when f.close
// is set and no flush or sync work remains.
func (w *LogWriter) flushLoop(context.Context) {
	f := &w.flusher
	f.Lock()

	// Initialize idleStartTime to when the loop starts.
	idleStartTime := time.Now()
	var syncTimer syncTimer
	defer func() {
		// Capture the idle duration between the last piece of work and when the
		// loop terminated.
		f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
		if syncTimer != nil {
			syncTimer.Stop()
		}
		// Closing f.closed is what unblocks closeInternal.
		close(f.closed)
		f.Unlock()
	}()

	// NOTE(review): the commentary below still speaks of flusher.syncQ and
	// syncQueue.load(); in this code the sync queue is reached through
	// f.pendingSyncs (empty, snapshotForPop, pop, setBlocked, clearBlocked).
	// Presumably the wording predates that abstraction -- confirm and update.
	//
	// The flush loop performs flushing of full and partial data blocks to the
	// underlying writer (LogWriter.w), syncing of the writer, and notification
	// to sync requests that they have completed.
	//
	// - flusher.ready is a condition variable that is signalled when there is
	//   work to do. Full blocks are contained in flusher.pending. The current
	//   partial block is in LogWriter.block. And sync operations are held in
	//   flusher.syncQ.
	//
	// - The decision to sync is determined by whether there are any sync
	//   requests present in flusher.syncQ and whether enough time has elapsed
	//   since the last sync. If not enough time has elapsed since the last sync,
	//   flusher.syncQ.blocked will be set to 1. If syncing is blocked,
	//   syncQueue.empty() will return true and syncQueue.load() will return 0,0
	//   (i.e. an empty list).
	//
	// - flusher.syncQ.blocked is cleared by a timer that is initialized when
	//   blocked is set to 1. When blocked is 1, no syncing will take place, but
	//   flushing will continue to be performed. The on/off toggle for syncing
	//   does not need to be carefully synchronized with the rest of processing
	//   -- all we need to ensure is that after any transition to blocked=1 there
	//   is eventually a transition to blocked=0. syncTimer performs this
	//   transition. Note that any change to min-sync-interval will not take
	//   effect until the previous timer elapses.
	//
	// - Picking up the syncing work to perform requires coordination with
	//   picking up the flushing work. Specifically, flushing work is queued
	//   before syncing work. The guarantee of this code is that when a sync is
	//   requested, any previously queued flush work will be synced. This
	//   motivates reading the syncing work (f.syncQ.load()) before picking up
	//   the flush work (w.block.written.Load()).

	// The list of full blocks that need to be written. This is copied from
	// f.pending on every loop iteration, though the number of elements is
	// usually small (most frequently 1). In the case of the WAL LogWriter, the
	// number of blocks is bounded by the size of the WAL's corresponding
	// memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
	// this works out to at most 2048 elements if the entirety of the memtable's
	// contents are queued.
	pending := make([]*block, 0, cap(f.pending))
	for {
		// Inner loop: wait (with the flusher mutex held) until there is
		// something to do or the writer has been closed.
		for {
			// Grab the portion of the current block that requires flushing. Note that
			// the current block can be added to the pending blocks list after we release
			// the flusher lock, but it won't be part of pending.
			written := w.block.written.Load()
			if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
				break
			}
			if f.close {
				// If the writer is closed, pretend the sync timer fired immediately so
				// that we can process any queued sync requests.
				f.pendingSyncs.clearBlocked()
				if !f.pendingSyncs.empty() {
					break
				}
				return
			}
			f.ready.Wait()
			continue
		}
		// Found work to do, so no longer idle.
		//
		// NB: it is safe to read pending before loading from the syncQ since
		// mutations to pending require the w.flusher mutex, which is held here.
		// There is no risk that someone will concurrently add to pending, so the
		// following sequence, which would pick up a syncQ entry without the
		// corresponding data, is impossible:
		//
		// Thread enqueueing       This thread
		//                         1. read pending
		// 2. add block to pending
		// 3. add to syncQ
		//                         4. read syncQ
		workStartTime := time.Now()
		idleDuration := workStartTime.Sub(idleStartTime)
		// Move the queued blocks into the loop-local slice, reusing its
		// backing array, and empty f.pending for subsequent producers.
		pending = append(pending[:0], f.pending...)
		f.pending = f.pending[:0]
		f.metrics.PendingBufferLen.AddSample(int64(len(pending)))

		// Grab the list of sync waiters. Note that syncQueue.load() will return
		// 0,0 while we're waiting for the min-sync-interval to expire. This
		// allows flushing to proceed even if we're not ready to sync.
		snap := f.pendingSyncs.snapshotForPop()

		// Grab the portion of the current block that requires flushing. Note that
		// the current block can be added to the pending blocks list after we
		// release the flusher lock, but it won't be part of pending. This has to
		// be ordered after we get the list of sync waiters from syncQ in order to
		// prevent a race where a waiter adds itself to syncQ, but this thread
		// picks up the entry in syncQ and not the buffered data.
		written := w.block.written.Load()
		data := w.block.buf[w.block.flushed:written]
		w.block.flushed = written

		fErr := f.err
		f.Unlock()
		// If flusher has an error, we propagate it to waiters. Note in spite of
		// error we consume the pending list above to free blocks for writers.
		// NOTE(review): on this path the consumed blocks are not handed to
		// flushBlock, so they are not appended back to w.free.blocks here.
		if fErr != nil {
			// NB: pop may invoke ExternalSyncQueueCallback, which is why we have
			// called f.Unlock() above. We will acquire the lock again below.
			f.pendingSyncs.pop(snap, fErr)
			// Update the idleStartTime if work could not be done, so that we don't
			// include the duration we tried to do work as idle. We don't bother
			// with the rest of the accounting, which means we will undercount.
			idleStartTime = time.Now()
			f.Lock()
			continue
		}
		// The write (and possibly sync) runs without the flusher mutex held.
		synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
		f.Lock()
		if synced && f.fsyncLatency != nil {
			f.fsyncLatency.Observe(float64(syncLatency))
		}
		f.err = err
		if f.err != nil {
			// Unblock syncing so that queued sync requests are picked up on the
			// next iteration and receive the error.
			f.pendingSyncs.clearBlocked()
			// Update the idleStartTime if work could not be done, so that we don't
			// include the duration we tried to do work as idle. We don't bother
			// with the rest of the accounting, which means we will undercount.
			idleStartTime = time.Now()
			continue
		}

		if synced && f.minSyncInterval != nil {
			// A sync was performed. Make sure we've waited for the min sync
			// interval before syncing again.
			if min := f.minSyncInterval(); min > 0 {
				f.pendingSyncs.setBlocked()
				// The timer is created lazily on first use and re-armed with
				// Reset afterwards.
				if syncTimer == nil {
					syncTimer = w.afterFunc(min, func() {
						f.pendingSyncs.clearBlocked()
						f.ready.Signal()
					})
				} else {
					syncTimer.Reset(min)
				}
			}
		}
		// Finished work, and started idling.
		idleStartTime = time.Now()
		workDuration := idleStartTime.Sub(workStartTime)
		f.metrics.WriteThroughput.Bytes += bytesWritten
		f.metrics.WriteThroughput.WorkDuration += workDuration
		f.metrics.WriteThroughput.IdleDuration += idleDuration
	}
}
     734             : 
     735             : func (w *LogWriter) flushPending(
     736             :         data []byte, pending []*block, snap pendingSyncsSnapshot,
     737           1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
     738           1 :         defer func() {
     739           1 :                 // Translate panics into errors. The errors will cause flushLoop to shut
     740           1 :                 // down, but allows us to do so in a controlled way and avoid swallowing
     741           1 :                 // the stack that created the panic if panic'ing itself hits a panic
     742           1 :                 // (e.g. unlock of unlocked mutex).
     743           1 :                 if r := recover(); r != nil {
     744           0 :                         err = errors.Newf("%v", r)
     745           0 :                 }
     746             :         }()
     747             : 
     748           1 :         for _, b := range pending {
     749           0 :                 bytesWritten += blockSize - int64(b.flushed)
     750           0 :                 if err = w.flushBlock(b); err != nil {
     751           0 :                         break
     752             :                 }
     753             :         }
     754           1 :         if n := len(data); err == nil && n > 0 {
     755           1 :                 bytesWritten += int64(n)
     756           1 :                 _, err = w.w.Write(data)
     757           1 :         }
     758             : 
     759           1 :         synced = !snap.empty()
     760           1 :         if synced {
     761           1 :                 if err == nil && w.s != nil {
     762           1 :                         syncLatency, err = w.syncWithLatency()
     763           1 :                 } else {
     764           0 :                         synced = false
     765           0 :                 }
     766           1 :                 f := &w.flusher
     767           1 :                 if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
     768           0 :                         return synced, syncLatency, bytesWritten, firstError(err, popErr)
     769           0 :                 }
     770             :         }
     771             : 
     772           1 :         return synced, syncLatency, bytesWritten, err
     773             : }
     774             : 
     775           1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
     776           1 :         start := time.Now()
     777           1 :         err := w.s.Sync()
     778           1 :         syncLatency := time.Since(start)
     779           1 :         return syncLatency, err
     780           1 : }
     781             : 
     782           0 : func (w *LogWriter) flushBlock(b *block) error {
     783           0 :         if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
     784           0 :                 return err
     785           0 :         }
     786           0 :         b.written.Store(0)
     787           0 :         b.flushed = 0
     788           0 :         w.free.Lock()
     789           0 :         w.free.blocks = append(w.free.blocks, b)
     790           0 :         w.free.Unlock()
     791           0 :         return nil
     792             : }
     793             : 
     794             : // queueBlock queues the current block for writing to the underlying writer,
     795             : // allocates a new block and reserves space for the next header.
     796           0 : func (w *LogWriter) queueBlock() {
     797           0 :         // Allocate a new block, blocking until one is available. We do this first
     798           0 :         // because w.block is protected by w.flusher.Mutex.
     799           0 :         w.free.Lock()
     800           0 :         if len(w.free.blocks) == 0 {
     801           0 :                 w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
     802           0 :         }
     803           0 :         nextBlock := w.free.blocks[len(w.free.blocks)-1]
     804           0 :         w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
     805           0 :         w.free.Unlock()
     806           0 : 
     807           0 :         f := &w.flusher
     808           0 :         f.Lock()
     809           0 :         f.pending = append(f.pending, w.block)
     810           0 :         w.block = nextBlock
     811           0 :         f.ready.Signal()
     812           0 :         w.err = w.flusher.err
     813           0 :         f.Unlock()
     814           0 : 
     815           0 :         w.blockNum++
     816             : }
     817             : 
     818             : // Close flushes and syncs any unwritten data and closes the writer.
     819             : // Where required, external synchronisation is provided by commitPipeline.mu.
     820           1 : func (w *LogWriter) Close() error {
     821           1 :         return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
     822           1 : }
     823             : 
     824             : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
     825             : // lastQueuedRecord, that the caller will be notified about when synced.
     826           1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
     827           1 :         return w.closeInternal(lastQueuedRecord)
     828           1 : }
     829             : 
     830           1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
     831           1 :         f := &w.flusher
     832           1 : 
     833           1 :         // Emit an EOF trailer signifying the end of this log. This helps readers
     834           1 :         // differentiate between a corrupted entry in the middle of a log from
     835           1 :         // garbage at the tail from a recycled log file.
     836           1 :         w.emitEOFTrailer()
     837           1 : 
     838           1 :         // Signal the flush loop to close.
     839           1 :         f.Lock()
     840           1 :         f.close = true
     841           1 :         f.ready.Signal()
     842           1 :         f.Unlock()
     843           1 : 
     844           1 :         // Wait for the flush loop to close. The flush loop will not close until all
     845           1 :         // pending data has been written or an error occurs.
     846           1 :         <-f.closed
     847           1 : 
     848           1 :         // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
     849           1 :         // last buffered data only if it was requested via syncQ, so we need to sync
     850           1 :         // here to ensure that all the data is synced.
     851           1 :         err := w.flusher.err
     852           1 :         var syncLatency time.Duration
     853           1 :         if err == nil && w.s != nil {
     854           1 :                 syncLatency, err = w.syncWithLatency()
     855           1 :         }
     856           1 :         f.Lock()
     857           1 :         if err == nil && f.fsyncLatency != nil {
     858           1 :                 f.fsyncLatency.Observe(float64(syncLatency))
     859           1 :         }
     860           1 :         free := w.free.blocks
     861           1 :         f.Unlock()
     862           1 : 
     863           1 :         // NB: the caller of closeInternal may not care about a non-nil cerr below
     864           1 :         // if all queued writes have been successfully written and synced.
     865           1 :         if lastQueuedRecord.Index != NoSyncIndex {
     866           1 :                 w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
     867           1 :         }
     868           1 :         if w.c != nil {
     869           1 :                 cerr := w.c.Close()
     870           1 :                 w.c = nil
     871           1 :                 err = firstError(err, cerr)
     872           1 :         }
     873             : 
     874           1 :         for _, b := range free {
     875           0 :                 b.flushed = 0
     876           0 :                 b.written.Store(0)
     877           0 :                 blockPool.Put(b)
     878           0 :         }
     879             : 
     880           1 :         w.err = errClosedWriter
     881           1 :         return err
     882             : }
     883             : 
     884             : // firstError returns the first non-nil error of err0 and err1, or nil if both
     885             : // are nil.
     886           1 : func firstError(err0, err1 error) error {
     887           1 :         if err0 != nil {
     888           0 :                 return err0
     889           0 :         }
     890           1 :         return err1
     891             : }
     892             : 
     893             : // WriteRecord writes a complete record. Returns the offset just past the end
     894             : // of the record.
     895             : // External synchronisation provided by commitPipeline.mu.
     896           0 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
     897           0 :         logSize, err := w.SyncRecord(p, nil, nil)
     898           0 :         return logSize, err
     899           0 : }
     900             : 
     901             : // SyncRecord writes a complete record. If wg != nil the record will be
     902             : // asynchronously persisted to the underlying writer and done will be called on
     903             : // the wait group upon completion. Returns the offset just past the end of the
     904             : // record.
     905             : // External synchronisation provided by commitPipeline.mu.
     906             : func (w *LogWriter) SyncRecord(
     907             :         p []byte, wg *sync.WaitGroup, err *error,
     908           1 : ) (logSize int64, err2 error) {
     909           1 :         w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
     910           1 :                 wg:  wg,
     911           1 :                 err: err,
     912           1 :         }
     913           1 :         return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
     914           1 : }
     915             : 
     916             : // SyncRecordGeneralized is a version of SyncRecord that accepts a
     917             : // PendingSync.
     918           1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
     919           1 :         if w.err != nil {
     920           0 :                 return -1, w.err
     921           0 :         }
     922             : 
     923             :         // The `i == 0` condition ensures we handle empty records. Such records can
     924             :         // possibly be generated for VersionEdits stored in the MANIFEST. While the
     925             :         // MANIFEST is currently written using Writer, it is good to support the same
     926             :         // semantics with LogWriter.
     927           1 :         for i := 0; i == 0 || len(p) > 0; i++ {
     928           1 :                 p = w.emitFragment(i, p)
     929           1 :         }
     930             : 
     931           1 :         if ps.syncRequested() {
     932           1 :                 // If we've been asked to persist the record, add the WaitGroup to the sync
     933           1 :                 // queue and signal the flushLoop. Note that flushLoop will write partial
     934           1 :                 // blocks to the file if syncing has been requested. The contract is that
     935           1 :                 // any record written to the LogWriter to this point will be flushed to the
     936           1 :                 // OS and synced to disk.
     937           1 :                 f := &w.flusher
     938           1 :                 f.pendingSyncs.push(ps)
     939           1 :                 f.ready.Signal()
     940           1 :         }
     941             : 
     942           1 :         offset := w.blockNum*blockSize + int64(w.block.written.Load())
     943           1 :         // Note that we don't return w.err here as a concurrent call to Close would
     944           1 :         // race with our read. That's ok because the only error we could be seeing is
     945           1 :         // one to syncing for which the caller can receive notification of by passing
     946           1 :         // in a non-nil err argument.
     947           1 :         return offset, nil
     948             : }
     949             : 
     950             : // Size returns the current size of the file.
     951             : // External synchronisation provided by commitPipeline.mu.
     952           1 : func (w *LogWriter) Size() int64 {
     953           1 :         return w.blockNum*blockSize + int64(w.block.written.Load())
     954           1 : }
     955             : 
     956           1 : func (w *LogWriter) emitEOFTrailer() {
     957           1 :         // Write a recyclable chunk header with a different log number.  Readers
     958           1 :         // will treat the header as EOF when the log number does not match.
     959           1 :         b := w.block
     960           1 :         i := b.written.Load()
     961           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
     962           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
     963           1 :         b.buf[i+6] = recyclableFullChunkType
     964           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
     965           1 :         b.written.Store(i + int32(recyclableHeaderSize))
     966           1 : }
     967             : 
     968           1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
     969           1 :         b := w.block
     970           1 :         i := b.written.Load()
     971           1 :         first := n == 0
     972           1 :         last := blockSize-i-recyclableHeaderSize >= int32(len(p))
     973           1 : 
     974           1 :         if last {
     975           1 :                 if first {
     976           1 :                         b.buf[i+6] = recyclableFullChunkType
     977           1 :                 } else {
     978           0 :                         b.buf[i+6] = recyclableLastChunkType
     979           0 :                 }
     980           0 :         } else {
     981           0 :                 if first {
     982           0 :                         b.buf[i+6] = recyclableFirstChunkType
     983           0 :                 } else {
     984           0 :                         b.buf[i+6] = recyclableMiddleChunkType
     985           0 :                 }
     986             :         }
     987             : 
     988           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
     989           1 : 
     990           1 :         r := copy(b.buf[i+recyclableHeaderSize:], p)
     991           1 :         j := i + int32(recyclableHeaderSize+r)
     992           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
     993           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
     994           1 :         b.written.Store(j)
     995           1 : 
     996           1 :         if blockSize-b.written.Load() < recyclableHeaderSize {
     997           0 :                 // There is no room for another fragment in the block, so fill the
     998           0 :                 // remaining bytes with zeros and queue the block for flushing.
     999           0 :                 clear(b.buf[b.written.Load():])
    1000           0 :                 w.queueBlock()
    1001           0 :         }
    1002           1 :         return p[r:]
    1003             : }
    1004             : 
    1005             : // Metrics must typically be called after Close, since the callee will no
    1006             : // longer modify the returned LogWriterMetrics. It is also current if there is
    1007             : // nothing left to flush in the flush loop, but that is an implementation
    1008             : // detail that callers should not rely on.
    1009           1 : func (w *LogWriter) Metrics() LogWriterMetrics {
    1010           1 :         w.flusher.Lock()
    1011           1 :         defer w.flusher.Unlock()
    1012           1 :         m := *w.flusher.metrics
    1013           1 :         return m
    1014           1 : }
    1015             : 
// LogWriterMetrics contains misc metrics for the log writer.
type LogWriterMetrics struct {
	// WriteThroughput accumulates the bytes written by the flush loop along
	// with the time it spent working and idling.
	WriteThroughput base.ThroughputMetric
	// PendingBufferLen samples the number of full blocks picked up from the
	// flusher's pending list on each flush loop iteration.
	PendingBufferLen base.GaugeSampleMetric
	// SyncQueueLen is handed to the built-in sync queue when no
	// ExternalSyncQueueCallback is configured; presumably it samples the
	// length of that queue -- confirm against the sync queue implementation.
	SyncQueueLen base.GaugeSampleMetric
}
    1022             : 
    1023             : // Merge merges metrics from x. Requires that x is non-nil.
    1024           1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
    1025           1 :         m.WriteThroughput.Merge(x.WriteThroughput)
    1026           1 :         m.PendingBufferLen.Merge(x.PendingBufferLen)
    1027           1 :         m.SyncQueueLen.Merge(x.SyncQueueLen)
    1028           1 :         return nil
    1029           1 : }

Generated by: LCOV version 1.14