LCOV - code coverage report
Current view: top level - pebble/record - log_writer.go (source / functions) Hit Total Coverage
Test: 2023-12-11 08:16Z f16e0f48 - tests only.lcov Lines: 417 433 96.3 %
Date: 2023-12-11 08:16:34 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package record
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "io"
      11             :         "runtime/pprof"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/crc"
      19             :         "github.com/prometheus/client_golang/prometheus"
      20             : )
      21             : 
      22             : var walSyncLabels = pprof.Labels("pebble", "wal-sync") // pprof labels applied to the flush loop goroutine started by NewLogWriter.
      23             : var errClosedWriter = errors.New("pebble/record: closed LogWriter") // Sentinel error; its uses fall outside this section of the file.
      24             : 
      25             : type block struct { // block is a blockSize-byte buffer plus its write and flush offsets.
      26             :         // buf[:written] has already been filled with fragments. Updated atomically.
      27             :         written atomic.Int32
      28             :         // buf[:flushed] has already been picked up by the flush loop for writing to w.
      29             :         flushed int32
      30             :         buf     [blockSize]byte
      31             : }
      32             : 
      33             : type flusher interface { // flusher is the single-method Flush interface.
      34             :         Flush() error
      35             : }
      36             : 
      37             : type syncer interface { // syncer is the single-method Sync interface; NewLogWriter type-asserts its writer to it.
      38             :         Sync() error
      39             : }
      40             : 
      41             : const (
      42             :         syncConcurrencyBits = 12 // log2 of SyncConcurrency
      43             : 
      44             :         // SyncConcurrency is the maximum number of concurrent sync operations that
      45             :         // can be performed. Note that a sync operation is initiated either by a call
      46             :         // to SyncRecord or by a call to Close. Exported as this value also limits
      47             :         // the commit concurrency in commitPipeline.
      48             :         SyncConcurrency = 1 << syncConcurrencyBits
      49             : )
      50             : 
      51             : type syncSlot struct { // syncSlot is one syncQueue entry: a waiter plus the location that receives its sync result.
      52             :         wg  *sync.WaitGroup // Done() is called on it when the slot is popped
      53             :         err *error // pop stores the sync outcome through this pointer before signalling wg
      54             : }
      55             : 
      56             : // syncQueue is a lock-free fixed-size single-producer, single-consumer
      57             : // queue. The single-producer can push to the head, and the single-consumer can
      58             : // pop multiple values from the tail. Popping calls Done() on each of the
      59             : // available *sync.WaitGroup elements.
      60             : type syncQueue struct {
      61             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      62             :         // are indexes into slots, reduced modulo len(slots) by masking with len(slots)-1.
      63             :         //
      64             :         // tail = index of oldest data in queue
      65             :         // head = index of next slot to fill
      66             :         //
      67             :         // Slots in the range [tail, head) are owned by the consumer. The consumer
      68             :         // continues to own a slot outside this range until it nils the slot, at
      69             :         // which point ownership passes to the producer.
      70             :         //
      71             :         // The head index is stored in the most-significant bits so that we can
      72             :         // atomically add to it and the overflow is harmless.
      73             :         headTail atomic.Uint64
      74             : 
      75             :         // slots is a ring buffer of values stored in this queue. The size must be a
      76             :         // power of 2. A slot is in use until the tail index has moved beyond it.
      77             :         slots [SyncConcurrency]syncSlot
      78             : 
      79             :         // blocked is an atomic boolean which indicates whether syncing is currently
      80             :         // blocked or can proceed. It is used by the implementation of
      81             :         // min-sync-interval to block syncing until the min interval has passed.
      82             :         blocked atomic.Bool
      83             : }
      84             : 
      85             : const dequeueBits = 32 // width in bits of each of the head and tail indexes packed into syncQueue.headTail
      86             : 
      87           1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) { // unpack splits a packed headTail value into head (high 32 bits) and tail (low 32 bits).
      88           1 :         const mask = 1<<dequeueBits - 1
      89           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      90           1 :         tail = uint32(ptrs & mask)
      91           1 :         return
      92           1 : }
      93             : 
      94           1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) { // push enqueues a waiter and its error destination at head; it panics if all slots are in use.
      95           1 :         ptrs := q.headTail.Load()
      96           1 :         head, tail := q.unpack(ptrs)
      97           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      98           0 :                 panic("pebble: queue is full")
      99             :         }
     100             : 
     101           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
     102           1 :         slot.wg = wg
     103           1 :         slot.err = err
     104           1 : 
     105           1 :         // Increment head. This passes ownership of slot to pop and acts as a
     106           1 :         // store barrier for writing the slot.
     107           1 :         q.headTail.Add(1 << dequeueBits)
     108             : }
     109             : 
     110           1 : func (q *syncQueue) setBlocked() { // setBlocked marks syncing as blocked; load then reports 0, 0 for head and tail.
     111           1 :         q.blocked.Store(true)
     112           1 : }
     113             : 
     114           1 : func (q *syncQueue) clearBlocked() { // clearBlocked re-enables syncing; load reports the real head and tail again.
     115           1 :         q.blocked.Store(false)
     116           1 : }
     117             : 
     118           1 : func (q *syncQueue) empty() bool { // empty reports whether load sees no entries; it is also true while syncing is blocked.
     119           1 :         head, tail, _ := q.load()
     120           1 :         return head == tail
     121           1 : }
     122             : 
     123             : // load returns the head and tail indexes of the sync requests the caller should
     124             : // process. While syncing is blocked (min-sync-interval) it returns 0, 0 for
     125             : // head and tail. realLength is always the true queue length (head - tail),
     126             : // regardless of whether syncing is blocked.
     127           1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
     128           1 :         ptrs := q.headTail.Load()
     129           1 :         head, tail = q.unpack(ptrs)
     130           1 :         realLength = head - tail
     131           1 :         if q.blocked.Load() {
     132           1 :                 return 0, 0, realLength
     133           1 :         }
     134           1 :         return head, tail, realLength
     135             : }
     136             : 
     137             : // pop stores err for, and signals, every waiter in [tail, head), advancing the queue tail once per slot. It returns an error if a slot in that range has no waiter. queueSemChan is expected to be non-nil in production; when it is nil the receive is skipped.
     138           1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
     139           1 :         if tail == head {
     140           1 :                 // Queue is empty.
     141           1 :                 return nil
     142           1 :         }
     143             : 
     144           1 :         for ; tail != head; tail++ {
     145           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     146           1 :                 wg := slot.wg
     147           1 :                 if wg == nil {
     148           0 :                         return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
     149           0 :                 }
     150           1 :                 *slot.err = err
     151           1 :                 slot.wg = nil
     152           1 :                 slot.err = nil
     153           1 :                 // We need to bump the tail count before signalling the wait group, as
     154           1 :                 // signalling the wait group can release a blocked goroutine which
     155           1 :                 // will try to enqueue before we've "freed" space in the queue.
     156           1 :                 q.headTail.Add(1)
     157           1 :                 wg.Done()
     158           1 :                 // Is always non-nil in production. One value is received per popped slot.
     159           1 :                 if queueSemChan != nil {
     160           1 :                         <-queueSemChan
     161           1 :                 }
     162             :         }
     163             : 
     164           1 :         return nil
     165             : }
     166             : 
     167             : // flusherCond is a specialized condition variable that allows its condition to
     168             : // change and readiness be signalled without holding its associated mutex. In
     169             : // particular, when a waiter is added to syncQueue atomically, this condition
     170             : // variable can be signalled without holding flusher.Mutex.
     171             : type flusherCond struct {
     172             :         mu   *sync.Mutex // the associated mutex, acquired and released by Lock and Unlock
     173             :         q    *syncQueue // queue whose non-emptiness Unlock checks after releasing mu
     174             :         cond sync.Cond // underlying cond var; init points its L at the flusherCond itself
     175             : }
     176             : 
     177           1 : func (c *flusherCond) init(mu *sync.Mutex, q *syncQueue) { // init wires the cond var to mu and q; call before first use.
     178           1 :         c.mu = mu
     179           1 :         c.q = q
     180           1 :         // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
     181           1 :         // points to flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
     182           1 :         // will be called and we can check the !syncQueue.empty() condition.
     183           1 :         c.cond.L = c
     184           1 : }
     185             : 
     186           1 : func (c *flusherCond) Signal() { // Signal wakes one goroutine blocked in Wait, if any.
     187           1 :         // Pass-through to the cond var.
     188           1 :         c.cond.Signal()
     189           1 : }
     190             : 
     191           1 : func (c *flusherCond) Wait() { // Wait blocks until signalled; Lock must be held on entry and is held again on return.
     192           1 :         // Pass-through to the cond var. Note that internally the cond var implements
     193           1 :         // Wait as:
     194           1 :         //
     195           1 :         //   t := notifyListAdd()
     196           1 :         //   L.Unlock()
     197           1 :         //   notifyListWait(t)
     198           1 :         //   L.Lock()
     199           1 :         //
     200           1 :         // We've configured the cond var to call flusherCond.Unlock() which allows
     201           1 :         // us to check the !syncQueue.empty() condition without a danger of missing a
     202           1 :         // notification. Any call to flusherCond.Signal() after notifyListAdd() is
     203           1 :         // called will cause the subsequent notifyListWait() to return immediately.
     204           1 :         c.cond.Wait()
     205           1 : }
     206             : 
     207           1 : func (c *flusherCond) Lock() { // Lock acquires the associated mutex; part of the sync.Locker implementation used as cond.L.
     208           1 :         c.mu.Lock()
     209           1 : }
     210             : 
     211           1 : func (c *flusherCond) Unlock() { // Unlock releases the mutex and, if the sync queue is non-empty, signals the cond var.
     212           1 :         c.mu.Unlock()
     213           1 :         if !c.q.empty() {
     214           1 :                 // If the current goroutine is about to block on sync.Cond.Wait, this call
     215           1 :                 // to Signal will prevent that. The comment in Wait explains a bit
     216           1 :                 // about what is going on here, but it is worth reiterating:
     217           1 :                 //
     218           1 :                 //   flusherCond.Wait()
     219           1 :                 //     sync.Cond.Wait()
     220           1 :                 //       t := notifyListAdd()
     221           1 :                 //       flusherCond.Unlock()    <-- we are here
     222           1 :                 //       notifyListWait(t)
     223           1 :                 //       flusherCond.Lock()
     224           1 :                 //
     225           1 :                 // The call to Signal here results in:
     226           1 :                 //
     227           1 :                 //     sync.Cond.Signal()
     228           1 :                 //       notifyListNotifyOne()
     229           1 :                 //
     230           1 :                 // The call to notifyListNotifyOne() will prevent the call to
     231           1 :                 // notifyListWait(t) from blocking.
     232           1 :                 c.cond.Signal()
     233           1 :         }
     234             : }
     235             : 
     236             : type durationFunc func() time.Duration // durationFunc supplies a duration on demand; it is the type of the min-sync-interval setting.
     237             : 
     238             : // syncTimer is an interface for timers, modeled on the closure callback mode
     239             : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
     240             : // by tests to mock out the timer functionality used to implement
     241             : // min-sync-interval.
     242             : type syncTimer interface {
     243             :         Reset(time.Duration) bool // re-arms the timer; the flush loop ignores the result
     244             :         Stop() bool // called when the flush loop terminates
     245             : }
     246             : 
     247             : // LogWriter writes records to an underlying io.Writer. In order to support WAL
     248             : // file reuse, a LogWriter's records are tagged with the WAL's file
     249             : // number. When reading a log file a record from a previous incarnation of the
     250             : // file will return the error ErrInvalidLogNum.
     251             : type LogWriter struct {
     252             :         // w is the underlying writer.
     253             :         w io.Writer
     254             :         // c is w as a closer (nil if w does not implement io.Closer).
     255             :         c io.Closer
     256             :         // s is w as a syncer (nil if w does not implement syncer).
     257             :         s syncer
     258             :         // logNum is the low 32-bits of the log's file number.
     259             :         logNum uint32
     260             :         // blockNum is the zero based block number for the current block.
     261             :         blockNum int64
     262             :         // err is any accumulated error. TODO(peter): This needs to be protected in
     263             :         // some fashion. Perhaps using atomic.Value.
     264             :         err error
     265             :         // block is the current block being written. Protected by flusher.Mutex.
     266             :         block *block
     267             :         free  struct { // the LogWriter's own free list of blocks, guarded by its embedded mutex
     268             :                 sync.Mutex
     269             :                 blocks []*block
     270             :         }
     271             : 
     272             :         flusher struct {
     273             :                 sync.Mutex
     274             :                 // ready is a condition variable that is signalled when there are
     275             :                 // blocks to flush, syncing has been requested, or the LogWriter has been
     276             :                 // closed. For signalling of a sync, it is safe to call without holding
     277             :                 // flusher.Mutex.
     278             :                 ready flusherCond
     279             :                 // Set to true when the flush loop should be closed.
     280             :                 close bool
     281             :                 // Closed when the flush loop has terminated.
     282             :                 closed chan struct{}
     283             :                 // Accumulated flush error.
     284             :                 err error
     285             :                 // minSyncInterval is the minimum duration between syncs.
     286             :                 minSyncInterval durationFunc
     287             :                 fsyncLatency    prometheus.Histogram // if non-nil, the flush loop records each sync's latency here
     288             :                 pending         []*block // full blocks waiting for the flush loop
     289             :                 syncQ           syncQueue // outstanding sync requests
     290             :                 metrics         *LogWriterMetrics // flush loop metrics; allocated by NewLogWriter
     291             :         }
     292             : 
     293             :         // afterFunc is a hook to allow tests to mock out the timer functionality
     294             :         // used for min-sync-interval. In normal operation this points to
     295             :         // time.AfterFunc.
     296             :         afterFunc func(d time.Duration, f func()) syncTimer
     297             : 
     298             :         // See the comment for LogWriterConfig.QueueSemChan.
     299             :         queueSemChan chan struct{}
     300             : }
     301             : 
     302             : // LogWriterConfig is a struct used for configuring new LogWriters.
     303             : type LogWriterConfig struct {
     304             :         WALMinSyncInterval durationFunc // copied to flusher.minSyncInterval: the minimum duration between syncs; may be nil
     305             :         WALFsyncLatency    prometheus.Histogram // copied to flusher.fsyncLatency; may be nil
     306             :         // QueueSemChan is an optional channel to pop from when popping from
     307             :         // LogWriter.flusher.syncQ. It functions as a semaphore that prevents
     308             :         // the syncQueue from overflowing (which will cause a panic). All production
     309             :         // code ensures this is non-nil.
     310             :         QueueSemChan chan struct{}
     311             : }
     312             : 
     313             : // initialAllocatedBlocksCap is the initial capacity of the slices that hold
     314             : // LogWriter blocks (free.blocks and flusher.pending). It is not a limit: the
     315             : // LogWriter may allocate more blocks than this.
     316             : const initialAllocatedBlocksCap = 32
     317             : 
     318             : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
     319             : // Pool when a LogWriter is closed. Before that, free blocks are maintained
     320             : // within a LogWriter's own internal free list `w.free.blocks`.
     321             : var blockPool = sync.Pool{
     322           1 :         New: func() any { return &block{} }, // allocate a fresh zero-valued block when the pool has none
     323             : }
     324             : 
     325             : // NewLogWriter returns a new LogWriter that writes to w and starts its background flush loop goroutine. If w also implements io.Closer or syncer, those views of w are retained as well.
     326             : func NewLogWriter(
     327             :         w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
     328           1 : ) *LogWriter {
     329           1 :         c, _ := w.(io.Closer)
     330           1 :         s, _ := w.(syncer)
     331           1 :         r := &LogWriter{
     332           1 :                 w: w,
     333           1 :                 c: c,
     334           1 :                 s: s,
     335           1 :                 // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
     336           1 :                 // we are very unlikely to reach a file number of 4 billion and b) the log
     337           1 :                 // number is used as a validation check and using only the low 32-bits is
     338           1 :                 // sufficient for that purpose.
     339           1 :                 logNum: uint32(logNum),
     340           1 :                 afterFunc: func(d time.Duration, f func()) syncTimer {
     341           0 :                         return time.AfterFunc(d, f)
     342           0 :                 },
     343             :                 queueSemChan: logWriterConfig.QueueSemChan,
     344             :         }
     345           1 :         r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
     346           1 :         r.block = blockPool.Get().(*block)
     347           1 :         r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ)
     348           1 :         r.flusher.closed = make(chan struct{})
     349           1 :         r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
     350           1 :         r.flusher.metrics = &LogWriterMetrics{}
     351           1 : 
     352           1 :         f := &r.flusher
     353           1 :         f.minSyncInterval = logWriterConfig.WALMinSyncInterval
     354           1 :         f.fsyncLatency = logWriterConfig.WALFsyncLatency
     355           1 : 
     356           1 :         go func() { // run the flush loop under the wal-sync pprof labels; it closes f.closed on exit
     357           1 :                 pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
     358           1 :         }()
     359           1 :         return r
     360             : }
     361             : 
     362           1 : func (w *LogWriter) flushLoop(context.Context) { // flushLoop is the body of the goroutine started by NewLogWriter; it returns once f.close is set and no work remains.
     363           1 :         f := &w.flusher
     364           1 :         f.Lock()
     365           1 : 
     366           1 :         // Initialize idleStartTime to when the loop starts.
     367           1 :         idleStartTime := time.Now()
     368           1 :         var syncTimer syncTimer
     369           1 :         defer func() {
     370           1 :                 // Capture the idle duration between the last piece of work and when the
     371           1 :                 // loop terminated.
     372           1 :                 f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
     373           1 :                 if syncTimer != nil {
     374           1 :                         syncTimer.Stop()
     375           1 :                 }
     376           1 :                 close(f.closed)
     377           1 :                 f.Unlock()
     378             :         }()
     379             : 
     380             :         // The flush loop performs flushing of full and partial data blocks to the
     381             :         // underlying writer (LogWriter.w), syncing of the writer, and notification
     382             :         // to sync requests that they have completed.
     383             :         //
     384             :         // - flusher.ready is a condition variable that is signalled when there is
     385             :         //   work to do. Full blocks are contained in flusher.pending. The current
     386             :         //   partial block is in LogWriter.block. And sync operations are held in
     387             :         //   flusher.syncQ.
     388             :         //
     389             :         // - The decision to sync is determined by whether there are any sync
     390             :         //   requests present in flusher.syncQ and whether enough time has elapsed
     391             :         //   since the last sync. If not enough time has elapsed since the last sync,
     392             :         //   flusher.syncQ.blocked will be set to true. If syncing is blocked,
     393             :         //   syncQueue.empty() will return true and syncQueue.load() will return 0,0
     394             :         //   (i.e. an empty list).
     395             :         //
     396             :         // - flusher.syncQ.blocked is cleared by a timer that is initialized when
     397             :         //   blocked is set to true. While blocked is true, no syncing will take place, but
     398             :         //   flushing will continue to be performed. The on/off toggle for syncing
     399             :         //   does not need to be carefully synchronized with the rest of processing
     400             :         //   -- all we need to ensure is that after any transition to blocked=true there
     401             :         //   is eventually a transition to blocked=false. syncTimer performs this
     402             :         //   transition. Note that any change to min-sync-interval will not take
     403             :         //   effect until the previous timer elapses.
     404             :         //
     405             :         // - Picking up the syncing work to perform requires coordination with
     406             :         //   picking up the flushing work. Specifically, flushing work is queued
     407             :         //   before syncing work. The guarantee of this code is that when a sync is
     408             :         //   requested, any previously queued flush work will be synced. This
     409             :         //   motivates reading the syncing work (f.syncQ.load()) before picking up
     410             :         //   the flush work (w.block.written.Load()).
     411             : 
     412             :         // The list of full blocks that need to be written. This is copied from
     413             :         // f.pending on every loop iteration, though the number of elements is
     414             :         // usually small (most frequently 1). In the case of the WAL LogWriter, the
     415             :         // number of blocks is bounded by the size of the WAL's corresponding
     416             :         // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
     417             :         // this works out to at most 2048 elements if the entirety of the memtable's
     418             :         // contents are queued.
     419           1 :         pending := make([]*block, 0, cap(f.pending))
     420           1 :         for {
     421           1 :                 for {
     422           1 :                         // Check whether there is any work: queued full blocks, unflushed
     423           1 :                         // bytes in the current block, or (unblocked) sync requests. If not,
     424           1 :                         // either exit (when closing) or wait on f.ready.
     425           1 :                         written := w.block.written.Load()
     426           1 :                         if len(f.pending) > 0 || written > w.block.flushed || !f.syncQ.empty() {
     427           1 :                                 break
     428             :                         }
     429           1 :                         if f.close {
     430           1 :                                 // If the writer is closed, pretend the sync timer fired immediately so
     431           1 :                                 // that we can process any queued sync requests.
     432           1 :                                 f.syncQ.clearBlocked()
     433           1 :                                 if !f.syncQ.empty() {
     434           1 :                                         break
     435             :                                 }
     436           1 :                                 return
     437             :                         }
     438           1 :                         f.ready.Wait()
     439           1 :                         continue
     440             :                 }
     441             :                 // Found work to do, so no longer idle.
     442           1 :                 workStartTime := time.Now()
     443           1 :                 idleDuration := workStartTime.Sub(idleStartTime)
     444           1 :                 pending = append(pending[:0], f.pending...)
     445           1 :                 f.pending = f.pending[:0]
     446           1 :                 f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
     447           1 : 
     448           1 :                 // Grab the list of sync waiters. Note that syncQueue.load() will return
     449           1 :                 // 0,0 while we're waiting for the min-sync-interval to expire. This
     450           1 :                 // allows flushing to proceed even if we're not ready to sync.
     451           1 :                 head, tail, realSyncQLen := f.syncQ.load()
     452           1 :                 f.metrics.SyncQueueLen.AddSample(int64(realSyncQLen))
     453           1 : 
     454           1 :                 // Grab the portion of the current block that requires flushing. Note that
     455           1 :                 // the current block can be added to the pending blocks list after we
     456           1 :                 // release the flusher lock, but it won't be part of pending. This has to
     457           1 :                 // be ordered after we get the list of sync waiters from syncQ in order to
     458           1 :                 // prevent a race where a waiter adds itself to syncQ, but this thread
     459           1 :                 // picks up the entry in syncQ and not the buffered data.
     460           1 :                 written := w.block.written.Load()
     461           1 :                 data := w.block.buf[w.block.flushed:written]
     462           1 :                 w.block.flushed = written
     463           1 : 
     464           1 :                 // If flusher has an error, we propagate it to waiters. Note that in spite of
     465           1 :                 // the error we consume the pending list above to free blocks for writers.
     466           1 :                 if f.err != nil {
     467           1 :                         f.syncQ.pop(head, tail, f.err, w.queueSemChan) // NOTE(review): pop's returned error (nil waiter) is discarded here.
     468           1 :                         // Update the idleStartTime if work could not be done, so that we don't
     469           1 :                         // include the duration we tried to do work as idle. We don't bother
     470           1 :                         // with the rest of the accounting, which means we will undercount.
     471           1 :                         idleStartTime = time.Now()
     472           1 :                         continue
     473             :                 }
     474           1 :                 f.Unlock() // release the flusher lock while flushPending runs
     475           1 :                 synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail)
     476           1 :                 f.Lock()
     477           1 :                 if synced && f.fsyncLatency != nil {
     478           1 :                         f.fsyncLatency.Observe(float64(syncLatency))
     479           1 :                 }
     480           1 :                 f.err = err
     481           1 :                 if f.err != nil {
     482           1 :                         f.syncQ.clearBlocked() // unblock so queued waiters are popped with the error on the next iteration
     483           1 :                         // Update the idleStartTime if work could not be done, so that we don't
     484           1 :                         // include the duration we tried to do work as idle. We don't bother
     485           1 :                         // with the rest of the accounting, which means we will undercount.
     486           1 :                         idleStartTime = time.Now()
     487           1 :                         continue
     488             :                 }
     489             : 
     490           1 :                 if synced && f.minSyncInterval != nil {
     491           1 :                         // A sync was performed. Make sure we've waited for the min sync
     492           1 :                         // interval before syncing again.
     493           1 :                         if min := f.minSyncInterval(); min > 0 {
     494           1 :                                 f.syncQ.setBlocked()
     495           1 :                                 if syncTimer == nil {
     496           1 :                                         syncTimer = w.afterFunc(min, func() {
     497           1 :                                                 f.syncQ.clearBlocked()
     498           1 :                                                 f.ready.Signal()
     499           1 :                                         })
     500           1 :                                 } else {
     501           1 :                                         syncTimer.Reset(min)
     502           1 :                                 }
     503             :                         }
     504             :                 }
     505             :                 // Finished work, and started idling.
     506           1 :                 idleStartTime = time.Now()
     507           1 :                 workDuration := idleStartTime.Sub(workStartTime)
     508           1 :                 f.metrics.WriteThroughput.Bytes += bytesWritten
     509           1 :                 f.metrics.WriteThroughput.WorkDuration += workDuration
     510           1 :                 f.metrics.WriteThroughput.IdleDuration += idleDuration
     511             :         }
     512             : }
     513             : 
     514             : func (w *LogWriter) flushPending(
     515             :         data []byte, pending []*block, head, tail uint32,
     516           1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
     517           1 :         defer func() {
     518           1 :                 // Translate panics into errors. The errors will cause flushLoop to shut
     519           1 :                 // down, but allows us to do so in a controlled way and avoid swallowing
     520           1 :                 // the stack that created the panic if panic'ing itself hits a panic
     521           1 :                 // (e.g. unlock of unlocked mutex).
     522           1 :                 if r := recover(); r != nil {
     523           0 :                         err = errors.Newf("%v", r)
     524           0 :                 }
     525             :         }()
     526             : 
     527           1 :         for _, b := range pending {
     528           1 :                 bytesWritten += blockSize - int64(b.flushed)
     529           1 :                 if err = w.flushBlock(b); err != nil {
     530           0 :                         break
     531             :                 }
     532             :         }
     533           1 :         if n := len(data); err == nil && n > 0 {
     534           1 :                 bytesWritten += int64(n)
     535           1 :                 _, err = w.w.Write(data)
     536           1 :         }
     537             : 
     538           1 :         synced = head != tail
     539           1 :         if synced {
     540           1 :                 if err == nil && w.s != nil {
     541           1 :                         syncLatency, err = w.syncWithLatency()
     542           1 :                 }
     543           1 :                 f := &w.flusher
     544           1 :                 if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil {
     545           0 :                         return synced, syncLatency, bytesWritten, popErr
     546           0 :                 }
     547             :         }
     548             : 
     549           1 :         return synced, syncLatency, bytesWritten, err
     550             : }
     551             : 
     552           1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
     553           1 :         start := time.Now()
     554           1 :         err := w.s.Sync()
     555           1 :         syncLatency := time.Since(start)
     556           1 :         return syncLatency, err
     557           1 : }
     558             : 
     559           1 : func (w *LogWriter) flushBlock(b *block) error {
     560           1 :         if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
     561           0 :                 return err
     562           0 :         }
     563           1 :         b.written.Store(0)
     564           1 :         b.flushed = 0
     565           1 :         w.free.Lock()
     566           1 :         w.free.blocks = append(w.free.blocks, b)
     567           1 :         w.free.Unlock()
     568           1 :         return nil
     569             : }
     570             : 
     571             : // queueBlock queues the current block for writing to the underlying writer,
     572             : // allocates a new block and reserves space for the next header.
     573           1 : func (w *LogWriter) queueBlock() {
     574           1 :         // Allocate a new block, blocking until one is available. We do this first
     575           1 :         // because w.block is protected by w.flusher.Mutex.
     576           1 :         w.free.Lock()
     577           1 :         if len(w.free.blocks) == 0 {
     578           1 :                 w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
     579           1 :         }
     580           1 :         nextBlock := w.free.blocks[len(w.free.blocks)-1]
     581           1 :         w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
     582           1 :         w.free.Unlock()
     583           1 : 
     584           1 :         f := &w.flusher
     585           1 :         f.Lock()
     586           1 :         f.pending = append(f.pending, w.block)
     587           1 :         w.block = nextBlock
     588           1 :         f.ready.Signal()
     589           1 :         w.err = w.flusher.err
     590           1 :         f.Unlock()
     591           1 : 
     592           1 :         w.blockNum++
     593             : }
     594             : 
     595             : // Close flushes and syncs any unwritten data and closes the writer.
     596             : // Where required, external synchronisation is provided by commitPipeline.mu.
     597           1 : func (w *LogWriter) Close() error {
     598           1 :         f := &w.flusher
     599           1 : 
     600           1 :         // Emit an EOF trailer signifying the end of this log. This helps readers
     601           1 :         // differentiate between a corrupted entry in the middle of a log from
     602           1 :         // garbage at the tail from a recycled log file.
     603           1 :         w.emitEOFTrailer()
     604           1 : 
     605           1 :         // Signal the flush loop to close.
     606           1 :         f.Lock()
     607           1 :         f.close = true
     608           1 :         f.ready.Signal()
     609           1 :         f.Unlock()
     610           1 : 
     611           1 :         // Wait for the flush loop to close. The flush loop will not close until all
     612           1 :         // pending data has been written or an error occurs.
     613           1 :         <-f.closed
     614           1 : 
     615           1 :         // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
     616           1 :         // last buffered data only if it was requested via syncQ, so we need to sync
     617           1 :         // here to ensure that all the data is synced.
     618           1 :         err := w.flusher.err
     619           1 :         var syncLatency time.Duration
     620           1 :         if err == nil && w.s != nil {
     621           1 :                 syncLatency, err = w.syncWithLatency()
     622           1 :         }
     623           1 :         f.Lock()
     624           1 :         if f.fsyncLatency != nil {
     625           1 :                 f.fsyncLatency.Observe(float64(syncLatency))
     626           1 :         }
     627           1 :         free := w.free.blocks
     628           1 :         f.Unlock()
     629           1 : 
     630           1 :         if w.c != nil {
     631           1 :                 cerr := w.c.Close()
     632           1 :                 w.c = nil
     633           1 :                 if cerr != nil {
     634           0 :                         return cerr
     635           0 :                 }
     636             :         }
     637             : 
     638           1 :         for _, b := range free {
     639           1 :                 b.flushed = 0
     640           1 :                 b.written.Store(0)
     641           1 :                 blockPool.Put(b)
     642           1 :         }
     643             : 
     644           1 :         w.err = errClosedWriter
     645           1 :         return err
     646             : }
     647             : 
     648             : // WriteRecord writes a complete record. Returns the offset just past the end
     649             : // of the record.
     650             : // External synchronisation provided by commitPipeline.mu.
     651           1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
     652           1 :         logSize, err := w.SyncRecord(p, nil, nil)
     653           1 :         return logSize, err
     654           1 : }
     655             : 
     656             : // SyncRecord writes a complete record. If wg != nil the record will be
     657             : // asynchronously persisted to the underlying writer and done will be called on
     658             : // the wait group upon completion. Returns the offset just past the end of the
     659             : // record.
     660             : // External synchronisation provided by commitPipeline.mu.
     661             : func (w *LogWriter) SyncRecord(
     662             :         p []byte, wg *sync.WaitGroup, err *error,
     663           1 : ) (logSize int64, err2 error) {
     664           1 :         if w.err != nil {
     665           0 :                 return -1, w.err
     666           0 :         }
     667             : 
     668             :         // The `i == 0` condition ensures we handle empty records. Such records can
     669             :         // possibly be generated for VersionEdits stored in the MANIFEST. While the
     670             :         // MANIFEST is currently written using Writer, it is good to support the same
     671             :         // semantics with LogWriter.
     672           1 :         for i := 0; i == 0 || len(p) > 0; i++ {
     673           1 :                 p = w.emitFragment(i, p)
     674           1 :         }
     675             : 
     676           1 :         if wg != nil {
     677           1 :                 // If we've been asked to persist the record, add the WaitGroup to the sync
     678           1 :                 // queue and signal the flushLoop. Note that flushLoop will write partial
     679           1 :                 // blocks to the file if syncing has been requested. The contract is that
     680           1 :                 // any record written to the LogWriter to this point will be flushed to the
     681           1 :                 // OS and synced to disk.
     682           1 :                 f := &w.flusher
     683           1 :                 f.syncQ.push(wg, err)
     684           1 :                 f.ready.Signal()
     685           1 :         }
     686             : 
     687           1 :         offset := w.blockNum*blockSize + int64(w.block.written.Load())
     688           1 :         // Note that we don't return w.err here as a concurrent call to Close would
     689           1 :         // race with our read. That's ok because the only error we could be seeing is
     690           1 :         // one to syncing for which the caller can receive notification of by passing
     691           1 :         // in a non-nil err argument.
     692           1 :         return offset, nil
     693             : }
     694             : 
     695             : // Size returns the current size of the file.
     696             : // External synchronisation provided by commitPipeline.mu.
     697           1 : func (w *LogWriter) Size() int64 {
     698           1 :         return w.blockNum*blockSize + int64(w.block.written.Load())
     699           1 : }
     700             : 
     701           1 : func (w *LogWriter) emitEOFTrailer() {
     702           1 :         // Write a recyclable chunk header with a different log number.  Readers
     703           1 :         // will treat the header as EOF when the log number does not match.
     704           1 :         b := w.block
     705           1 :         i := b.written.Load()
     706           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
     707           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
     708           1 :         b.buf[i+6] = recyclableFullChunkType
     709           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
     710           1 :         b.written.Store(i + int32(recyclableHeaderSize))
     711           1 : }
     712             : 
     713           1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
     714           1 :         b := w.block
     715           1 :         i := b.written.Load()
     716           1 :         first := n == 0
     717           1 :         last := blockSize-i-recyclableHeaderSize >= int32(len(p))
     718           1 : 
     719           1 :         if last {
     720           1 :                 if first {
     721           1 :                         b.buf[i+6] = recyclableFullChunkType
     722           1 :                 } else {
     723           1 :                         b.buf[i+6] = recyclableLastChunkType
     724           1 :                 }
     725           1 :         } else {
     726           1 :                 if first {
     727           1 :                         b.buf[i+6] = recyclableFirstChunkType
     728           1 :                 } else {
     729           1 :                         b.buf[i+6] = recyclableMiddleChunkType
     730           1 :                 }
     731             :         }
     732             : 
     733           1 :         binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
     734           1 : 
     735           1 :         r := copy(b.buf[i+recyclableHeaderSize:], p)
     736           1 :         j := i + int32(recyclableHeaderSize+r)
     737           1 :         binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
     738           1 :         binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
     739           1 :         b.written.Store(j)
     740           1 : 
     741           1 :         if blockSize-b.written.Load() < recyclableHeaderSize {
     742           1 :                 // There is no room for another fragment in the block, so fill the
     743           1 :                 // remaining bytes with zeros and queue the block for flushing.
     744           1 :                 clear(b.buf[b.written.Load():])
     745           1 :                 w.queueBlock()
     746           1 :         }
     747           1 :         return p[r:]
     748             : }
     749             : 
     750             : // Metrics must be called after Close. The callee will no longer modify the
     751             : // returned LogWriterMetrics.
     752           1 : func (w *LogWriter) Metrics() *LogWriterMetrics {
     753           1 :         return w.flusher.metrics
     754           1 : }
     755             : 
// LogWriterMetrics contains misc metrics for the log writer.
type LogWriterMetrics struct {
	// WriteThroughput accumulates the bytes written by the flusher together
	// with the time it spent working and idling.
	WriteThroughput  base.ThroughputMetric
	// PendingBufferLen is presumably a sampled gauge of the flusher's pending
	// block list length — NOTE(review): the update site is not shown here;
	// confirm.
	PendingBufferLen base.GaugeSampleMetric
	// SyncQueueLen is presumably a sampled gauge of the sync queue length —
	// NOTE(review): the update site is not shown here; confirm.
	SyncQueueLen     base.GaugeSampleMetric
}
     762             : 
     763             : // Merge merges metrics from x. Requires that x is non-nil.
     764           1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
     765           1 :         m.WriteThroughput.Merge(x.WriteThroughput)
     766           1 :         m.PendingBufferLen.Merge(x.PendingBufferLen)
     767           1 :         m.SyncQueueLen.Merge(x.SyncQueueLen)
     768           1 :         return nil
     769           1 : }

Generated by: LCOV version 1.14