LCOV - code coverage report
Current view: top level - pebble/record/record - log_writer.go (source / functions) Coverage Total Hit
Test: 2025-11-17 08:20Z 5729a1c7 - meta test only.lcov Lines: 80.2 % 622 499
Test Date: 2025-11-17 08:21:10 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package record
       6              : 
       7              : import (
       8              :         "context"
       9              :         "encoding/binary"
      10              :         "io"
      11              :         "runtime/pprof"
      12              :         "sync"
      13              :         "sync/atomic"
      14              :         "time"
      15              : 
      16              :         "github.com/cockroachdb/crlib/crtime"
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/crc"
      20              :         "github.com/prometheus/client_golang/prometheus"
      21              : )
      22              : 
      23              : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
      24              : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
      25              : 
      26              : type block struct {
      27              :         // buf[:written] has already been filled with fragments. Updated atomically.
      28              :         written atomic.Int32
      29              :         // buf[:flushed] has already been flushed to w.
      30              :         flushed int32
      31              :         buf     [blockSize]byte
      32              : }
      33              : 
      34              : type flusher interface {
      35              :         Flush() error
      36              : }
      37              : 
      38              : type syncer interface {
      39              :         Sync() error
      40              : }
      41              : 
      42              : const (
      43              :         syncConcurrencyBits = 12
      44              : 
      45              :         // SyncConcurrency is the maximum number of concurrent sync operations that
      46              :         // can be performed. Note that a sync operation is initiated either by a call
      47              :         // to SyncRecord or by a call to Close. Exported as this value also limits
      48              :         // the commit concurrency in commitPipeline.
      49              :         SyncConcurrency = 1 << syncConcurrencyBits
      50              : )
      51              : 
      52              : type syncSlot struct {
      53              :         wg  *sync.WaitGroup
      54              :         err *error
      55              : }
      56              : 
      57              : // syncQueue is a lock-free fixed-size single-producer, single-consumer
      58              : // queue. The single-producer can push to the head, and the single-consumer can
      59              : // pop multiple values from the tail. Popping calls Done() on each of the
      60              : // available *sync.WaitGroup elements.
      61              : type syncQueue struct {
      62              :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      63              :         // are indexes into slots modulo len(slots)-1.
      64              :         //
      65              :         // tail = index of oldest data in queue
      66              :         // head = index of next slot to fill
      67              :         //
      68              :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      69              :         // continues to own a slot outside this range until it nils the slot, at
      70              :         // which point ownership passes to the producer.
      71              :         //
      72              :         // The head index is stored in the most-significant bits so that we can
      73              :         // atomically add to it and the overflow is harmless.
      74              :         headTail atomic.Uint64
      75              : 
      76              :         // slots is a ring buffer of values stored in this queue. The size must be a
      77              :         // power of 2. A slot is in use until the tail index has moved beyond it.
      78              :         slots [SyncConcurrency]syncSlot
      79              : 
      80              :         // blocked is an atomic boolean which indicates whether syncing is currently
      81              :         // blocked or can proceed. It is used by the implementation of
      82              :         // min-sync-interval to block syncing until the min interval has passed.
      83              :         blocked atomic.Bool
      84              : }
      85              : 
      86              : const dequeueBits = 32
      87              : 
      88              : // unpack extracts the head and tail indices from a 64-bit unsigned integer.
      89            1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
      90            1 :         const mask = 1<<dequeueBits - 1
      91            1 :         head = uint32((ptrs >> dequeueBits) & mask)
      92            1 :         tail = uint32(ptrs & mask)
      93            1 :         return head, tail
      94            1 : }
      95              : 
      96            1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
      97            1 :         ptrs := q.headTail.Load()
      98            1 :         head, tail := q.unpack(ptrs)
      99            1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
     100            0 :                 panic("pebble: queue is full")
     101              :         }
     102              : 
     103            1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
     104            1 :         slot.wg = wg
     105            1 :         slot.err = err
     106            1 : 
     107            1 :         // Increment head. This passes ownership of slot to dequeue and acts as a
     108            1 :         // store barrier for writing the slot.
     109            1 :         q.headTail.Add(1 << dequeueBits)
     110              : }
     111              : 
     112            0 : func (q *syncQueue) setBlocked() {
     113            0 :         q.blocked.Store(true)
     114            0 : }
     115              : 
     116            1 : func (q *syncQueue) clearBlocked() {
     117            1 :         q.blocked.Store(false)
     118            1 : }
     119              : 
     120            1 : func (q *syncQueue) empty() bool {
     121            1 :         head, tail, _ := q.load()
     122            1 :         return head == tail
     123            1 : }
     124              : 
     125              : // load returns the head, tail of the queue for what should be synced to the
     126              : // caller. It can return a head, tail of zero if syncing is blocked due to
     127              : // min-sync-interval. It additionally returns the real length of this queue,
     128              : // regardless of whether syncing is blocked.
     129            1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
     130            1 :         ptrs := q.headTail.Load()
     131            1 :         head, tail = q.unpack(ptrs)
     132            1 :         realLength = head - tail
     133            1 :         if q.blocked.Load() {
     134            0 :                 return 0, 0, realLength
     135            0 :         }
     136            1 :         return head, tail, realLength
     137              : }
     138              : 
     139              : // REQUIRES: queueSemChan is non-nil.
     140            1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
     141            1 :         if tail == head {
     142            0 :                 // Queue is empty.
     143            0 :                 return nil
     144            0 :         }
     145              : 
     146            1 :         for ; tail != head; tail++ {
     147            1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     148            1 :                 wg := slot.wg
     149            1 :                 if wg == nil {
     150            0 :                         return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
     151            0 :                 }
     152            1 :                 *slot.err = err
     153            1 :                 slot.wg = nil
     154            1 :                 slot.err = nil
     155            1 :                 // We need to bump the tail count before releasing the queueSemChan
     156            1 :                 // semaphore as releasing the semaphore can cause a blocked goroutine to
     157            1 :                 // acquire the semaphore and enqueue before we've "freed" space in the
     158            1 :                 // queue.
     159            1 :                 q.headTail.Add(1)
     160            1 :                 wg.Done()
     161            1 :                 // Is always non-nil in production, unless using wal package for WAL
     162            1 :                 // failover.
     163            1 :                 if queueSemChan != nil {
     164            1 :                         <-queueSemChan
     165            1 :                 }
     166              :         }
     167              : 
     168            1 :         return nil
     169              : }
     170              : 
     171              : // pendingSyncs abstracts out the handling of pending sync requests. In
     172              : // standalone mode the implementation is a thin wrapper around syncQueue. In
     173              : // the mode where the LogWriter can be subject to failover, there is no queue
     174              : // kept in the LogWriter and the signaling to those waiting for sync is
     175              : // handled in the wal package.
     176              : //
     177              : // To avoid heap allocations due to the use of this interface, the parameters
     178              : // and return values follow some strict rules:
     179              : //   - The PendingSync parameter can be reused by the caller after push returns.
     180              : //     The implementation should be a pointer backed by a struct that is already
     181              : //     heap allocated, which the caller can reuse for the next push call.
     182              : //   - The pendingSyncSnapshot return value must be backed by the pendingSyncs
     183              : //     implementation, so calling snapshotForPop again will cause the previous
     184              : //     snapshot to be overwritten.
     185              : type pendingSyncs interface {
     186              :         push(PendingSync)
     187              :         setBlocked()
     188              :         clearBlocked()
     189              :         empty() bool
     190              :         snapshotForPop() pendingSyncsSnapshot
     191              :         pop(snap pendingSyncsSnapshot, err error) error
     192              : }
     193              : 
     194              : type pendingSyncsSnapshot interface {
     195              :         empty() bool
     196              : }
     197              : 
     198              : // PendingSync abstracts the sync specification for a record queued on the
     199              : // LogWriter. The only implementations are provided in this package since
     200              : // syncRequested is not exported.
     201              : type PendingSync interface {
     202              :         syncRequested() bool
     203              : }
     204              : 
     205              : // The implementation of pendingSyncs in standalone mode.
     206              : type pendingSyncsWithSyncQueue struct {
     207              :         syncQueue
     208              :         syncQueueLen    *base.GaugeSampleMetric
     209              :         snapshotBacking syncQueueSnapshot
     210              :         // See the comment for LogWriterConfig.QueueSemChan.
     211              :         queueSemChan chan struct{}
     212              : }
     213              : 
     214              : var _ pendingSyncs = &pendingSyncsWithSyncQueue{}
     215              : 
     216            1 : func (q *pendingSyncsWithSyncQueue) push(ps PendingSync) {
     217            1 :         ps2 := ps.(*pendingSyncForSyncQueue)
     218            1 :         q.syncQueue.push(ps2.wg, ps2.err)
     219            1 : }
     220              : 
     221            1 : func (q *pendingSyncsWithSyncQueue) snapshotForPop() pendingSyncsSnapshot {
     222            1 :         head, tail, realLength := q.syncQueue.load()
     223            1 :         q.snapshotBacking = syncQueueSnapshot{
     224            1 :                 head: head,
     225            1 :                 tail: tail,
     226            1 :         }
     227            1 :         q.syncQueueLen.AddSample(int64(realLength))
     228            1 :         return &q.snapshotBacking
     229            1 : }
     230              : 
     231            1 : func (q *pendingSyncsWithSyncQueue) pop(snap pendingSyncsSnapshot, err error) error {
     232            1 :         s := snap.(*syncQueueSnapshot)
     233            1 :         return q.syncQueue.pop(s.head, s.tail, err, q.queueSemChan)
     234            1 : }
     235              : 
     236              : // The implementation of pendingSyncsSnapshot in standalone mode.
     237              : type syncQueueSnapshot struct {
     238              :         head, tail uint32
     239              : }
     240              : 
     241            1 : func (s *syncQueueSnapshot) empty() bool {
     242            1 :         return s.head == s.tail
     243            1 : }
     244              : 
     245              : // The implementation of pendingSync in standalone mode.
     246              : type pendingSyncForSyncQueue struct {
     247              :         wg  *sync.WaitGroup
     248              :         err *error
     249              : }
     250              : 
     251            1 : func (ps *pendingSyncForSyncQueue) syncRequested() bool {
     252            1 :         return ps.wg != nil
     253            1 : }
     254              : 
     255              : // The implementation of pendingSyncs in failover mode.
     256              : type pendingSyncsWithHighestSyncIndex struct {
     257              :         // The highest "index" queued that is requesting a sync. Initialized
     258              :         // to NoSyncIndex, and reset to NoSyncIndex after the sync.
     259              :         index           atomic.Int64
     260              :         snapshotBacking PendingSyncIndex
     261              :         // blocked is an atomic boolean which indicates whether syncing is currently
     262              :         // blocked or can proceed. It is used by the implementation of
     263              :         // min-sync-interval to block syncing until the min interval has passed.
     264              :         blocked                   atomic.Bool
     265              :         externalSyncQueueCallback ExternalSyncQueueCallback
     266              : }
     267              : 
     268              : // NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.
     269              : const NoSyncIndex = -1
     270              : 
     271              : func (si *pendingSyncsWithHighestSyncIndex) init(
     272              :         externalSyncQueueCallback ExternalSyncQueueCallback,
     273            1 : ) {
     274            1 :         si.index.Store(NoSyncIndex)
     275            1 :         si.externalSyncQueueCallback = externalSyncQueueCallback
     276            1 : }
     277              : 
     278            1 : func (si *pendingSyncsWithHighestSyncIndex) push(ps PendingSync) {
     279            1 :         ps2 := ps.(*PendingSyncIndex)
     280            1 :         si.index.Store(ps2.Index)
     281            1 : }
     282              : 
     283            0 : func (si *pendingSyncsWithHighestSyncIndex) setBlocked() {
     284            0 :         si.blocked.Store(true)
     285            0 : }
     286              : 
     287            1 : func (si *pendingSyncsWithHighestSyncIndex) clearBlocked() {
     288            1 :         si.blocked.Store(false)
     289            1 : }
     290              : 
     291            1 : func (si *pendingSyncsWithHighestSyncIndex) empty() bool {
     292            1 :         return si.load() == NoSyncIndex
     293            1 : }
     294              : 
     295            1 : func (si *pendingSyncsWithHighestSyncIndex) snapshotForPop() pendingSyncsSnapshot {
     296            1 :         si.snapshotBacking = PendingSyncIndex{Index: si.load()}
     297            1 :         return &si.snapshotBacking
     298            1 : }
     299              : 
     300            1 : func (si *pendingSyncsWithHighestSyncIndex) load() int64 {
     301            1 :         index := si.index.Load()
     302            1 :         if index != NoSyncIndex && si.blocked.Load() {
     303            0 :                 index = NoSyncIndex
     304            0 :         }
     305            1 :         return index
     306              : }
     307              : 
     308            1 : func (si *pendingSyncsWithHighestSyncIndex) pop(snap pendingSyncsSnapshot, err error) error {
     309            1 :         index := snap.(*PendingSyncIndex)
     310            1 :         if index.Index == NoSyncIndex {
     311            0 :                 return nil
     312            0 :         }
     313              :         // Set to NoSyncIndex if a higher index has not queued.
     314            1 :         si.index.CompareAndSwap(index.Index, NoSyncIndex)
     315            1 :         si.externalSyncQueueCallback(*index, err)
     316            1 :         return nil
     317              : }
     318              : 
     319              : // PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.
     320              : type PendingSyncIndex struct {
     321              :         // Index is some state meaningful to the user of LogWriter. The LogWriter
     322              :         // itself only examines whether Index is equal to NoSyncIndex.
     323              :         Index int64
     324              : }
     325              : 
     326            1 : func (s *PendingSyncIndex) empty() bool {
     327            1 :         return s.Index == NoSyncIndex
     328            1 : }
     329              : 
     330            1 : func (s *PendingSyncIndex) syncRequested() bool {
     331            1 :         return s.Index != NoSyncIndex
     332            1 : }
     333              : 
     334              : // flusherCond is a specialized condition variable that allows its condition to
     335              : // change and readiness be signalled without holding its associated mutex. In
     336              : // particular, when a waiter is added to syncQueue atomically, this condition
     337              : // variable can be signalled without holding flusher.Mutex.
     338              : type flusherCond struct {
     339              :         mu   *sync.Mutex
     340              :         q    pendingSyncs
     341              :         cond sync.Cond
     342              : }
     343              : 
     344            1 : func (c *flusherCond) init(mu *sync.Mutex, q pendingSyncs) {
     345            1 :         c.mu = mu
     346            1 :         c.q = q
     347            1 :         // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
     348            1 :         // points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
     349            1 :         // will be called and we can check the !syncQueue.empty() condition.
     350            1 :         c.cond.L = c
     351            1 : }
     352              : 
     353            1 : func (c *flusherCond) Signal() {
     354            1 :         // Pass-through to the cond var.
     355            1 :         c.cond.Signal()
     356            1 : }
     357              : 
     358            1 : func (c *flusherCond) Wait() {
     359            1 :         // Pass-through to the cond var. Note that internally the cond var implements
     360            1 :         // Wait as:
     361            1 :         //
     362            1 :         //   t := notifyListAdd()
     363            1 :         //   L.Unlock()
     364            1 :         //   notifyListWait(t)
     365            1 :         //   L.Lock()
     366            1 :         //
     367            1 :         // We've configured the cond var to call flusherReady.Unlock() which allows
     368            1 :         // us to check the !syncQueue.empty() condition without a danger of missing a
     369            1 :         // notification. Any call to flusherReady.Signal() after notifyListAdd() is
     370            1 :         // called will cause the subsequent notifyListWait() to return immediately.
     371            1 :         c.cond.Wait()
     372            1 : }
     373              : 
     374            1 : func (c *flusherCond) Lock() {
     375            1 :         c.mu.Lock()
     376            1 : }
     377              : 
     378            1 : func (c *flusherCond) Unlock() {
     379            1 :         c.mu.Unlock()
     380            1 :         if !c.q.empty() {
     381            1 :                 // If the current goroutine is about to block on sync.Cond.Wait, this call
     382            1 :                 // to Signal will prevent that. The comment in Wait above explains a bit
     383            1 :                 // about what is going on here, but it is worth reiterating:
     384            1 :                 //
     385            1 :                 //   flusherCond.Wait()
     386            1 :                 //     sync.Cond.Wait()
     387            1 :                 //       t := notifyListAdd()
     388            1 :                 //       flusherCond.Unlock()    <-- we are here
     389            1 :                 //       notifyListWait(t)
     390            1 :                 //       flusherCond.Lock()
     391            1 :                 //
     392            1 :                 // The call to Signal here results in:
     393            1 :                 //
     394            1 :                 //     sync.Cond.Signal()
     395            1 :                 //       notifyListNotifyOne()
     396            1 :                 //
     397            1 :                 // The call to notifyListNotifyOne() will prevent the call to
     398            1 :                 // notifyListWait(t) from blocking.
     399            1 :                 c.cond.Signal()
     400            1 :         }
     401              : }
     402              : 
     403              : type durationFunc func() time.Duration
     404              : 
     405              : // syncTimer is an interface for timers, modeled on the closure callback mode
     406              : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
     407              : // by tests to mock out the timer functionality used to implement
     408              : // min-sync-interval.
     409              : type syncTimer interface {
     410              :         Reset(time.Duration) bool
     411              :         Stop() bool
     412              : }
     413              : 
     414              : // LogWriter writes records to an underlying io.Writer. In order to support WAL
     415              : // file reuse, a LogWriter's records are tagged with the WAL's file
     416              : // number. When reading a log file a record from a previous incarnation of the
     417              : // file will return the error ErrInvalidLogNum.
     418              : type LogWriter struct {
     419              :         // w is the underlying writer.
     420              :         w io.Writer
     421              :         // c is w as a closer.
     422              :         c io.Closer
     423              :         // s is w as a syncer.
     424              :         s syncer
     425              :         // logNum is the low 32-bits of the log's file number.
     426              :         logNum uint32
     427              :         // blockNum is the zero based block number for the current block.
     428              :         blockNum int64
     429              :         // err is any accumulated error. It originates in flusher.err, and is
     430              :         // updated to reflect flusher.err when a block is full and getting enqueued.
     431              :         // Therefore, there is a lag between when flusher.err has a non-nil error,
     432              :         // and when that non-nil error is reflected in LogWriter.err. On close, it
     433              :         // is set to errClosedWriter to inform accidental future calls to
     434              :         // SyncRecord*.
     435              :         err error
     436              :         // block is the current block being written. Protected by flusher.Mutex.
     437              :         block *block
     438              :         free  struct {
     439              :                 sync.Mutex
     440              :                 blocks []*block
     441              :         }
     442              : 
     443              :         flusher struct {
     444              :                 sync.Mutex
     445              :                 // Flusher ready is a condition variable that is signalled when there are
     446              :                 // blocks to flush, syncing has been requested, or the LogWriter has been
     447              :                 // closed. For signalling of a sync, it is safe to call without holding
     448              :                 // flusher.Mutex.
     449              :                 ready flusherCond
     450              :                 // Set to true when the flush loop should be closed.
     451              :                 close bool
     452              :                 // Closed when the flush loop has terminated.
     453              :                 closed chan struct{}
     454              :                 // Accumulated flush error.
     455              :                 err error
     456              :                 // minSyncInterval is the minimum duration between syncs.
     457              :                 minSyncInterval durationFunc
     458              :                 fsyncLatency    prometheus.Histogram
     459              :                 pending         []*block
     460              :                 // Pushing and popping from pendingSyncs does not require flusher mutex to
     461              :                 // be held.
     462              :                 pendingSyncs pendingSyncs
     463              :                 metrics      *LogWriterMetrics
     464              :         }
     465              : 
     466              :         // afterFunc is a hook to allow tests to mock out the timer functionality
     467              :         // used for min-sync-interval. In normal operation this points to
     468              :         // time.AfterFunc.
     469              :         afterFunc func(d time.Duration, f func()) syncTimer
     470              : 
     471              :         // Backing for both pendingSyncs implementations.
     472              :         pendingSyncsBackingQ     pendingSyncsWithSyncQueue
     473              :         pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex
     474              : 
     475              :         pendingSyncForSyncQueueBacking pendingSyncForSyncQueue
     476              : 
     477              :         // syncedOffset is the offset in the log that is durably synced after a
     478              :         // flush. This member is used to write the WAL Sync chunk format's "Offset"
     479              :         // field in the header.
     480              :         syncedOffset atomic.Uint64
     481              : 
     482              :         // emitFragment is set at runtime depending on which FormatMajorVersion
     483              :         // is used. emitFragment will be set to writing WAL Sync chunk formats
     484              :         // if the FormatMajorVersion is greater than or equal to FormatWALSyncChunks,
     485              :         // otherwise it will write the recyclable chunk format.
     486              :         emitFragment func(n int, p []byte) (remainingP []byte)
     487              : }
     488              : 
     489              : // LogWriterConfig is a struct used for configuring new LogWriters
     490              : type LogWriterConfig struct {
     491              :         WALMinSyncInterval durationFunc
     492              :         WALFsyncLatency    prometheus.Histogram
     493              :         // QueueSemChan is an optional channel to pop from when popping from
     494              :         // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
     495              :         // the syncQueue from overflowing (which will cause a panic). All production
     496              :         // code ensures this is non-nil.
     497              :         QueueSemChan chan struct{}
     498              : 
     499              :         // ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
     500              :         // as part of a WAL implementation that can failover between LogWriters.
     501              :         //
     502              :         // In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
     503              :         // be used with a PendingSync parameter that is implemented by
     504              :         // PendingSyncIndex. When an index is synced (which implies all earlier
     505              :         // indices are also synced), this callback is invoked. The caller must not
     506              :         // hold any mutex when invoking this callback, since the lock ordering
     507              :         // requirement in this case is that any higher layer locks (in the wal
     508              :         // package) precede the lower layer locks (in the record package). These
     509              :         // callbacks are serialized since they are invoked from the flushLoop.
     510              :         ExternalSyncQueueCallback ExternalSyncQueueCallback
     511              : 
     512              :         // WriteWALSyncOffsets determines whether to write WAL sync chunk offsets.
     513              :         // The format major version can change (ratchet) at runtime, so this must be
     514              :         // a function rather than a static bool to ensure we use the latest format version.
     515              :         WriteWALSyncOffsets func() bool
     516              : }
     517              : 
     518              : // ExternalSyncQueueCallback is to be run when a PendingSync has been
     519              : // processed, either successfully or with an error.
     520              : type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)
     521              : 
     522              : // initialAllocatedBlocksCap is the initial capacity of the various slices
     523              : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
     524              : // than this threshold allows.
     525              : const initialAllocatedBlocksCap = 32
     526              : 
     527              : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
     528              : // Pool when a LogWriter is closed. Before that, free blocks are maintained
     529              : // within a LogWriter's own internal free list `w.free.blocks`.
     530              : var blockPool = sync.Pool{
     531            1 :         New: func() any { return &block{} },
     532              : }
     533              : 
     534              : // NewLogWriter returns a new LogWriter.
     535              : //
     536              : // The io.Writer may also be used as an io.Closer and syncer. No other methods
     537              : // will be called on the writer.
     538              : func NewLogWriter(
     539              :         w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
     540            1 : ) *LogWriter {
     541            1 :         c, _ := w.(io.Closer)
     542            1 :         s, _ := w.(syncer)
     543            1 :         r := &LogWriter{
     544            1 :                 w: w,
     545            1 :                 c: c,
     546            1 :                 s: s,
     547            1 :                 // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
     548            1 :                 // we are very unlikely to reach a file number of 4 billion and b) the log
     549            1 :                 // number is used as a validation check and using only the low 32-bits is
     550            1 :                 // sufficient for that purpose.
     551            1 :                 logNum: uint32(logNum),
     552            1 :                 afterFunc: func(d time.Duration, f func()) syncTimer {
     553            0 :                         return time.AfterFunc(d, f)
     554            0 :                 },
     555              :         }
     556              : 
     557            1 :         if logWriterConfig.WriteWALSyncOffsets() {
     558            1 :                 r.emitFragment = r.emitFragmentSyncOffsets
     559            1 :         } else {
     560            1 :                 r.emitFragment = r.emitFragmentRecyclable
     561            1 :         }
     562              : 
     563            1 :         m := &LogWriterMetrics{}
     564            1 :         if logWriterConfig.ExternalSyncQueueCallback != nil {
     565            1 :                 r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
     566            1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingIndex
     567            1 :         } else {
     568            1 :                 r.pendingSyncsBackingQ = pendingSyncsWithSyncQueue{
     569            1 :                         syncQueueLen: &m.SyncQueueLen,
     570            1 :                         queueSemChan: logWriterConfig.QueueSemChan,
     571            1 :                 }
     572            1 :                 r.flusher.pendingSyncs = &r.pendingSyncsBackingQ
     573            1 :         }
     574              : 
     575            1 :         r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
     576            1 :         r.block = blockPool.Get().(*block)
     577            1 :         r.flusher.ready.init(&r.flusher.Mutex, r.flusher.pendingSyncs)
     578            1 :         r.flusher.closed = make(chan struct{})
     579            1 :         r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
     580            1 :         r.flusher.metrics = m
     581            1 : 
     582            1 :         f := &r.flusher
     583            1 :         f.minSyncInterval = logWriterConfig.WALMinSyncInterval
     584            1 :         f.fsyncLatency = logWriterConfig.WALFsyncLatency
     585            1 : 
     586            1 :         go func() {
     587            1 :                 pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
     588            1 :         }()
     589            1 :         return r
     590              : }
     591              : 
     592            1 : func (w *LogWriter) flushLoop(context.Context) {
     593            1 :         f := &w.flusher
     594            1 :         f.Lock()
     595            1 : 
     596            1 :         // Initialize idleStartTime to when the loop starts.
     597            1 :         idleStartTime := crtime.NowMono()
     598            1 :         var syncTimer syncTimer
     599            1 :         defer func() {
     600            1 :                 // Capture the idle duration between the last piece of work and when the
     601            1 :                 // loop terminated.
     602            1 :                 f.metrics.WriteThroughput.IdleDuration += idleStartTime.Elapsed()
     603            1 :                 if syncTimer != nil {
     604            0 :                         syncTimer.Stop()
     605            0 :                 }
     606            1 :                 close(f.closed)
     607            1 :                 f.Unlock()
     608              :         }()
     609              : 
     610              :         // writtenOffset is the amount of data that has been written
     611              :         // but not necessarily synced. This is used to update logWriter's
     612              :         // syncedOffset after a sync.
     613            1 :         var writtenOffset uint64 = 0
     614            1 : 
     615            1 :         // The flush loop performs flushing of full and partial data blocks to the
     616            1 :         // underlying writer (LogWriter.w), syncing of the writer, and notification
     617            1 :         // to sync requests that they have completed.
     618            1 :         //
     619            1 :         // - flusher.ready is a condition variable that is signalled when there is
     620            1 :         //   work to do. Full blocks are contained in flusher.pending. The current
     621            1 :         //   partial block is in LogWriter.block. And sync operations are held in
     622            1 :         //   flusher.syncQ.
     623            1 :         //
     624            1 :         // - The decision to sync is determined by whether there are any sync
     625            1 :         //   requests present in flusher.syncQ and whether enough time has elapsed
     626            1 :         //   since the last sync. If not enough time has elapsed since the last sync,
     627            1 :         //   flusher.syncQ.blocked will be set to 1. If syncing is blocked,
     628            1 :         //   syncQueue.empty() will return true and syncQueue.load() will return 0,0
     629            1 :         //   (i.e. an empty list).
     630            1 :         //
     631            1 :         // - flusher.syncQ.blocked is cleared by a timer that is initialized when
     632            1 :         //   blocked is set to 1. When blocked is 1, no syncing will take place, but
     633            1 :         //   flushing will continue to be performed. The on/off toggle for syncing
     634            1 :         //   does not need to be carefully synchronized with the rest of processing
     635            1 :         //   -- all we need to ensure is that after any transition to blocked=1 there
     636            1 :         //   is eventually a transition to blocked=0. syncTimer performs this
     637            1 :         //   transition. Note that any change to min-sync-interval will not take
     638            1 :         //   effect until the previous timer elapses.
     639            1 :         //
     640            1 :         // - Picking up the syncing work to perform requires coordination with
     641            1 :         //   picking up the flushing work. Specifically, flushing work is queued
     642            1 :         //   before syncing work. The guarantee of this code is that when a sync is
     643            1 :         //   requested, any previously queued flush work will be synced. This
     644            1 :         //   motivates reading the syncing work (f.syncQ.load()) before picking up
     645            1 :         //   the flush work (w.block.written.Load()).
     646            1 : 
     647            1 :         // The list of full blocks that need to be written. This is copied from
     648            1 :         // f.pending on every loop iteration, though the number of elements is
     649            1 :         // usually small (most frequently 1). In the case of the WAL LogWriter, the
     650            1 :         // number of blocks is bounded by the size of the WAL's corresponding
     651            1 :         // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
     652            1 :         // this works out to at most 2048 elements if the entirety of the memtable's
     653            1 :         // contents are queued.
     654            1 :         pending := make([]*block, 0, cap(f.pending))
     655            1 :         for {
     656            1 :                 for {
     657            1 :                         // Grab the portion of the current block that requires flushing. Note that
     658            1 :                         // the current block can be added to the pending blocks list after we release
     659            1 :                         // the flusher lock, but it won't be part of pending.
     660            1 :                         written := w.block.written.Load()
     661            1 :                         if len(f.pending) > 0 || written > w.block.flushed || !f.pendingSyncs.empty() {
     662            1 :                                 break
     663              :                         }
     664            1 :                         if f.close {
     665            1 :                                 // If the writer is closed, pretend the sync timer fired immediately so
     666            1 :                                 // that we can process any queued sync requests.
     667            1 :                                 f.pendingSyncs.clearBlocked()
     668            1 :                                 if !f.pendingSyncs.empty() {
     669            0 :                                         break
     670              :                                 }
     671            1 :                                 return
     672              :                         }
     673            1 :                         f.ready.Wait()
     674            1 :                         continue
     675              :                 }
     676              :                 // Found work to do, so no longer idle.
     677              :                 //
     678              :                 // NB: it is safe to read pending before loading from the syncQ since
     679              :                 // mutations to pending require the w.flusher mutex, which is held here.
     680              :                 // There is no risk that someone will concurrently add to pending, so the
     681              :                 // following sequence, which would pick up a syncQ entry without the
     682              :                 // corresponding data, is impossible:
     683              :                 //
     684              :                 // Thread enqueueing       This thread
     685              :                 //                         1. read pending
     686              :                 // 2. add block to pending
     687              :                 // 3. add to syncQ
     688              :                 //                         4. read syncQ
     689            1 :                 workStartTime := crtime.NowMono()
     690            1 :                 idleDuration := workStartTime.Sub(idleStartTime)
     691            1 :                 pending = append(pending[:0], f.pending...)
     692            1 :                 f.pending = f.pending[:0]
     693            1 :                 f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
     694            1 : 
     695            1 :                 // Grab the list of sync waiters. Note that syncQueue.load() will return
     696            1 :                 // 0,0 while we're waiting for the min-sync-interval to expire. This
     697            1 :                 // allows flushing to proceed even if we're not ready to sync.
     698            1 :                 snap := f.pendingSyncs.snapshotForPop()
     699            1 : 
     700            1 :                 // Grab the portion of the current block that requires flushing. Note that
     701            1 :                 // the current block can be added to the pending blocks list after we
     702            1 :                 // release the flusher lock, but it won't be part of pending. This has to
     703            1 :                 // be ordered after we get the list of sync waiters from syncQ in order to
     704            1 :                 // prevent a race where a waiter adds itself to syncQ, but this thread
     705            1 :                 // picks up the entry in syncQ and not the buffered data.
     706            1 :                 written := w.block.written.Load()
     707            1 :                 data := w.block.buf[w.block.flushed:written]
     708            1 :                 w.block.flushed = written
     709            1 : 
     710            1 :                 fErr := f.err
     711            1 :                 f.Unlock()
     712            1 :                 // If flusher has an error, we propagate it to waiters. Note in spite of
     713            1 :                 // error we consume the pending list above to free blocks for writers.
     714            1 :                 if fErr != nil {
     715            0 :                         // NB: pop may invoke ExternalSyncQueueCallback, which is why we have
     716            0 :                         // called f.Unlock() above. We will acquire the lock again below.
     717            0 :                         _ = f.pendingSyncs.pop(snap, fErr)
     718            0 :                         // Update the idleStartTime if work could not be done, so that we don't
     719            0 :                         // include the duration we tried to do work as idle. We don't bother
     720            0 :                         // with the rest of the accounting, which means we will undercount.
     721            0 :                         idleStartTime = crtime.NowMono()
     722            0 :                         f.Lock()
     723            0 :                         continue
     724              :                 }
     725            1 :                 writtenOffset += uint64(len(data))
     726            1 :                 synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
     727            1 :                 f.Lock()
     728            1 :                 if synced && f.fsyncLatency != nil {
     729            1 :                         w.syncedOffset.Store(writtenOffset)
     730            1 :                         f.fsyncLatency.Observe(float64(syncLatency))
     731            1 :                 }
     732            1 :                 f.err = err
     733            1 :                 if f.err != nil {
     734            0 :                         f.pendingSyncs.clearBlocked()
     735            0 :                         // Update the idleStartTime if work could not be done, so that we don't
     736            0 :                         // include the duration we tried to do work as idle. We don't bother
     737            0 :                         // with the rest of the accounting, which means we will undercount.
     738            0 :                         idleStartTime = crtime.NowMono()
     739            0 :                         continue
     740              :                 }
     741              : 
     742            1 :                 if synced && f.minSyncInterval != nil {
     743            0 :                         // A sync was performed. Make sure we've waited for the min sync
     744            0 :                         // interval before syncing again.
     745            0 :                         if min := f.minSyncInterval(); min > 0 {
     746            0 :                                 f.pendingSyncs.setBlocked()
     747            0 :                                 if syncTimer == nil {
     748            0 :                                         syncTimer = w.afterFunc(min, func() {
     749            0 :                                                 f.pendingSyncs.clearBlocked()
     750            0 :                                                 f.ready.Signal()
     751            0 :                                         })
     752            0 :                                 } else {
     753            0 :                                         syncTimer.Reset(min)
     754            0 :                                 }
     755              :                         }
     756              :                 }
     757              :                 // Finished work, and started idling.
     758            1 :                 idleStartTime = crtime.NowMono()
     759            1 :                 workDuration := idleStartTime.Sub(workStartTime)
     760            1 :                 f.metrics.WriteThroughput.Bytes += bytesWritten
     761            1 :                 f.metrics.WriteThroughput.WorkDuration += workDuration
     762            1 :                 f.metrics.WriteThroughput.IdleDuration += idleDuration
     763              :         }
     764              : }
     765              : 
     766              : func (w *LogWriter) flushPending(
     767              :         data []byte, pending []*block, snap pendingSyncsSnapshot,
     768            1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
     769            1 :         defer func() {
     770            1 :                 // Translate panics into errors. The errors will cause flushLoop to shut
     771            1 :                 // down, but allows us to do so in a controlled way and avoid swallowing
     772            1 :                 // the stack that created the panic if panic'ing itself hits a panic
     773            1 :                 // (e.g. unlock of unlocked mutex).
     774            1 :                 if r := recover(); r != nil {
     775            0 :                         err = errors.Newf("%v", r)
     776            0 :                 }
     777              :         }()
     778              : 
     779            1 :         for _, b := range pending {
     780            0 :                 bytesWritten += blockSize - int64(b.flushed)
     781            0 :                 if err = w.flushBlock(b); err != nil {
     782            0 :                         break
     783              :                 }
     784              :         }
     785            1 :         if n := len(data); err == nil && n > 0 {
     786            1 :                 bytesWritten += int64(n)
     787            1 :                 _, err = w.w.Write(data)
     788            1 :         }
     789              : 
     790            1 :         synced = !snap.empty()
     791            1 :         if synced {
     792            1 :                 if err == nil && w.s != nil {
     793            1 :                         syncLatency, err = w.syncWithLatency()
     794            1 :                 } else {
     795            0 :                         synced = false
     796            0 :                 }
     797            1 :                 f := &w.flusher
     798            1 :                 if popErr := f.pendingSyncs.pop(snap, err); popErr != nil {
     799            0 :                         return synced, syncLatency, bytesWritten, firstError(err, popErr)
     800            0 :                 }
     801              :         }
     802              : 
     803            1 :         return synced, syncLatency, bytesWritten, err
     804              : }
     805              : 
     806            1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
     807            1 :         start := crtime.NowMono()
     808            1 :         err := w.s.Sync()
     809            1 :         syncLatency := start.Elapsed()
     810            1 :         return syncLatency, err
     811            1 : }
     812              : 
     813            0 : func (w *LogWriter) flushBlock(b *block) error {
     814            0 :         if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
     815            0 :                 return err
     816            0 :         }
     817            0 :         b.written.Store(0)
     818            0 :         b.flushed = 0
     819            0 :         w.free.Lock()
     820            0 :         w.free.blocks = append(w.free.blocks, b)
     821            0 :         w.free.Unlock()
     822            0 :         return nil
     823              : }
     824              : 
     825              : // queueBlock queues the current block for writing to the underlying writer,
     826              : // allocates a new block and reserves space for the next header.
     827            0 : func (w *LogWriter) queueBlock() {
     828            0 :         // Allocate a new block, blocking until one is available. We do this first
     829            0 :         // because w.block is protected by w.flusher.Mutex.
     830            0 :         w.free.Lock()
     831            0 :         if len(w.free.blocks) == 0 {
     832            0 :                 w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
     833            0 :         }
     834            0 :         nextBlock := w.free.blocks[len(w.free.blocks)-1]
     835            0 :         w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
     836            0 :         w.free.Unlock()
     837            0 : 
     838            0 :         f := &w.flusher
     839            0 :         f.Lock()
     840            0 :         f.pending = append(f.pending, w.block)
     841            0 :         w.block = nextBlock
     842            0 :         f.ready.Signal()
     843            0 :         w.err = w.flusher.err
     844            0 :         f.Unlock()
     845            0 : 
     846            0 :         w.blockNum++
     847              : }
     848              : 
     849              : // Close flushes and syncs any unwritten data and closes the writer.
     850              : // Where required, external synchronisation is provided by commitPipeline.mu.
     851            1 : func (w *LogWriter) Close() error {
     852            1 :         return w.closeInternal(PendingSyncIndex{Index: NoSyncIndex})
     853            1 : }
     854              : 
     855              : // CloseWithLastQueuedRecord is like Close, but optionally accepts a
     856              : // lastQueuedRecord, that the caller will be notified about when synced.
     857            1 : func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error {
     858            1 :         return w.closeInternal(lastQueuedRecord)
     859            1 : }
     860              : 
     861            1 : func (w *LogWriter) closeInternal(lastQueuedRecord PendingSyncIndex) error {
     862            1 :         f := &w.flusher
     863            1 : 
     864            1 :         // Emit an EOF trailer signifying the end of this log. This helps readers
     865            1 :         // differentiate between a corrupted entry in the middle of a log from
     866            1 :         // garbage at the tail from a recycled log file.
     867            1 :         w.emitEOFTrailer()
     868            1 : 
     869            1 :         // Signal the flush loop to close.
     870            1 :         f.Lock()
     871            1 :         f.close = true
     872            1 :         f.ready.Signal()
     873            1 :         f.Unlock()
     874            1 : 
     875            1 :         // Wait for the flush loop to close. The flush loop will not close until all
     876            1 :         // pending data has been written or an error occurs.
     877            1 :         <-f.closed
     878            1 : 
     879            1 :         // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
     880            1 :         // last buffered data only if it was requested via syncQ, so we need to sync
     881            1 :         // here to ensure that all the data is synced.
     882            1 :         err := w.flusher.err
     883            1 :         var syncLatency time.Duration
     884            1 :         if err == nil && w.s != nil {
     885            1 :                 syncLatency, err = w.syncWithLatency()
     886            1 :         }
     887            1 :         f.Lock()
     888            1 :         if err == nil && f.fsyncLatency != nil {
     889            1 :                 f.fsyncLatency.Observe(float64(syncLatency))
     890            1 :         }
     891            1 :         free := w.free.blocks
     892            1 :         f.Unlock()
     893            1 : 
     894            1 :         // NB: the caller of closeInternal may not care about a non-nil cerr below
     895            1 :         // if all queued writes have been successfully written and synced.
     896            1 :         if lastQueuedRecord.Index != NoSyncIndex {
     897            1 :                 w.pendingSyncsBackingIndex.externalSyncQueueCallback(lastQueuedRecord, err)
     898            1 :         }
     899            1 :         if w.c != nil {
     900            1 :                 cerr := w.c.Close()
     901            1 :                 w.c = nil
     902            1 :                 err = firstError(err, cerr)
     903            1 :         }
     904              : 
     905            1 :         for _, b := range free {
     906            0 :                 b.flushed = 0
     907            0 :                 b.written.Store(0)
     908            0 :                 blockPool.Put(b)
     909            0 :         }
     910              : 
     911            1 :         w.err = errClosedWriter
     912            1 :         return err
     913              : }
     914              : 
     915              : // firstError returns the first non-nil error of err0 and err1, or nil if both
     916              : // are nil.
     917            1 : func firstError(err0, err1 error) error {
     918            1 :         if err0 != nil {
     919            0 :                 return err0
     920            0 :         }
     921            1 :         return err1
     922              : }
     923              : 
     924              : // WriteRecord writes a complete record. Returns the offset just past the end
     925              : // of the record.
     926              : // External synchronisation provided by commitPipeline.mu.
     927            1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
     928            1 :         logSize, err := w.SyncRecord(p, nil, nil)
     929            1 :         return logSize, err
     930            1 : }
     931              : 
     932              : // SyncRecord writes a complete record. If wg != nil the record will be
     933              : // asynchronously persisted to the underlying writer and done will be called on
     934              : // the wait group upon completion. Returns the offset just past the end of the
     935              : // record.
     936              : // External synchronisation provided by commitPipeline.mu.
     937              : func (w *LogWriter) SyncRecord(
     938              :         p []byte, wg *sync.WaitGroup, err *error,
     939            1 : ) (logSize int64, err2 error) {
     940            1 :         w.pendingSyncForSyncQueueBacking = pendingSyncForSyncQueue{
     941            1 :                 wg:  wg,
     942            1 :                 err: err,
     943            1 :         }
     944            1 :         return w.SyncRecordGeneralized(p, &w.pendingSyncForSyncQueueBacking)
     945            1 : }
     946              : 
     947              : // SyncRecordGeneralized is a version of SyncRecord that accepts a
     948              : // PendingSync.
     949            1 : func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error) {
     950            1 :         if w.err != nil {
     951            0 :                 return -1, w.err
     952            0 :         }
     953              : 
     954              :         // The `i == 0` condition ensures we handle empty records. Such records can
     955              :         // possibly be generated for VersionEdits stored in the MANIFEST. While the
     956              :         // MANIFEST is currently written using Writer, it is good to support the same
     957              :         // semantics with LogWriter.
     958            1 :         for i := 0; i == 0 || len(p) > 0; i++ {
     959            1 :                 p = w.emitFragment(i, p)
     960            1 :         }
     961              : 
     962            1 :         if ps.syncRequested() {
     963            1 :                 // If we've been asked to persist the record, add the WaitGroup to the sync
     964            1 :                 // queue and signal the flushLoop. Note that flushLoop will write partial
     965            1 :                 // blocks to the file if syncing has been requested. The contract is that
     966            1 :                 // any record written to the LogWriter to this point will be flushed to the
     967            1 :                 // OS and synced to disk.
     968            1 :                 f := &w.flusher
     969            1 :                 f.pendingSyncs.push(ps)
     970            1 :                 f.ready.Signal()
     971            1 :         }
     972              : 
     973            1 :         offset := w.blockNum*blockSize + int64(w.block.written.Load())
     974            1 :         // Note that we don't return w.err here as a concurrent call to Close would
     975            1 :         // race with our read. That's ok because the only error we could be seeing is
     976            1 :         // one to syncing for which the caller can receive notification of by passing
     977            1 :         // in a non-nil err argument.
     978            1 :         return offset, nil
     979              : }
     980              : 
     981              : // Size returns the current size of the file.
     982              : // External synchronisation provided by commitPipeline.mu.
     983            1 : func (w *LogWriter) Size() int64 {
     984            1 :         return w.blockNum*blockSize + int64(w.block.written.Load())
     985            1 : }
     986              : 
     987              : // emitEOFTrailer writes a special recyclable chunk header to signal EOF.
     988              : // The reason why this function writes the recyclable chunk header instead
     989              : // of having a function for writing recyclable and WAL sync chunks as
     990              : // emitFragment does it because there is no reason to add 8 additional
     991              : // bytes to the EOFTrailer for the SyncedOffset as it will be zeroed out anyway.
     992            1 : func (w *LogWriter) emitEOFTrailer() {
     993            1 :         // Write a recyclable chunk header with a different log number.  Readers
     994            1 :         // will treat the header as EOF when the log number does not match.
     995            1 :         b := w.block
     996            1 :         i := b.written.Load()
     997            1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
     998            1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
     999            1 :         b.buf[i+6] = recyclableFullChunkEncoding
    1000            1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
    1001            1 :         b.written.Store(i + int32(recyclableHeaderSize))
    1002            1 : }
    1003              : 
    1004            1 : func (w *LogWriter) emitFragmentRecyclable(n int, p []byte) (remainingP []byte) {
    1005            1 :         b := w.block
    1006            1 :         i := b.written.Load()
    1007            1 :         first := n == 0
    1008            1 :         last := blockSize-i-recyclableHeaderSize >= int32(len(p))
    1009            1 : 
    1010            1 :         if last {
    1011            1 :                 if first {
    1012            1 :                         b.buf[i+6] = recyclableFullChunkEncoding
    1013            1 :                 } else {
    1014            0 :                         b.buf[i+6] = recyclableLastChunkEncoding
    1015            0 :                 }
    1016            0 :         } else {
    1017            0 :                 if first {
    1018            0 :                         b.buf[i+6] = recyclableFirstChunkEncoding
    1019            0 :                 } else {
    1020            0 :                         b.buf[i+6] = recyclableMiddleChunkEncoding
    1021            0 :                 }
    1022              :         }
    1023              : 
    1024            1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
    1025            1 : 
    1026            1 :         r := copy(b.buf[i+recyclableHeaderSize:], p)
    1027            1 :         j := i + int32(recyclableHeaderSize+r)
    1028            1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
    1029            1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
    1030            1 :         b.written.Store(j)
    1031            1 : 
    1032            1 :         if blockSize-b.written.Load() < recyclableHeaderSize {
    1033            0 :                 // There is no room for another fragment in the block, so fill the
    1034            0 :                 // remaining bytes with zeros and queue the block for flushing.
    1035            0 :                 clear(b.buf[b.written.Load():])
    1036            0 :                 w.queueBlock()
    1037            0 :         }
    1038            1 :         return p[r:]
    1039              : }
    1040              : 
    1041            1 : func (w *LogWriter) emitFragmentSyncOffsets(n int, p []byte) (remainingP []byte) {
    1042            1 :         b := w.block
    1043            1 :         i := b.written.Load()
    1044            1 :         first := n == 0
    1045            1 :         last := blockSize-i-walSyncHeaderSize >= int32(len(p))
    1046            1 : 
    1047            1 :         if last {
    1048            1 :                 if first {
    1049            1 :                         b.buf[i+6] = walSyncFullChunkEncoding
    1050            1 :                 } else {
    1051            0 :                         b.buf[i+6] = walSyncLastChunkEncoding
    1052            0 :                 }
    1053            0 :         } else {
    1054            0 :                 if first {
    1055            0 :                         b.buf[i+6] = walSyncFirstChunkEncoding
    1056            0 :                 } else {
    1057            0 :                         b.buf[i+6] = walSyncMiddleChunkEncoding
    1058            0 :                 }
    1059              :         }
    1060              : 
    1061            1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
    1062            1 :         binary.LittleEndian.PutUint64(b.buf[i+11:i+19], w.syncedOffset.Load())
    1063            1 : 
    1064            1 :         r := copy(b.buf[i+walSyncHeaderSize:], p)
    1065            1 :         j := i + int32(walSyncHeaderSize+r)
    1066            1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
    1067            1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
    1068            1 :         b.written.Store(j)
    1069            1 : 
    1070            1 :         if blockSize-b.written.Load() < walSyncHeaderSize {
    1071            0 :                 // There is no room for another fragment in the block, so fill the
    1072            0 :                 // remaining bytes with zeros and queue the block for flushing.
    1073            0 :                 clear(b.buf[b.written.Load():])
    1074            0 :                 w.queueBlock()
    1075            0 :         }
    1076            1 :         return p[r:]
    1077              : }
    1078              : 
    1079              : // Metrics must typically be called after Close, since the callee will no
    1080              : // longer modify the returned LogWriterMetrics. It is also current if there is
    1081              : // nothing left to flush in the flush loop, but that is an implementation
    1082              : // detail that callers should not rely on.
    1083            1 : func (w *LogWriter) Metrics() LogWriterMetrics {
    1084            1 :         w.flusher.Lock()
    1085            1 :         defer w.flusher.Unlock()
    1086            1 :         m := *w.flusher.metrics
    1087            1 :         return m
    1088            1 : }
    1089              : 
    1090              : // LogWriterMetrics contains misc metrics for the log writer.
    1091              : type LogWriterMetrics struct {
    1092              :         WriteThroughput  base.ThroughputMetric
    1093              :         PendingBufferLen base.GaugeSampleMetric
    1094              :         SyncQueueLen     base.GaugeSampleMetric
    1095              : }
    1096              : 
    1097              : // Merge merges metrics from x. Requires that x is non-nil.
    1098            1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
    1099            1 :         m.WriteThroughput.Merge(x.WriteThroughput)
    1100            1 :         m.PendingBufferLen.Merge(x.PendingBufferLen)
    1101            1 :         m.SyncQueueLen.Merge(x.SyncQueueLen)
    1102            1 :         return nil
    1103            1 : }
        

Generated by: LCOV version 2.0-1