LCOV - code coverage report
Current view: top level - pebble/wal - failover_writer.go (source / functions)
Test: 2024-02-22 08:15Z e05c22dc - tests + meta.lcov
Date: 2024-02-22 08:16:48
Coverage: Lines: 462 / 476 hit (97.1 %) | Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "io"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/record"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/prometheus/client_golang/prometheus"
      18             : )
      19             : 
      20             : // recordQueueEntry is an entry in recordQueue.
      21             : type recordQueueEntry struct {
      22             :         p    []byte
      23             :         opts SyncOptions
      24             : }
      25             : 
      26             : const initialBufferLen = 8192
      27             : 
      28             : // recordQueue is a variable-size single-producer multiple-consumer queue. It
      29             : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
      30             : // to grow the size, since there is no upper bound on the number of queued
      31             : // records (which are all the records that are not synced, and will need to be
      32             : // written again in case of failover). Additionally, it needs a mutex to
      33             : // atomically grab a snapshot of the queued records and provide them to a new
      34             : // LogWriter that is being switched to.
      35             : type recordQueue struct {
      36             :         // Only held for reading for all pop operations and most push operations.
      37             :         // Held for writing when buffer needs to be grown or when switching to a new
      38             :         // writer.
      39             :         mu sync.RWMutex
      40             : 
      41             :         // queue is [tail, head). tail is the oldest entry and head is the index for
      42             :         // the next entry.
      43             :         //
      44             :         // Consumers: atomically read and write tail in pop (using
      45             :         // compare-and-swap). This is not the usual kind of queue consumer since
      46             :         // they already know the index that they are popping exists, hence don't
      47             :         // need to look at head.
      48             :         //
      49             :         // Producer: atomically reads tail in push. Writes to head.
      50             :         //
      51             :         // Based on the above we only need tail to be atomic. However, the producer
      52             :         // also populates entries in buffer, whose values need to be seen by the
      53             :         // consumers when doing a pop, which means they need to synchronize using a
      54             :         // release and acquire memory barrier pair, where the push does the release
      55             :         // and the pop does the acquire. For this reason we make head also atomic
      56             :         // and merge head and tail into a single atomic, so that the store of head
      57             :         // in push and the load of tail in pop accomplishes this release-acquire
      58             :         // pair.
      59             :         //
      60             :         // All updates to headTail hold mu at least for reading. So when mu is held
      61             :         // for writing, there is a guarantee that headTail is not being updated.
      62             :         //
      63             :         // head is most-significant 32 bits and tail is least-significant 32 bits.
      64             :         headTail atomic.Uint64
      65             : 
      66             :         // Access to buffer requires at least RLock.
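                      :         //
                      :         // buffer is used as a ring: a slot is addressed by its absolute head/tail
                      :         // counter value modulo len(buffer), as the modulo indexing in push, pop,
                      :         // and snapshotAndSwitchWriter shows. For example, with len(buffer) == 8192,
                      :         // the entry at absolute index 8193 lives in buffer[1].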
      67             :         buffer []recordQueueEntry
      68             : 
      69             :         lastTailObservedByProducer uint32
      70             : 
      71             :         // Read requires RLock.
      72             :         writer *record.LogWriter
      73             : 
      74             :         // When writer != nil, this is the return value of the last call to
      75             :         // SyncRecordGeneralized. It is updated in (a) WriteRecord calls push, using
      76             :         // only RLock (since WriteRecord is externally synchronized), (b)
      77             :         // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
      78             :         lastLogSize int64
      79             : }
      80             : 
      81           1 : func (q *recordQueue) init() {
      82           1 :         *q = recordQueue{
      83           1 :                 buffer: make([]recordQueueEntry, initialBufferLen),
      84           1 :         }
      85           1 : }
      86             : 
      87             : // NB: externally synchronized, i.e., no concurrent push calls.
      88             : func (q *recordQueue) push(
      89             :         p []byte,
      90             :         opts SyncOptions,
      91             :         latestLogSizeInWriteRecord int64,
      92             :         latestWriterInWriteRecord *record.LogWriter,
      93           1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
      94           1 :         ht := q.headTail.Load()
      95           1 :         h, t := unpackHeadTail(ht)
      96           1 :         n := int(h - t)
      97           1 :         if len(q.buffer) == n {
      98           1 :                 // Full
      99           1 :                 m := 2 * n
     100           1 :                 newBuffer := make([]recordQueueEntry, m)
     101           1 :                 for i := int(t); i < int(h); i++ {
     102           1 :                         newBuffer[i%m] = q.buffer[i%n]
     103           1 :                 }
     104           1 :                 q.mu.Lock()
     105           1 :                 q.buffer = newBuffer
     106           1 :                 q.mu.Unlock()
     107             :         }
     108           1 :         q.mu.RLock()
      109           1 :         q.buffer[int(h)%len(q.buffer)] = recordQueueEntry{
     110           1 :                 p:    p,
     111           1 :                 opts: opts,
     112           1 :         }
     113           1 :         // Reclaim memory for consumed entries. We couldn't do that in pop since
     114           1 :         // multiple consumers are popping using CAS and that immediately transfers
     115           1 :         // ownership to the producer.
     116           1 :         for i := q.lastTailObservedByProducer; i < t; i++ {
      117           1 :                 q.buffer[int(i)%len(q.buffer)] = recordQueueEntry{}
     118           1 :         }
     119           1 :         q.lastTailObservedByProducer = t
     120           1 :         q.headTail.Add(1 << headTailBits)
     121           1 :         writer = q.writer
     122           1 :         if writer == latestWriterInWriteRecord {
     123           1 :                 // WriteRecord has written to this writer since the switch.
     124           1 :                 q.lastLogSize = latestLogSizeInWriteRecord
     125           1 :         }
     126             :         // Else writer is a new writer that was switched to, so ignore the
     127             :         // latestLogSizeInWriteRecord.
     128             : 
     129           1 :         lastLogSize = q.lastLogSize
     130           1 :         q.mu.RUnlock()
     131           1 :         return h, writer, lastLogSize
     132             : }
     133             : 
     134           1 : func (q *recordQueue) length() int {
     135           1 :         ht := q.headTail.Load()
     136           1 :         h, t := unpackHeadTail(ht)
     137           1 :         return int(h - t)
     138           1 : }
     139             : 
     140             : // Pops all entries. Must be called only after the last push returns.
     141           1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
     142           1 :         ht := q.headTail.Load()
     143           1 :         h, t := unpackHeadTail(ht)
     144           1 :         n := int(h - t)
     145           1 :         if n == 0 {
     146           1 :                 return 0, 0
     147           1 :         }
     148           1 :         return n, q.pop(h-1, err, false)
     149             : }
     150             : 
     151             : // Pops all entries up to and including index. The remaining queue is
     152             : // [index+1, head).
     153             : //
     154             : // NB: we could slightly simplify to only have the latest writer be able to
     155             : // pop. This would avoid the CAS below, but it seems better to reduce the
     156             : // amount of queued work regardless of who has successfully written it.
     157           1 : func (q *recordQueue) pop(index uint32, err error, runCb bool) (numSyncsPopped int) {
     158           1 :         var buf [512]SyncOptions
     159           1 :         // Tail can increase, and numEntriesToPop decrease, due to competition with
     160           1 :         // other consumers. Head can increase due to the concurrent producer.
     161           1 :         headTailEntriesToPop := func() (ht uint64, h uint32, t uint32, numEntriesToPop int) {
     162           1 :                 ht = q.headTail.Load()
     163           1 :                 h, t = unpackHeadTail(ht)
     164           1 :                 tail := int(t)
     165           1 :                 numEntriesToPop = int(index) - tail + 1
     166           1 :                 return ht, h, t, numEntriesToPop
     167           1 :         }
     168           1 :         ht, head, tail, numEntriesToPop := headTailEntriesToPop()
     169           1 :         if numEntriesToPop <= 0 {
     170           1 :                 return 0
     171           1 :         }
     172           1 :         var b []SyncOptions
     173           1 :         if numEntriesToPop <= len(buf) {
     174           1 :                 b = buf[:numEntriesToPop]
     175           1 :         } else {
     176           1 :                 // Do allocation before acquiring the mutex.
     177           1 :                 b = make([]SyncOptions, numEntriesToPop)
     178           1 :         }
     179           1 :         q.mu.RLock()
     180           1 :         n := len(q.buffer)
     181           1 :         for i := 0; i < numEntriesToPop; i++ {
     182           1 :                 // Grab all the possible entries before doing CAS, since successful CAS
     183           1 :                 // will also release those buffer slots to the producer.
     184           1 :                 b[i] = q.buffer[(i+int(tail))%n].opts
     185           1 :         }
     186             :         // CAS, with retry loop, since this pop can race with other consumers.
     187           1 :         for {
     188           1 :                 newHT := makeHeadTail(head, index+1)
     189           1 :                 if q.headTail.CompareAndSwap(ht, newHT) {
     190           1 :                         break
     191             :                 }
     192           0 :                 ht, head, _, numEntriesToPop = headTailEntriesToPop()
     193           0 :                 if numEntriesToPop <= 0 {
     194           0 :                         break
     195             :                 }
     196             :         }
     197           1 :         q.mu.RUnlock()
     198           1 : 
     199           1 :         // The current value of numEntriesToPop is the number of entries that were
     200           1 :         // popped.
     201           1 :         if numEntriesToPop <= 0 {
     202           0 :                 return 0
     203           0 :         }
     204           1 :         bufLen := len(b)
     205           1 :         // [0, bufLen-numEntriesToPop) were not popped, since this pop raced with
     206           1 :         // other consumers that popped those entries.
     207           1 :         for i := bufLen - numEntriesToPop; i < bufLen; i++ {
     208           1 :                 if b[i].Done != nil {
     209           1 :                         numSyncsPopped++
     210           1 :                         if err != nil {
     211           1 :                                 *b[i].Err = err
     212           1 :                         }
     213           1 :                         b[i].Done.Done()
     214             :                 }
     215             :         }
     216           1 :         return numSyncsPopped
     217             : }
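                      : 
                      : // For example, the typical lifecycle of a record in this queue: WriteRecord
                      : // calls push, which returns the record's absolute index; that index is handed
                      : // to the LogWriter as a record.PendingSyncIndex; once the LogWriter has
                      : // durably synced through that index, doneSyncCallback calls pop(index, nil,
                      : // true), which fires the SyncOptions.Done waitgroups of the popped entries.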
     218             : 
     219             : func (q *recordQueue) snapshotAndSwitchWriter(
     220             :         writer *record.LogWriter,
     221             :         snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
     222           1 : ) {
     223           1 :         q.mu.Lock()
     224           1 :         defer q.mu.Unlock()
     225           1 :         q.writer = writer
     226           1 :         h, t := unpackHeadTail(q.headTail.Load())
     227           1 :         n := h - t
     228           1 :         if n > 0 {
     229           1 :                 m := uint32(len(q.buffer))
     230           1 :                 b := make([]recordQueueEntry, n)
     231           1 :                 for i := t; i < h; i++ {
     232           1 :                         b[i-t] = q.buffer[i%m]
     233           1 :                 }
     234           1 :                 q.lastLogSize = snapshotFunc(t, b)
     235             :         }
     236             : }
     237             : 
     238             : // getLastIndex is used by failoverWriter.Close.
     239           1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
     240           1 :         h, _ := unpackHeadTail(q.headTail.Load())
     241           1 :         return int64(h) - 1
     242           1 : }
     243             : 
     244             : const headTailBits = 32
     245             : 
     246           1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
     247           1 :         const mask = 1<<headTailBits - 1
     248           1 :         head = uint32((ht >> headTailBits) & mask)
     249           1 :         tail = uint32(ht & mask)
     250           1 :         return head, tail
     251           1 : }
     252             : 
     253           1 : func makeHeadTail(head, tail uint32) uint64 {
     254           1 :         return (uint64(head) << headTailBits) | uint64(tail)
     255           1 : }
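                      : 
                      : // For example, with head == 5 and tail == 3, makeHeadTail(5, 3) packs to
                      : // 0x0000_0005_0000_0003; unpackHeadTail of that value returns (5, 3), and the
                      : // queue [tail, head) holds the two entries at absolute indices 3 and 4.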
     256             : 
     257             : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
     258             : // chosen value. Setting this to 2 will not simplify the code. We make this a
      259             : // constant since we want a fixed-size array for failoverWriter.writers.
     260             : const maxPhysicalLogs = 10
     261             : 
     262             : // failoverWriter is the implementation of Writer in failover mode. No Writer
     263             : // method blocks for IO, except for Close.
     264             : //
     265             : // Loosely speaking, Close blocks until all records are successfully written
     266             : // and synced to some log writer. Monitoring of log writer latency and errors
     267             : // continues after Close is called, which means failoverWriter can be switched
     268             : // to a new log writer after Close is called, to unblock Close.
     269             : //
     270             : // More precisely, Close does not block if there is an error in creating or
      271             : // closing the latest LogWriter when Close was called. This is because errors
     272             : // are considered indicative of misconfiguration, and the user of
     273             : // failoverWriter can dampen switching when observing errors (e.g. see
      274             : // failoverMonitor), so Close does not assume any liveness of calls to
     275             : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
     276             : // an error on Writer.Close as fatal, this does mean that failoverWriter has
     277             : // limited ability to mask errors (its primary task is to mask high latency).
     278             : type failoverWriter struct {
     279             :         opts    failoverWriterOpts
     280             :         q       recordQueue
     281             :         writers [maxPhysicalLogs]logWriterAndRecorder
     282             :         mu      struct {
     283             :                 sync.Mutex
     284             :                 // cond is signaled when the latest LogWriter is set in writers (or there
     285             :                 // is a creation error), or when the latest LogWriter is successfully
     286             :                 // closed. It is waited on in Close. We don't use channels and select
     287             :                 // since what Close is waiting on is dynamic based on the local state in
     288             :                 // Close, so using Cond is simpler.
     289             :                 cond *sync.Cond
     290             :                 // nextWriterIndex is advanced before creating the *LogWriter. That is, a
     291             :                 // slot is reserved by taking the current value of nextWriterIndex and
     292             :                 // incrementing it, and then the *LogWriter for that slot is created. When
     293             :                 // newFailoverWriter returns, nextWriterIndex = 1.
     294             :                 //
     295             :                 // The latest *LogWriter is (will be) at nextWriterIndex-1.
     296             :                 //
     297             :                 // INVARIANT: nextWriterIndex <= len(writers)
     298             :                 nextWriterIndex logNameIndex
     299             :                 closed          bool
     300             :                 // metrics is initialized in Close. Currently we just use the metrics from
     301             :                 // the latest writer after it is closed, since in the common case with
     302             :                 // only one writer, that writer's flush loop will have finished and the
     303             :                 // metrics will be current. With multiple writers, these metrics can be
     304             :                 // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
     305             :                 // which can be high for a writer that was switched away from, and
     306             :                 // therefore not indicative of overall work being done by the
     307             :                 // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
     308             :                 // inaccurate once there is no more work being given to a writer. We could
     309             :                 // add a method to LogWriter to stop sampling metrics when it is not the
     310             :                 // latest writer. Then we could aggregate all these metrics across all
     311             :                 // writers.
     312             :                 //
     313             :                 // Note that CockroachDB does not use these metrics in any meaningful way.
     314             :                 //
     315             :                 // TODO(sumeer): do the improved solution outlined above.
     316             :                 metrics record.LogWriterMetrics
     317             :         }
     318             :         // State for computing logical offset. The cumulative offset state is in
     319             :         // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
      320             :         // compute the delta between the size returned by this LogWriter now and the
     321             :         // size returned by this LogWriter in the previous call to
     322             :         // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
     323             :         // have happened from WriteRecord, or asynchronously during a switch. So
     324             :         // that previous call state requires synchronization and is maintained in
     325             :         // recordQueue. The offset is incremented by this delta without any
     326             :         // synchronization, since we rely on external synchronization (like the
     327             :         // standaloneWriter).
     328             :         logicalOffset struct {
     329             :                 latestWriterInWriteRecord  *record.LogWriter
     330             :                 latestLogSizeInWriteRecord int64
     331             :                 offset                     int64
     332             :                 // Transitions once from false => true when there is a non-nil writer.
     333             :                 notEstimatedOffset bool
     334             :         }
     335             :         psiForWriteRecordBacking record.PendingSyncIndex
     336             :         psiForSwitchBacking      record.PendingSyncIndex
     337             : }
     338             : 
     339             : type logWriterAndRecorder struct {
      340             :         // This may never become non-nil if, when the LogWriter was finally created,
     341             :         // it was no longer the latest writer. Additionally, if there was an error
     342             :         // in creating the writer, w will remain nil and createError will be set.
     343             :         w *record.LogWriter
     344             :         // createError is set if there is an error creating the writer. This is
     345             :         // useful in Close since we need to know when the work for creating the
     346             :         // latest writer is done, whether it resulted in success or not.
     347             :         createError error
     348             :         r           latencyAndErrorRecorder
     349             : }
     350             : 
     351             : var _ Writer = &failoverWriter{}
     352             : 
     353             : var _ switchableWriter = &failoverWriter{}
     354             : 
     355             : type failoverWriterOpts struct {
     356             :         wn     NumWAL
     357             :         logger base.Logger
     358             :         timeSource
     359             : 
     360             :         // Options that feed into SyncingFileOptions.
     361             :         noSyncOnClose   bool
     362             :         bytesPerSync    int
     363             :         preallocateSize func() int
     364             : 
     365             :         // Options for record.LogWriter.
     366             :         minSyncInterval func() time.Duration
     367             :         fsyncLatency    prometheus.Histogram
     368             :         queueSemChan    chan struct{}
     369             :         stopper         *stopper
     370             : 
     371             :         writerClosed func()
     372             : 
     373             :         writerCreatedForTest chan<- struct{}
     374             : }
     375             : 
     376             : func newFailoverWriter(
     377             :         opts failoverWriterOpts, initialDir dirAndFileHandle,
     378           1 : ) (*failoverWriter, error) {
     379           1 :         ww := &failoverWriter{
     380           1 :                 opts: opts,
     381           1 :         }
     382           1 :         ww.q.init()
     383           1 :         ww.mu.cond = sync.NewCond(&ww.mu)
     384           1 :         // The initial record.LogWriter creation also happens via a
      385           1 :         // switchToNewDir call since we don't want it to block newFailoverWriter.
     386           1 :         err := ww.switchToNewDir(initialDir)
     387           1 :         if err != nil {
      388           0 :                 // The switching limit cannot be exceeded when creating the first writer.
     389           0 :                 panic(err)
     390             :         }
     391           1 :         return ww, nil
     392             : }
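                      : 
                      : // A minimal usage sketch, assuming SyncOptions carries a Done *sync.WaitGroup
                      : // and an Err *error (consistent with how pop uses opts.Done and opts.Err
                      : // above); opts, primaryDir, and payload are placeholders and error handling
                      : // is elided:
                      : //
                      : //   ww, _ := newFailoverWriter(opts, primaryDir)
                      : //   var wg sync.WaitGroup
                      : //   var syncErr error
                      : //   wg.Add(1)
                      : //   _, _ = ww.WriteRecord(payload, SyncOptions{Done: &wg, Err: &syncErr})
                      : //   wg.Wait() // returns once the record is durably synced (or Close gives up with an error)
                      : //   _, _ = ww.Close()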
     393             : 
     394             : // WriteRecord implements Writer.
     395           1 : func (ww *failoverWriter) WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error) {
     396           1 :         recordIndex, writer, lastLogSize := ww.q.push(
     397           1 :                 p, opts, ww.logicalOffset.latestLogSizeInWriteRecord, ww.logicalOffset.latestWriterInWriteRecord)
     398           1 :         if writer == nil {
     399           1 :                 // Don't have a record.LogWriter yet, so use an estimate. This estimate
     400           1 :                 // will get overwritten.
     401           1 :                 ww.logicalOffset.offset += int64(len(p))
     402           1 :                 return ww.logicalOffset.offset, nil
     403           1 :         }
     404             :         // INVARIANT: writer != nil.
     405           1 :         notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
     406           1 :         if !notEstimatedOffset {
     407           1 :                 ww.logicalOffset.notEstimatedOffset = true
     408           1 :         }
     409           1 :         ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     410           1 :         if opts.Done != nil {
     411           1 :                 ww.psiForWriteRecordBacking.Index = int64(recordIndex)
     412           1 :         }
     413           1 :         ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
     414           1 :         ww.logicalOffset.latestWriterInWriteRecord = writer
     415           1 :         if notEstimatedOffset {
     416           1 :                 delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
     417           1 :                 ww.logicalOffset.offset += delta
     418           1 :         } else {
     419           1 :                 // Overwrite the estimate. This is a best-effort improvement in that it is
     420           1 :                 // accurate for the common case where writer is the first LogWriter.
     421           1 :                 // Consider a failover scenario where there was no LogWriter for the first
     422           1 :                 // 10 records, so they are all accumulated as an estimate. Then the first
     423           1 :                 // LogWriter successfully writes and syncs the first 5 records and gets
     424           1 :                 // stuck. A switch happens to a second LogWriter that is handed the
      425           1 :                 // remaining 5 records, and then the 11th record arrives via WriteRecord.
     426           1 :                 // The transition from !notEstimatedOffset to notEstimatedOffset will
     427           1 :                 // happen on this 11th record, and the logic here will use the length of
      428           1 :                 // the second LogWriter, which does not reflect the full length.
     429           1 :                 //
     430           1 :                 // TODO(sumeer): try to make this more correct, without adding much more
     431           1 :                 // complexity, and without adding synchronization.
     432           1 :                 ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
     433           1 :         }
     434           1 :         return ww.logicalOffset.offset, err
     435             : }
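                      : 
                      : // As a concrete illustration of the delta computation above (numbers are
                      : // hypothetical): if the previous SyncRecordGeneralized call on this LogWriter
                      : // reported a log size of 100 bytes (lastLogSize) and the call for this record
                      : // reports 164 bytes, the logical offset advances by 64, regardless of whether
                      : // that previous call happened in WriteRecord or during a switch.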
     436             : 
     437             : // switchToNewDir starts switching to dir. It implements switchableWriter. All
     438             : // work is async, and a non-nil error is returned only if the switching limit
     439             : // is exceeded.
     440           1 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
     441           1 :         ww.mu.Lock()
      442           1 :         // Can have a late switchToNewDir call if the failoverMonitor has not yet
     443           1 :         // been told that the writer is closed. Ignore.
     444           1 :         if ww.mu.closed {
     445           1 :                 ww.mu.Unlock()
     446           1 :                 if ww.opts.writerCreatedForTest != nil {
     447           1 :                         ww.opts.writerCreatedForTest <- struct{}{}
     448           1 :                 }
     449           1 :                 return nil
     450             :         }
     451             :         // writerIndex is the slot for this writer.
     452           1 :         writerIndex := ww.mu.nextWriterIndex
     453           1 :         if int(writerIndex) == len(ww.writers) {
     454           1 :                 ww.mu.Unlock()
     455           1 :                 return errors.Errorf("exceeded switching limit")
     456           1 :         }
     457           1 :         ww.mu.nextWriterIndex++
     458           1 :         ww.mu.Unlock()
     459           1 : 
     460           1 :         // Creation is async.
     461           1 :         ww.opts.stopper.runAsync(func() {
     462           1 :                 // TODO(sumeer): recycling of logs.
     463           1 :                 filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex))
     464           1 :                 recorderAndWriter := &ww.writers[writerIndex].r
     465           1 :                 recorderAndWriter.ts = ww.opts.timeSource
     466           1 :                 var file vfs.File
     467           1 :                 // handleErrFunc is called when err != nil. It handles the multiple IO error
     468           1 :                 // cases below.
     469           1 :                 handleErrFunc := func(err error) {
     470           1 :                         if file != nil {
     471           1 :                                 file.Close()
     472           1 :                         }
     473           1 :                         ww.mu.Lock()
     474           1 :                         defer ww.mu.Unlock()
     475           1 :                         ww.writers[writerIndex].createError = err
     476           1 :                         ww.mu.cond.Signal()
     477           1 :                         if ww.opts.writerCreatedForTest != nil {
     478           1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     479           1 :                         }
     480             :                 }
     481           1 :                 var err error
     482           1 :                 // Create file.
     483           1 :                 recorderAndWriter.writeStart()
     484           1 :                 file, err = dir.FS.Create(filename)
     485           1 :                 recorderAndWriter.writeEnd(err)
     486           1 :                 // TODO(sumeer): should we fatal if primary dir? At some point it is better
     487           1 :                 // to fatal instead of continuing to failover.
     488           1 :                 // base.MustExist(dir.FS, filename, ww.opts.logger, err)
     489           1 :                 if err != nil {
     490           1 :                         handleErrFunc(err)
     491           1 :                         return
     492           1 :                 }
     493             :                 // Sync dir.
     494           1 :                 recorderAndWriter.writeStart()
     495           1 :                 err = dir.Sync()
     496           1 :                 recorderAndWriter.writeEnd(err)
     497           1 :                 if err != nil {
     498           1 :                         handleErrFunc(err)
     499           1 :                         return
     500           1 :                 }
     501             :                 // Wrap in a syncingFile.
     502           1 :                 syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
     503           1 :                         NoSyncOnClose:   ww.opts.noSyncOnClose,
     504           1 :                         BytesPerSync:    ww.opts.bytesPerSync,
     505           1 :                         PreallocateSize: ww.opts.preallocateSize(),
     506           1 :                 })
     507           1 :                 // Wrap in the latencyAndErrorRecorder.
     508           1 :                 recorderAndWriter.setWriter(syncingFile)
     509           1 : 
     510           1 :                 // Using NumWAL as the DiskFileNum is fine since it is used only as
     511           1 :                 // EOF trailer for safe log recycling. Even though many log files can
     512           1 :                 // map to a single NumWAL, a file used for NumWAL n at index m will
     513           1 :                 // never get recycled for NumWAL n at a later index (since recycling
     514           1 :                 // happens when n as a whole is obsolete).
     515           1 :                 w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
     516           1 :                         record.LogWriterConfig{
     517           1 :                                 WALMinSyncInterval:        ww.opts.minSyncInterval,
     518           1 :                                 WALFsyncLatency:           ww.opts.fsyncLatency,
     519           1 :                                 QueueSemChan:              ww.opts.queueSemChan,
     520           1 :                                 ExternalSyncQueueCallback: ww.doneSyncCallback,
     521           1 :                         })
     522           1 :                 closeWriter := func() bool {
     523           1 :                         ww.mu.Lock()
     524           1 :                         defer ww.mu.Unlock()
     525           1 :                         if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
     526           1 :                                 // Not the latest writer or the writer was closed while this async
     527           1 :                                 // creation was ongoing.
     528           1 :                                 if ww.opts.writerCreatedForTest != nil {
     529           1 :                                         ww.opts.writerCreatedForTest <- struct{}{}
     530           1 :                                 }
     531           1 :                                 return true
     532             :                         }
     533             :                         // Latest writer.
     534           1 :                         ww.writers[writerIndex].w = w
     535           1 :                         ww.mu.cond.Signal()
     536           1 :                         // NB: snapshotAndSwitchWriter does not block on IO, since
     537           1 :                         // SyncRecordGeneralized does no IO.
     538           1 :                         ww.q.snapshotAndSwitchWriter(w,
     539           1 :                                 func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
     540           1 :                                         for i := range entries {
     541           1 :                                                 ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     542           1 :                                                 if entries[i].opts.Done != nil {
     543           1 :                                                         ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
     544           1 :                                                 }
     545           1 :                                                 var err error
     546           1 :                                                 logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
     547           1 :                                                 if err != nil {
     548           0 :                                                         // TODO(sumeer): log periodically. The err will also surface via
     549           0 :                                                         // the latencyAndErrorRecorder, so if a switch is possible, it
     550           0 :                                                         // will be done.
     551           0 :                                                         ww.opts.logger.Errorf("%s", err)
     552           0 :                                                 }
     553             :                                         }
     554           1 :                                         return logSize
     555             :                                 })
     556           1 :                         if ww.opts.writerCreatedForTest != nil {
     557           1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     558           1 :                         }
     559           1 :                         return false
     560             :                 }()
     561           1 :                 if closeWriter {
     562           1 :                         // Never wrote anything to this writer so don't care about the
     563           1 :                         // returned error.
     564           1 :                         ww.opts.stopper.runAsync(func() {
     565           1 :                                 _ = w.Close()
     566           1 :                         })
     567             :                 }
     568             :         })
     569           1 :         return nil
     570             : }
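                      : 
                      : // To illustrate the failover sequence implemented above: a failoverMonitor
                      : // (see the failoverWriter comment) that observes high latency or an error on
                      : // the current directory calls switchToNewDir with another directory. Records
                      : // written in the meantime keep accumulating in recordQueue; once the new
                      : // LogWriter is created, snapshotAndSwitchWriter replays the still-queued
                      : // (unsynced) entries to it, and subsequent WriteRecord calls go to it
                      : // directly.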
     571             : 
     572             : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
     573             : // record.LogWriter.
     574             : //
     575             : // recordQueue is popped from only when some work requests a sync (and
     576             : // successfully syncs). In the worst case, if no syncs are requested, we could
     577             : // queue all the records needed to fill up a memtable in the recordQueue. This
     578             : // can have two negative effects: (a) in the case of failover, we need to
     579             : // replay all the data in the current mutable memtable, which takes more time,
     580             : // (b) the memory usage is proportional to the size of the memtable. We ignore
     581             : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
     582             : // the default memtable size is only 64MB.
     583           1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
     584           1 :         if err != nil {
     585           1 :                 // Don't pop anything since we can retry after switching to a new
     586           1 :                 // LogWriter.
     587           1 :                 return
     588           1 :         }
     589             :         // NB: harmless after Close returns since numSyncsPopped will be 0.
     590           1 :         numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err, true)
     591           1 :         if ww.opts.queueSemChan != nil {
     592           1 :                 for i := 0; i < numSyncsPopped; i++ {
     593           1 :                         <-ww.opts.queueSemChan
     594           1 :                 }
     595             :         }
     596             : }
     597             : 
     598             : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
     599           1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
     600           1 :         r := ww.recorderForCurDir()
     601           1 :         if r == nil {
     602           1 :                 return 0, nil
     603           1 :         }
     604           1 :         return r.ongoingLatencyOrError()
     605             : }
     606             : 
     607             : // For internal use and testing.
     608           1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
     609           1 :         ww.mu.Lock()
     610           1 :         defer ww.mu.Unlock()
     611           1 :         if ww.mu.closed {
     612           1 :                 return nil
     613           1 :         }
     614           1 :         return &ww.writers[ww.mu.nextWriterIndex-1].r
     615             : }
     616             : 
     617             : // Close implements Writer.
     618             : //
      619             : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
     620             : // called after Close is called, and there is also a possibility that they get
     621             : // called after Close returns and before failoverMonitor knows that the
     622             : // failoverWriter is closed.
     623             : //
     624             : // doneSyncCallback can be called anytime after Close returns since there
     625             : // could be stuck writes that finish arbitrarily later.
     626             : //
     627             : // See the long comment about Close behavior where failoverWriter is declared.
     628           1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
     629           1 :         offset, err := ww.closeInternal()
     630           1 :         ww.opts.writerClosed()
     631           1 :         return offset, err
     632           1 : }
     633             : 
     634           1 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
     635           1 :         logicalOffset = ww.logicalOffset.offset
     636           1 :         // [0, closeCalledCount) have had LogWriter.Close called (though may not
     637           1 :         // have finished) or the LogWriter will never be non-nil. Either way, they
     638           1 :         // have been "processed".
     639           1 :         closeCalledCount := logNameIndex(0)
     640           1 :         // lastWriterState is the state for the last writer, for which we are
     641           1 :         // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
     642           1 :         // What is considered the last writer can change. All state is protected by
     643           1 :         // ww.mu.
     644           1 :         type lastWriterState struct {
     645           1 :                 index   logNameIndex
     646           1 :                 closed  bool
     647           1 :                 err     error
     648           1 :                 metrics record.LogWriterMetrics
     649           1 :         }
     650           1 :         var lastWriter lastWriterState
     651           1 :         lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
     652           1 :         ww.mu.Lock()
     653           1 :         defer ww.mu.Unlock()
     654           1 :         // Every iteration starts and ends with the mutex held.
     655           1 :         //
     656           1 :         // Invariant: ww.mu.nextWriterIndex >= 1.
     657           1 :         //
     658           1 :         // We will loop until we have closed the lastWriter (and use
      659           1 :         // lastWriter.err). We also need to call close on all LogWriters
     660           1 :         // that will not close themselves, i.e., those that have already been
     661           1 :         // created and installed in failoverWriter.writers (this set may change
     662           1 :         // while failoverWriter.Close runs).
     663           1 :         for !lastWriter.closed {
     664           1 :                 numWriters := ww.mu.nextWriterIndex
     665           1 :                 if numWriters > closeCalledCount {
     666           1 :                         // INVARIANT: numWriters > closeCalledCount.
     667           1 :                         lastWriter = lastWriterState{
     668           1 :                                 index: numWriters - 1,
     669           1 :                         }
     670           1 :                         // Try to process [closeCalledCount, numWriters). Will surely process
     671           1 :                         // [closeCalledCount, numWriters-1), since those writers are either done
     672           1 :                         // initializing, or will close themselves. The writer at numWriters-1 we
     673           1 :                         // can only process if it is done initializing, else we will iterate
     674           1 :                         // again.
     675           1 :                         for i := closeCalledCount; i < numWriters; i++ {
     676           1 :                                 w := ww.writers[i].w
     677           1 :                                 cErr := ww.writers[i].createError
      678           1 :                                 // Is the current index the last writer? If yes, this is also the last
     679           1 :                                 // loop iteration.
     680           1 :                                 isLastWriter := i == lastWriter.index
     681           1 :                                 if w != nil {
     682           1 :                                         // Can close it, so extend closeCalledCount.
     683           1 :                                         closeCalledCount = i + 1
     684           1 :                                         if isLastWriter {
     685           1 :                                                 // We may care about its error and when it finishes closing.
     686           1 :                                                 index := i
     687           1 :                                                 ww.opts.stopper.runAsync(func() {
     688           1 :                                                         // Last writer(s) (since new writers can be created and become
     689           1 :                                                         // last, as we iterate) are guaranteed to have seen the last
     690           1 :                                                         // record (since it was queued before Close was called). It is
     691           1 :                                                         // possible that a writer got created after the last record was
     692           1 :                                                         // dequeued and before this fact was realized by Close. In that
     693           1 :                                                         // case we will harmlessly tell it that it synced that last
     694           1 :                                                         // record, though it has already been written and synced by
     695           1 :                                                         // another writer.
     696           1 :                                                         err := w.CloseWithLastQueuedRecord(lastRecordIndex)
     697           1 :                                                         ww.mu.Lock()
     698           1 :                                                         defer ww.mu.Unlock()
     699           1 :                                                         if lastWriter.index == index {
     700           1 :                                                                 lastWriter.closed = true
     701           1 :                                                                 lastWriter.err = err
     702           1 :                                                                 lastWriter.metrics = w.Metrics()
     703           1 :                                                                 ww.mu.cond.Signal()
     704           1 :                                                         }
     705             :                                                 })
     706           1 :                                         } else {
     707           1 :                                                 // Don't care about the returned error since all the records we
     708           1 :                                                 // relied on this writer for were already successfully written.
     709           1 :                                                 ww.opts.stopper.runAsync(func() {
     710           1 :                                                         _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
     711           1 :                                                 })
     712             :                                         }
     713           1 :                                 } else if cErr != nil {
     714           1 :                                         // Have processed it, so extend closeCalledCount.
     715           1 :                                         closeCalledCount = i + 1
     716           1 :                                         if isLastWriter {
     717           1 :                                                 lastWriter.closed = true
     718           1 :                                                 lastWriter.err = cErr
     719           1 :                                                 lastWriter.metrics = record.LogWriterMetrics{}
     720           1 :                                         }
     721             :                                         // Else, ignore.
     722           1 :                                 } else {
     723           1 :                                         if !isLastWriter {
     724           1 :                                                 // Not last writer, so will close itself.
     725           1 :                                                 closeCalledCount = i + 1
     726           1 :                                         }
     727             :                                         // Else, last writer, so we may have to close it.
     728             :                                 }
     729             :                         }
     730             :                 }
     731           1 :                 if !lastWriter.closed {
     732           1 :                         // Either waiting for creation of last writer or waiting for the close
     733           1 :                         // to finish, or something else to become the last writer.
     734           1 :                         ww.mu.cond.Wait()
     735           1 :                 }
     736             :         }
     737           1 :         err = lastWriter.err
     738           1 :         ww.mu.metrics = lastWriter.metrics
     739           1 :         ww.mu.closed = true
     740           1 :         _, _ = ww.q.popAll(err)
     741           1 :         return logicalOffset, err
     742             : }
     743             : 
      744             : // Metrics implements Writer.
     745           1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
     746           1 :         ww.mu.Lock()
     747           1 :         defer ww.mu.Unlock()
     748           1 :         return ww.mu.metrics
     749           1 : }
     750             : 
     751             : // latencyAndErrorRecorder records ongoing write and sync operations and errors
     752             : // in those operations. record.LogWriter cannot continue functioning after any
     753             : // error, so all errors are considered permanent.
     754             : //
     755             : // writeStart/writeEnd are used directly when creating a file. After the file
     756             : // is successfully created, setWriter turns latencyAndErrorRecorder into an
     757             : // implementation of writerSyncerCloser that will record for the Write and
     758             : // Sync methods.
     759             : type latencyAndErrorRecorder struct {
     760             :         ts                    timeSource
     761             :         ongoingOperationStart atomic.Int64
     762             :         error                 atomic.Pointer[error]
     763             :         writerSyncerCloser
     764             : }
     765             : 
     766             : type writerSyncerCloser interface {
     767             :         io.Writer
     768             :         io.Closer
     769             :         Sync() error
     770             : }
     771             : 
     772           1 : func (r *latencyAndErrorRecorder) writeStart() {
     773           1 :         r.ongoingOperationStart.Store(r.ts.now().UnixNano())
     774           1 : }
     775             : 
     776           1 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
     777           1 :         if err != nil {
     778           1 :                 ptr := &err
     779           1 :                 r.error.Store(ptr)
     780           1 :         }
     781           1 :         r.ongoingOperationStart.Store(0)
     782             : }
     783             : 
     784           1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
     785           1 :         r.writerSyncerCloser = w
     786           1 : }
     787             : 
     788           1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
     789           1 :         startTime := r.ongoingOperationStart.Load()
     790           1 :         var latency time.Duration
     791           1 :         if startTime != 0 {
     792           1 :                 l := r.ts.now().UnixNano() - startTime
     793           1 :                 if l < 0 {
     794           0 :                         l = 0
     795           0 :                 }
     796           1 :                 latency = time.Duration(l)
     797             :         }
     798           1 :         errPtr := r.error.Load()
     799           1 :         var err error
     800           1 :         if errPtr != nil {
     801           1 :                 err = *errPtr
     802           1 :         }
     803           1 :         return latency, err
     804             : }
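                      : 
                      : // For example, if a file create, Write, or Sync started two seconds ago and
                      : // has not yet returned, ongoingLatencyOrError reports a latency of roughly
                      : // two seconds and a nil error; once any such operation fails, the recorded
                      : // error is sticky and is returned by every subsequent call.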
     805             : 
     806             : // Sync implements writerSyncerCloser.
     807           1 : func (r *latencyAndErrorRecorder) Sync() error {
     808           1 :         r.writeStart()
     809           1 :         err := r.writerSyncerCloser.Sync()
     810           1 :         r.writeEnd(err)
     811           1 :         return err
     812           1 : }
     813             : 
     814             : // Write implements io.Writer.
     815           1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
     816           1 :         r.writeStart()
     817           1 :         n, err = r.writerSyncerCloser.Write(p)
     818           1 :         r.writeEnd(err)
     819           1 :         return n, err
     820           1 : }

Generated by: LCOV version 1.14