LCOV - code coverage report
Current view: top level - pebble/wal/wal - failover_writer.go (source / functions)
Test:         2025-11-16 08:18Z 5729a1c7 - tests + meta.lcov
Test Date:    2025-11-16 08:20:11
Coverage:     Lines: 97.6 % (567 of 581)    Functions: - (0 of 0)

            Line data    Source code
       1              : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package wal
       6              : 
       7              : import (
       8              :         "io"
       9              :         "sync"
      10              :         "sync/atomic"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/crlib/crtime"
      14              :         "github.com/cockroachdb/errors"
      15              :         "github.com/cockroachdb/pebble/internal/base"
      16              :         "github.com/cockroachdb/pebble/record"
      17              :         "github.com/cockroachdb/pebble/vfs"
      18              :         "github.com/prometheus/client_golang/prometheus"
      19              : )
      20              : 
      21              : // recordQueueEntry is an entry in recordQueue.
      22              : type recordQueueEntry struct {
      23              :         p          []byte
      24              :         opts       SyncOptions
      25              :         refCount   RefCount
      26              :         writeStart crtime.Mono
      27              : }
      28              : 
      29              : type poppedEntry struct {
      30              :         opts       SyncOptions
      31              :         refCount   RefCount
      32              :         writeStart crtime.Mono
      33              : }
      34              : 
      35              : const initialBufferLen = 8192
      36              : 
      37              : // recordQueue is a variable-size single-producer multiple-consumer queue. It
      38              : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
      39              : // to grow the size, since there is no upper bound on the number of queued
      40              : // records (which are all the records that are not synced, and will need to be
      41              : // written again in case of failover). Additionally, it needs a mutex to
      42              : // atomically grab a snapshot of the queued records and provide them to a new
      43              : // LogWriter that is being switched to.
      44              : type recordQueue struct {
      45              :         // Only held for reading for all pop operations and most push operations.
      46              :         // Held for writing when buffer needs to be grown or when switching to a new
      47              :         // writer.
      48              :         mu sync.RWMutex
      49              : 
      50              :         // queue is [tail, head). tail is the oldest entry and head is the index for
      51              :         // the next entry.
      52              :         //
      53              :         // Consumers: atomically read and write tail in pop. This is not the usual
      54              :         // kind of queue consumer since they already know the index that they are
      55              :         // popping exists, hence don't need to look at head.
      56              :         //
      57              :         // Producer: atomically reads tail in push. Writes to head.
      58              :         //
      59              :         // Based on the above we only need tail to be atomic. However, the producer
      60              :         // also populates entries in buffer, whose values need to be seen by the
      61              :         // consumers when doing a pop, which means they need to synchronize using a
      62              :         // release and acquire memory barrier pair, where the push does the release
      63              :         // and the pop does the acquire. For this reason we make head also atomic
      64              :         // and merge head and tail into a single atomic, so that the store of head
      65              :         // in push and the load of tail in pop accomplishes this release-acquire
      66              :         // pair.
      67              :         //
      68              :         // We initially implemented competition between multiple consumers solely
      69              :         // via atomic read-write of the tail using compare-and-swap (CAS). Since the
      70              :         // atomic read-write to tail in pop releases those buffer entries for reuse
      71              :         // to the producer, the consumer needs to grab the contents of
      72              :         // recordQueueEntry that it needs to do callbacks etc. (specifically the
      73              :         // contents corresponding to poppedEntry), *before* it succeeds with the
      74              :         // atomic read-write. This introduced a false data race in the Golang data
      75              :         // race detector
      76              :         // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
      77              :         // Consider the case where the queue is [10,20), and consumer C1 is trying
      78              :         // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
      79              :         // interleaving can happen:
      80              :         //
       81              :         // [C1] reads tail=10, head=20
       82              :         // [C2] reads tail=10, head=20
      83              :         // [C1] reads buffer contents [10,12) and makes local copy
      84              :         // [C1] CAS to make the queue [12,20)
      85              :         // [C2] reads buffer contents [10,14) and makes local copy, concurrently
      86              :         //      with producer writing to 10, 11. *
      87              :         // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
      88              :         //      in CAS. C2 only uses the contents it read into the local copy for
      89              :         //      [12,14).
      90              :         //
      91              :         // * is a false data race since C2 is later deciding what contents it should
      92              :         // use from among the contents it read, based on what indices it
      93              :         // successfully popped. Unfortunately, we don't have a way to annotate the
      94              :         // code to tell the data race detector to ignore this false positive. So we
      95              :         // need to strengthen the synchronization to prevent such false positives.
      96              :         // We observe that usually a consumer will be popping a batch of entries
      97              :         // (based on a single successful fsync), and the number of consumers will be
      98              :         // small (usually 1). In comparison, producers can be highly concurrent (due
      99              :         // to workload concurrency). We don't want consumers to compete for a mutex
     100              :         // with producers, but we can afford to have multiple consumers compete for
     101              :         // a mutex. So we fix this false data race by using consumerMu to force
     102              :         // single-threaded popping.
     103              :         //
     104              :         // An alternative would be to pass the information contained in poppedEntry
     105              :         // to the LogWriter, so that it can pass it back when popping (so we don't
     106              :         // have to retrieve it from the recordQueue.buffer). We would still need
     107              :         // recordQueue.buffer, since writer switching needs those entries to be
     108              :         // replayed. We don't consider this solution for the same reason we replaced
     109              :         // record.pendingSyncsWithSyncQueue with
     110              :         // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
     111              :         // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
     112              :         // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
     113              :         // switched to LogWriter2, to which we replayed the same records and filled
     114              :         // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
     115              :         // the commit pipeline can send another 4096 entries, while LogWriter2 is
     116              :         // still blocked on trying to write and sync the previous 4096 entries. This
     117              :         // will overflow the queue in LogWriter2.
     118              :         //
     119              :         // All updates to headTail hold mu at least for reading. So when mu is held
     120              :         // for writing, there is a guarantee that headTail is not being updated.
     121              :         //
     122              :         // head is most-significant 32 bits and tail is least-significant 32 bits.
     123              :         headTail atomic.Uint64
     124              : 
     125              :         consumerMu sync.Mutex
     126              : 
     127              :         // Access to buffer requires at least RLock.
     128              :         buffer []recordQueueEntry
     129              : 
     130              :         lastTailObservedByProducer uint32
     131              : 
     132              :         // Read requires RLock.
     133              :         writer *record.LogWriter
     134              : 
     135              :         // When writer != nil, this is the return value of the last call to
     136              :         // SyncRecordGeneralized. It is updated in (a) WriteRecord calls push, using
     137              :         // only RLock (since WriteRecord is externally synchronized), (b)
     138              :         // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
     139              :         lastLogSize int64
     140              : 
     141              :         failoverWriteAndSyncLatency prometheus.Histogram
     142              : }
     143              : 
     144            2 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
     145            2 :         *q = recordQueue{
     146            2 :                 buffer:                      make([]recordQueueEntry, initialBufferLen),
     147            2 :                 failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
     148            2 :         }
     149            2 : }
     150              : 
     151              : // NB: externally synchronized, i.e., no concurrent push calls.
     152              : func (q *recordQueue) push(
     153              :         p []byte,
     154              :         opts SyncOptions,
     155              :         refCount RefCount,
     156              :         writeStart crtime.Mono,
     157              :         latestLogSizeInWriteRecord int64,
     158              :         latestWriterInWriteRecord *record.LogWriter,
     159            2 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
     160            2 :         ht := q.headTail.Load()
     161            2 :         h, t := unpackHeadTail(ht)
     162            2 :         n := int(h - t)
     163            2 :         m := len(q.buffer)
     164            2 :         if m == n {
     165            1 :                 // Full
     166            1 :                 m = 2 * n
     167            1 :                 newBuffer := make([]recordQueueEntry, m)
     168            1 :                 for i := int(t); i < int(h); i++ {
     169            1 :                         newBuffer[i%m] = q.buffer[i%n]
     170            1 :                 }
     171            1 :                 q.mu.Lock()
     172            1 :                 q.buffer = newBuffer
     173            1 :                 q.mu.Unlock()
     174              :         }
     175            2 :         q.mu.RLock()
     176            2 :         q.buffer[int(h)%m] = recordQueueEntry{
     177            2 :                 p:          p,
     178            2 :                 opts:       opts,
     179            2 :                 refCount:   refCount,
     180            2 :                 writeStart: writeStart,
     181            2 :         }
      182            2 :         // Reclaim memory for consumed entries. We couldn't do that in pop since
      183            2 :         // advancing tail there immediately transfers ownership of those slots
      184            2 :         // back to the producer.
     185            2 :         for i := q.lastTailObservedByProducer; i < t; i++ {
     186            2 :                 q.buffer[int(i)%m] = recordQueueEntry{}
     187            2 :         }
     188            2 :         q.lastTailObservedByProducer = t
     189            2 :         q.headTail.Add(1 << headTailBits)
     190            2 :         writer = q.writer
     191            2 :         if writer == latestWriterInWriteRecord {
     192            2 :                 // WriteRecord has written to this writer since the switch.
     193            2 :                 q.lastLogSize = latestLogSizeInWriteRecord
     194            2 :         }
     195              :         // Else writer is a new writer that was switched to, so ignore the
     196              :         // latestLogSizeInWriteRecord.
     197              : 
     198            2 :         lastLogSize = q.lastLogSize
     199            2 :         q.mu.RUnlock()
     200            2 :         return h, writer, lastLogSize
     201              : }
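
A standalone sketch (not part of this file) of the doubling step in push above:
logical indices in [tail, head) keep addressing their entries via i % len(buffer);
only the physical slot changes when the buffer grows.

    package main

    import "fmt"

    func main() {
            // Logical indices 6..9 fill a buffer of length 4 (the "full" case in push).
            tail, head := 6, 10
            old := make([]string, 4)
            for i := tail; i < head; i++ {
                    old[i%len(old)] = fmt.Sprintf("entry-%d", i)
            }
            // Double the buffer, re-homing each logical index at i % len(grown),
            // mirroring newBuffer[i%m] = q.buffer[i%n] above.
            grown := make([]string, 2*len(old))
            for i := tail; i < head; i++ {
                    grown[i%len(grown)] = old[i%len(old)]
            }
            // Lookups by logical index still work, and there is room for 4 more pushes.
            for i := tail; i < head; i++ {
                    fmt.Println(i, grown[i%len(grown)]) // entry-6 ... entry-9
            }
    }
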
     202              : 
     203            1 : func (q *recordQueue) length() int {
     204            1 :         ht := q.headTail.Load()
     205            1 :         h, t := unpackHeadTail(ht)
     206            1 :         return int(h - t)
     207            1 : }
     208              : 
     209              : // Pops all entries. Must be called only after the last push returns.
     210            2 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
     211            2 :         ht := q.headTail.Load()
     212            2 :         h, t := unpackHeadTail(ht)
     213            2 :         n := int(h - t)
     214            2 :         if n == 0 {
     215            2 :                 return 0, 0
     216            2 :         }
     217            1 :         return n, q.pop(h-1, err)
     218              : }
     219              : 
     220              : // Pops all entries up to and including index. The remaining queue is
     221              : // [index+1, head).
     222              : //
      223              : // NB: we could slightly simplify to only have the latest writer be able to
      224              : // pop. This would avoid consumer competition below, but it seems better to
      225              : // reduce the amount of queued work regardless of who has successfully written it.
     226            2 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
     227            2 :         now := crtime.NowMono()
     228            2 :         var buf [512]poppedEntry
     229            2 :         tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
     230            2 :                 ht := q.headTail.Load()
     231            2 :                 _, t = unpackHeadTail(ht)
     232            2 :                 tail := int(t)
     233            2 :                 numEntriesToPop = int(index) - tail + 1
     234            2 :                 return t, numEntriesToPop
     235            2 :         }
     236            2 :         q.consumerMu.Lock()
     237            2 :         // numEntriesToPop is a function of index and tail. The value of tail cannot
     238            2 :         // change since consumerMu is held.
     239            2 :         tail, numEntriesToPop := tailEntriesToPop()
     240            2 :         if numEntriesToPop <= 0 {
     241            2 :                 q.consumerMu.Unlock()
     242            2 :                 return 0
     243            2 :         }
     244            2 :         var b []poppedEntry
     245            2 :         if numEntriesToPop <= len(buf) {
     246            2 :                 b = buf[:numEntriesToPop]
     247            2 :         } else {
     248            1 :                 // Do allocation before acquiring the mutex.
     249            1 :                 b = make([]poppedEntry, numEntriesToPop)
     250            1 :         }
     251            2 :         q.mu.RLock()
     252            2 :         n := len(q.buffer)
     253            2 :         for i := 0; i < numEntriesToPop; i++ {
     254            2 :                 // Grab the popped entries before incrementing tail, since that will
     255            2 :                 // release those buffer slots to the producer.
     256            2 :                 idx := (i + int(tail)) % n
     257            2 :                 b[i] = poppedEntry{
     258            2 :                         opts:       q.buffer[idx].opts,
     259            2 :                         refCount:   q.buffer[idx].refCount,
     260            2 :                         writeStart: q.buffer[idx].writeStart,
     261            2 :                 }
     262            2 :         }
     263              :         // Since tail cannot change, we don't need to do a compare-and-swap.
     264            2 :         q.headTail.Add(uint64(numEntriesToPop))
     265            2 :         q.mu.RUnlock()
     266            2 :         q.consumerMu.Unlock()
     267            2 :         addLatencySample := false
     268            2 :         var maxLatency time.Duration
     269            2 :         for i := 0; i < numEntriesToPop; i++ {
     270            2 :                 // Now that we've synced the entry, we can unref it to signal that we
     271            2 :                 // will not read the written byte slice again.
     272            2 :                 if b[i].refCount != nil {
     273            2 :                         b[i].refCount.Unref()
     274            2 :                 }
     275            2 :                 if b[i].opts.Done != nil {
     276            2 :                         numSyncsPopped++
     277            2 :                         if err != nil {
     278            1 :                                 *b[i].opts.Err = err
     279            1 :                         }
     280            2 :                         b[i].opts.Done.Done()
     281            2 :                         latency := now.Sub(b[i].writeStart)
     282            2 :                         if !addLatencySample {
     283            2 :                                 addLatencySample = true
     284            2 :                                 maxLatency = latency
     285            2 :                         } else if maxLatency < latency {
     286            0 :                                 maxLatency = latency
     287            0 :                         }
     288              :                 }
     289              :         }
     290            2 :         if addLatencySample {
     291            2 :                 if maxLatency < 0 {
     292            0 :                         maxLatency = 0
     293            0 :                 }
     294            2 :                 q.failoverWriteAndSyncLatency.Observe(float64(maxLatency))
     295              :         }
     296            2 :         return numSyncsPopped
     297              : }
     298              : 
     299              : func (q *recordQueue) snapshotAndSwitchWriter(
     300              :         writer *record.LogWriter,
     301              :         snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
     302            2 : ) {
     303            2 :         q.mu.Lock()
     304            2 :         defer q.mu.Unlock()
     305            2 :         q.writer = writer
     306            2 :         h, t := unpackHeadTail(q.headTail.Load())
     307            2 :         n := h - t
     308            2 :         if n > 0 {
     309            2 :                 m := uint32(len(q.buffer))
     310            2 :                 b := make([]recordQueueEntry, n)
     311            2 :                 for i := t; i < h; i++ {
     312            2 :                         b[i-t] = q.buffer[i%m]
     313            2 :                 }
     314            2 :                 q.lastLogSize = snapshotFunc(t, b)
     315              :         }
     316              : }
     317              : 
     318              : // getLastIndex is used by failoverWriter.Close.
     319            2 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
     320            2 :         h, _ := unpackHeadTail(q.headTail.Load())
     321            2 :         return int64(h) - 1
     322            2 : }
     323              : 
     324              : const headTailBits = 32
     325              : 
     326            2 : func unpackHeadTail(ht uint64) (head, tail uint32) {
     327            2 :         const mask = 1<<headTailBits - 1
     328            2 :         head = uint32((ht >> headTailBits) & mask)
     329            2 :         tail = uint32(ht & mask)
     330            2 :         return head, tail
     331            2 : }
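
A self-contained sketch of the packed head/tail counter used above: a push
advances the high 32 bits, a batched pop advances the low 32 bits, and a single
atomic load observes a consistent (head, tail) pair.

    package main

    import (
            "fmt"
            "sync/atomic"
    )

    const headTailBits = 32

    func unpackHeadTail(ht uint64) (head, tail uint32) {
            const mask = 1<<headTailBits - 1
            return uint32((ht >> headTailBits) & mask), uint32(ht & mask)
    }

    func main() {
            var headTail atomic.Uint64
            // Three pushes: each advances head (the high 32 bits) by one.
            for i := 0; i < 3; i++ {
                    headTail.Add(1 << headTailBits)
            }
            // One pop of a batch of two entries: tail (the low 32 bits) advances by two.
            headTail.Add(2)
            h, t := unpackHeadTail(headTail.Load())
            fmt.Printf("head=%d tail=%d queued=%d\n", h, t, h-t) // head=3 tail=2 queued=1
    }
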
     332              : 
     333              : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
     334              : // chosen value. Setting this to 2 will not simplify the code. We make this a
      335              : // constant since we want a fixed size array for failoverWriter.mu.writers.
     336              : const maxPhysicalLogs = 10
     337              : 
     338              : // failoverWriter is the implementation of Writer in failover mode. No Writer
     339              : // method blocks for IO, except for Close.
     340              : //
     341              : // Loosely speaking, Close blocks until all records are successfully written
     342              : // and synced to some log writer. Monitoring of log writer latency and errors
     343              : // continues after Close is called, which means failoverWriter can be switched
     344              : // to a new log writer after Close is called, to unblock Close.
     345              : //
     346              : // More precisely, Close does not block if there is an error in creating or
      347              : // closing the latest LogWriter when Close was called. This is because errors
     348              : // are considered indicative of misconfiguration, and the user of
     349              : // failoverWriter can dampen switching when observing errors (e.g. see
      350              : // failoverMonitor), so Close does not assume any liveness of calls to
     351              : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
     352              : // an error on Writer.Close as fatal, this does mean that failoverWriter has
     353              : // limited ability to mask errors (its primary task is to mask high latency).
     354              : type failoverWriter struct {
     355              :         opts failoverWriterOpts
     356              :         q    recordQueue
     357              :         mu   struct {
     358              :                 sync.Mutex
     359              :                 // writers is protected by mu, except for updates to the
     360              :                 // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
     361              :                 // protection by mu is for handling concurrent calls to switchToNewDir,
     362              :                 // Close, and getLog.
     363              :                 writers [maxPhysicalLogs]logWriterAndRecorder
     364              : 
     365              :                 // cond is signaled when the latest LogWriter is set in writers (or there
     366              :                 // is a creation error), or when the latest LogWriter is successfully
     367              :                 // closed. It is waited on in Close. We don't use channels and select
     368              :                 // since what Close is waiting on is dynamic based on the local state in
     369              :                 // Close, so using Cond is simpler.
     370              :                 cond *sync.Cond
     371              :                 // nextWriterIndex is advanced before creating the *LogWriter. That is, a
     372              :                 // slot is reserved by taking the current value of nextWriterIndex and
     373              :                 // incrementing it, and then the *LogWriter for that slot is created. When
     374              :                 // newFailoverWriter returns, nextWriterIndex = 1.
     375              :                 //
     376              :                 // The latest *LogWriter is (will be) at nextWriterIndex-1.
     377              :                 //
     378              :                 // INVARIANT: nextWriterIndex <= len(writers)
     379              :                 nextWriterIndex LogNameIndex
     380              :                 closed          bool
     381              :                 // metrics is initialized in Close. Currently we just use the metrics from
     382              :                 // the latest writer after it is closed, since in the common case with
     383              :                 // only one writer, that writer's flush loop will have finished and the
     384              :                 // metrics will be current. With multiple writers, these metrics can be
     385              :                 // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
     386              :                 // which can be high for a writer that was switched away from, and
     387              :                 // therefore not indicative of overall work being done by the
     388              :                 // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
     389              :                 // inaccurate once there is no more work being given to a writer. We could
     390              :                 // add a method to LogWriter to stop sampling metrics when it is not the
     391              :                 // latest writer. Then we could aggregate all these metrics across all
     392              :                 // writers.
     393              :                 //
     394              :                 // Note that CockroachDB does not use these metrics in any meaningful way.
     395              :                 //
     396              :                 // TODO(sumeer): do the improved solution outlined above.
     397              :                 metrics record.LogWriterMetrics
     398              :         }
     399              :         // State for computing logical offset. The cumulative offset state is in
     400              :         // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
      401              :         // compute the delta between the size returned by this LogWriter now and
      402              :         // the size it returned in the previous call to
     403              :         // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
     404              :         // have happened from WriteRecord, or asynchronously during a switch. So
     405              :         // that previous call state requires synchronization and is maintained in
     406              :         // recordQueue. The offset is incremented by this delta without any
     407              :         // synchronization, since we rely on external synchronization (like the
     408              :         // standaloneWriter).
     409              :         logicalOffset struct {
     410              :                 latestWriterInWriteRecord  *record.LogWriter
     411              :                 latestLogSizeInWriteRecord int64
     412              :                 offset                     int64
     413              :                 // Transitions once from false => true when there is a non-nil writer.
     414              :                 notEstimatedOffset bool
     415              :         }
     416              :         psiForWriteRecordBacking record.PendingSyncIndex
     417              :         psiForSwitchBacking      record.PendingSyncIndex
     418              : }
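
A toy illustration of the delta bookkeeping described in the logicalOffset
comment, under the simplifying assumption of a single LogWriter: the cumulative
offset advances by the difference between consecutive sizes reported by the
current writer, so deltas from a new writer (whose lastLogSize is reset by
snapshotAndSwitchWriter) keep adding to the same cumulative offset.

    package main

    import "fmt"

    func main() {
            // Sizes reported by the current LogWriter after successive
            // SyncRecordGeneralized calls (they include record framing overhead).
            reportedSizes := []int64{28, 57, 91}
            var offset, lastLogSize int64
            for _, size := range reportedSizes {
                    delta := size - lastLogSize // what this record added to this log
                    offset += delta             // cumulative logical offset across writers
                    lastLogSize = size
            }
            fmt.Println(offset) // 91 for a single writer
    }
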
     419              : 
     420              : type logWriterAndRecorder struct {
      421              :         // This may never become non-nil: the LogWriter may no longer be the latest
      422              :         // writer by the time it is finally created. Additionally, if there was an error
     423              :         // in creating the writer, w will remain nil and createError will be set.
     424              :         w *record.LogWriter
     425              :         // createError is set if there is an error creating the writer. This is
     426              :         // useful in Close since we need to know when the work for creating the
     427              :         // latest writer is done, whether it resulted in success or not.
     428              :         createError error
     429              :         r           latencyAndErrorRecorder
     430              : 
     431              :         // dir, approxFileSize, synchronouslyClosed are kept for initializing
     432              :         // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
     433              :         // returned by logCreator. When failoverWriter.Close is called,
     434              :         // approxFileSize and synchronouslyClosed may be updated.
     435              :         dir                 Dir
     436              :         approxFileSize      uint64
     437              :         synchronouslyClosed bool
     438              : }
     439              : 
     440              : var _ Writer = &failoverWriter{}
     441              : 
     442              : var _ switchableWriter = &failoverWriter{}
     443              : 
     444              : type failoverWriterOpts struct {
     445              :         wn     NumWAL
     446              :         logger base.Logger
     447              :         timeSource
     448              :         jobID int
     449              :         logCreator
     450              : 
     451              :         // Options that feed into SyncingFileOptions.
     452              :         noSyncOnClose   bool
     453              :         bytesPerSync    int
     454              :         preallocateSize func() int
     455              : 
     456              :         // Options for record.LogWriter.
     457              :         minSyncInterval func() time.Duration
     458              :         fsyncLatency    prometheus.Histogram
     459              :         queueSemChan    chan struct{}
     460              :         stopper         *stopper
     461              : 
     462              :         failoverWriteAndSyncLatency prometheus.Histogram
     463              :         // writerClosed is a callback invoked by the FailoverWriter when it's
     464              :         // closed. It notifies the FailoverManager that the writer is now closed and
     465              :         // propagates information about the various physical segment files that have
     466              :         // been created.
     467              :         //
     468              :         // Note that the asynchronous creation of physical segment files means that
     469              :         // the writerClosed invocation is not guaranteed to include all physical
     470              :         // segment files that will ultimately be created for this logical WAL. If a
      471              :         // new segment file is created after writerClosed is invoked, it will be
     472              :         // propagated to the FailoverManager via the segmentClosed callback.
     473              :         writerClosed func(logicalLogWithSizesEtc)
     474              :         // segmentClosed is a callback invoked by the FailoverWriter when a segment
     475              :         // file creation completes but the writerClosed callback has already been
     476              :         // invoked. It's used to ensure that we reclaim all physical segment files,
     477              :         // including ones that did not complete creation before the Writer was
     478              :         // closed.
     479              :         segmentClosed func(logicalLogWithSizesEtc)
     480              : 
     481              :         writerCreatedForTest chan<- struct{}
     482              : 
     483              :         // writeWALSyncOffsets determines whether to write WAL sync chunk offsets.
     484              :         // The format major version can change (ratchet) at runtime, so this must be
     485              :         // a function rather than a static bool to ensure we use the latest format version.
     486              :         writeWALSyncOffsets func() bool
     487              : }
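
A hedged sketch of why writeWALSyncOffsets is a func rather than a bool: the
closure re-reads a ratcheting format version on every call. The version holder
and threshold below are illustrative stand-ins, not Pebble's actual names.

    package main

    import (
            "fmt"
            "sync/atomic"
    )

    // formatVersion stands in for a format major version that only ratchets upward.
    var formatVersion atomic.Uint64

    // versionWALSyncChunks is a hypothetical version at which WAL sync chunk
    // offsets start being written.
    const versionWALSyncChunks = 7

    func main() {
            writeWALSyncOffsets := func() bool {
                    // Re-read the version on every call so a ratchet that happens after
                    // the writer is constructed is still observed.
                    return formatVersion.Load() >= versionWALSyncChunks
            }
            formatVersion.Store(6)
            fmt.Println(writeWALSyncOffsets()) // false
            formatVersion.Store(7)             // format ratchets at runtime
            fmt.Println(writeWALSyncOffsets()) // true
    }
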
     488              : 
     489              : func simpleLogCreator(
     490              :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     491            1 : ) (f vfs.File, initialFileSize uint64, err error) {
     492            1 :         filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
     493            1 :         // Create file.
     494            1 :         r.writeStart()
     495            1 :         f, err = dir.FS.Create(filename, "pebble-wal")
     496            1 :         r.writeEnd(err)
     497            1 :         return f, 0, err
     498            1 : }
     499              : 
     500              : type logCreator func(
     501              :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     502              : ) (f vfs.File, initialFileSize uint64, err error)
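
As a hedged sketch of this extension point, a closure satisfying the logCreator
signature above can wrap simpleLogCreator, e.g. to report slow creations. This
assumes it lives in package wal next to simpleLogCreator; the helper name and
threshold parameter are illustrative, not part of this file.

    // loggingLogCreator is a sketch only: it delegates to simpleLogCreator and
    // reports creations that took longer than the given threshold.
    func loggingLogCreator(logger base.Logger, slow time.Duration) logCreator {
            return func(
                    dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
            ) (vfs.File, uint64, error) {
                    start := time.Now()
                    f, initialFileSize, err := simpleLogCreator(dir, wn, li, r, jobID)
                    if d := time.Since(start); d > slow {
                            logger.Infof("wal: creating %s took %s",
                                    dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li)), d)
                    }
                    return f, initialFileSize, err
            }
    }
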
     503              : 
     504              : func newFailoverWriter(
     505              :         opts failoverWriterOpts, initialDir dirAndFileHandle,
     506            2 : ) (*failoverWriter, error) {
     507            2 :         ww := &failoverWriter{
     508            2 :                 opts: opts,
     509            2 :         }
     510            2 :         ww.q.init(opts.failoverWriteAndSyncLatency)
     511            2 :         ww.mu.cond = sync.NewCond(&ww.mu)
     512            2 :         // The initial record.LogWriter creation also happens via a
      513            2 :         // switchToNewDir call since we don't want it to block newFailoverWriter.
     514            2 :         err := ww.switchToNewDir(initialDir)
     515            2 :         if err != nil {
     516            0 :                 // Switching limit cannot be exceeded when creating.
     517            0 :                 panic(err)
     518              :         }
     519            2 :         return ww, nil
     520              : }
     521              : 
     522              : // WriteRecord implements Writer.
     523              : func (ww *failoverWriter) WriteRecord(
     524              :         p []byte, opts SyncOptions, ref RefCount,
     525            2 : ) (logicalOffset int64, err error) {
     526            2 :         if ref != nil {
     527            2 :                 ref.Ref()
     528            2 :         }
     529            2 :         var writeStart crtime.Mono
     530            2 :         if opts.Done != nil {
     531            2 :                 writeStart = crtime.NowMono()
     532            2 :         }
     533            2 :         recordIndex, writer, lastLogSize := ww.q.push(
     534            2 :                 p,
     535            2 :                 opts,
     536            2 :                 ref,
     537            2 :                 writeStart,
     538            2 :                 ww.logicalOffset.latestLogSizeInWriteRecord,
     539            2 :                 ww.logicalOffset.latestWriterInWriteRecord,
     540            2 :         )
     541            2 :         if writer == nil {
     542            2 :                 // Don't have a record.LogWriter yet, so use an estimate. This estimate
     543            2 :                 // will get overwritten.
     544            2 :                 ww.logicalOffset.offset += int64(len(p))
     545            2 :                 return ww.logicalOffset.offset, nil
     546            2 :         }
     547              :         // INVARIANT: writer != nil.
     548            2 :         notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
     549            2 :         if !notEstimatedOffset {
     550            2 :                 ww.logicalOffset.notEstimatedOffset = true
     551            2 :         }
     552            2 :         ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     553            2 :         if opts.Done != nil {
     554            2 :                 ww.psiForWriteRecordBacking.Index = int64(recordIndex)
     555            2 :         }
     556            2 :         ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
     557            2 :         ww.logicalOffset.latestWriterInWriteRecord = writer
     558            2 :         if notEstimatedOffset {
     559            2 :                 delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
     560            2 :                 ww.logicalOffset.offset += delta
     561            2 :         } else {
     562            2 :                 // Overwrite the estimate. This is a best-effort improvement in that it is
     563            2 :                 // accurate for the common case where writer is the first LogWriter.
     564            2 :                 // Consider a failover scenario where there was no LogWriter for the first
     565            2 :                 // 10 records, so they are all accumulated as an estimate. Then the first
     566            2 :                 // LogWriter successfully writes and syncs the first 5 records and gets
     567            2 :                 // stuck. A switch happens to a second LogWriter that is handed the
     568            2 :                 // remaining 5 records, and the 11th record arrives via a WriteRecord.
     569            2 :                 // The transition from !notEstimatedOffset to notEstimatedOffset will
     570            2 :                 // happen on this 11th record, and the logic here will use the length of
     571            2 :                 // the second LogWriter, that does not reflect the full length.
     572            2 :                 //
     573            2 :                 // TODO(sumeer): try to make this more correct, without adding much more
     574            2 :                 // complexity, and without adding synchronization.
     575            2 :                 ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
     576            2 :         }
     577            2 :         return ww.logicalOffset.offset, err
     578              : }
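
A hedged caller-side sketch of the WriteRecord contract, inferred from how
opts.Done and opts.Err are consumed in pop above (i.e. assuming SyncOptions
carries a *sync.WaitGroup in Done and an *error in Err): the caller blocks on
the WaitGroup only when it asked for a sync.

    // writeAndWaitForSync is a sketch, not part of this file; w is any Writer
    // (such as a *failoverWriter).
    func writeAndWaitForSync(w Writer, payload []byte) error {
            var wg sync.WaitGroup
            var syncErr error
            wg.Add(1)
            if _, err := w.WriteRecord(payload, SyncOptions{Done: &wg, Err: &syncErr}, nil /* ref */); err != nil {
                    return err
            }
            // Released by pop via opts.Done.Done() once the record has been synced
            // by some LogWriter (possibly a later one, after a failover).
            wg.Wait()
            return syncErr
    }
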
     579              : 
     580              : // switchToNewDir starts switching to dir. It implements switchableWriter. All
     581              : // work is async, and a non-nil error is returned only if the switching limit
     582              : // is exceeded.
     583            2 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
     584            2 :         ww.mu.Lock()
      585            2 :         // Can have a late switchToNewDir call if the failoverMonitor has not yet
     586            2 :         // been told that the writer is closed. Ignore.
     587            2 :         if ww.mu.closed {
     588            1 :                 ww.mu.Unlock()
     589            1 :                 if ww.opts.writerCreatedForTest != nil {
     590            1 :                         ww.opts.writerCreatedForTest <- struct{}{}
     591            1 :                 }
     592            1 :                 return nil
     593              :         }
     594              :         // writerIndex is the slot for this writer.
     595            2 :         writerIndex := ww.mu.nextWriterIndex
     596            2 :         if int(writerIndex) == len(ww.mu.writers) {
     597            1 :                 ww.mu.Unlock()
     598            1 :                 return errors.Errorf("exceeded switching limit")
     599            1 :         }
     600            2 :         ww.mu.writers[writerIndex].dir = dir.Dir
     601            2 :         ww.mu.nextWriterIndex++
     602            2 :         ww.mu.Unlock()
     603            2 : 
     604            2 :         // Creation is async.
     605            2 :         ww.opts.stopper.runAsync(func() {
     606            2 :                 recorderAndWriter := &ww.mu.writers[writerIndex].r
     607            2 :                 recorderAndWriter.ts = ww.opts.timeSource
     608            2 :                 file, initialFileSize, err := ww.opts.logCreator(
     609            2 :                         dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
     610            2 :                 ww.mu.writers[writerIndex].approxFileSize = initialFileSize
     611            2 :                 // handleErrFunc is called when err != nil. It handles the multiple IO error
     612            2 :                 // cases below.
     613            2 :                 handleErrFunc := func(err error) {
     614            1 :                         if file != nil {
     615            1 :                                 file.Close()
     616            1 :                         }
     617            1 :                         ww.mu.Lock()
     618            1 :                         defer ww.mu.Unlock()
     619            1 :                         ww.mu.writers[writerIndex].createError = err
     620            1 :                         ww.mu.cond.Signal()
     621            1 :                         if ww.opts.writerCreatedForTest != nil {
     622            1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     623            1 :                         }
     624              :                 }
     625            2 :                 if err != nil {
     626            1 :                         handleErrFunc(err)
     627            1 :                         return
     628            1 :                 }
     629              :                 // Sync dir.
     630            2 :                 recorderAndWriter.writeStart()
     631            2 :                 err = dir.Sync()
     632            2 :                 recorderAndWriter.writeEnd(err)
     633            2 :                 if err != nil {
     634            1 :                         handleErrFunc(err)
     635            1 :                         return
     636            1 :                 }
     637              :                 // Wrap in a syncingFile.
     638            2 :                 syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
     639            2 :                         NoSyncOnClose:   ww.opts.noSyncOnClose,
     640            2 :                         BytesPerSync:    ww.opts.bytesPerSync,
     641            2 :                         PreallocateSize: ww.opts.preallocateSize(),
     642            2 :                 })
     643            2 :                 // Wrap in the latencyAndErrorRecorder.
     644            2 :                 recorderAndWriter.setWriter(syncingFile)
     645            2 : 
     646            2 :                 // Using NumWAL as the DiskFileNum is fine since it is used only as
     647            2 :                 // EOF trailer for safe log recycling. Even though many log files can
     648            2 :                 // map to a single NumWAL, a file used for NumWAL n at index m will
     649            2 :                 // never get recycled for NumWAL n at a later index (since recycling
     650            2 :                 // happens when n as a whole is obsolete).
     651            2 :                 w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
     652            2 :                         record.LogWriterConfig{
     653            2 :                                 WALMinSyncInterval:        ww.opts.minSyncInterval,
     654            2 :                                 WALFsyncLatency:           ww.opts.fsyncLatency,
     655            2 :                                 QueueSemChan:              ww.opts.queueSemChan,
     656            2 :                                 ExternalSyncQueueCallback: ww.doneSyncCallback,
     657            2 :                                 WriteWALSyncOffsets:       ww.opts.writeWALSyncOffsets,
     658            2 :                         })
     659            2 :                 closeWriter := func() bool {
     660            2 :                         ww.mu.Lock()
     661            2 :                         defer ww.mu.Unlock()
     662            2 :                         if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
     663            2 :                                 // Not the latest writer or the writer was closed while this async
     664            2 :                                 // creation was ongoing.
     665            2 :                                 if ww.opts.writerCreatedForTest != nil {
     666            1 :                                         ww.opts.writerCreatedForTest <- struct{}{}
     667            1 :                                 }
     668            2 :                                 return true
     669              :                         }
     670              :                         // Latest writer.
     671            2 :                         ww.mu.writers[writerIndex].w = w
     672            2 :                         ww.mu.cond.Signal()
     673            2 :                         // NB: snapshotAndSwitchWriter does not block on IO, since
     674            2 :                         // SyncRecordGeneralized does no IO.
     675            2 :                         ww.q.snapshotAndSwitchWriter(w,
     676            2 :                                 func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
     677            2 :                                         for i := range entries {
     678            2 :                                                 ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     679            2 :                                                 if entries[i].opts.Done != nil {
     680            2 :                                                         ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
     681            2 :                                                 }
     682            2 :                                                 var err error
     683            2 :                                                 logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
     684            2 :                                                 if err != nil {
     685            0 :                                                         // TODO(sumeer): log periodically. The err will also surface via
     686            0 :                                                         // the latencyAndErrorRecorder, so if a switch is possible, it
     687            0 :                                                         // will be done.
     688            0 :                                                         ww.opts.logger.Errorf("%s", err)
     689            0 :                                                 }
     690              :                                         }
     691            2 :                                         return logSize
     692              :                                 })
     693            2 :                         if ww.opts.writerCreatedForTest != nil {
     694            1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     695            1 :                         }
     696            2 :                         return false
     697              :                 }()
     698            2 :                 if closeWriter {
     699            2 :                         // Never wrote anything to this writer so don't care about the
     700            2 :                         // returned error.
     701            2 :                         ww.opts.stopper.runAsync(func() {
     702            2 :                                 _ = w.Close()
     703            2 :                                 // Invoke the segmentClosed callback to propagate knowledge that
     704            2 :                                 // there's an obsolete segment file we should clean up. Note
     705            2 :                                 // that the file may be occupying non-negligible disk space even
     706            2 :                                 // though we never wrote to it due to preallocation.
     707            2 :                                 ww.opts.segmentClosed(logicalLogWithSizesEtc{
     708            2 :                                         num: ww.opts.wn,
     709            2 :                                         segments: []segmentWithSizeEtc{
     710            2 :                                                 {
     711            2 :                                                         segment: segment{
     712            2 :                                                                 logNameIndex: LogNameIndex(writerIndex),
     713            2 :                                                                 dir:          dir.Dir,
     714            2 :                                                         },
     715            2 :                                                         approxFileSize:      initialFileSize,
     716            2 :                                                         synchronouslyClosed: false,
     717            2 :                                                 },
     718            2 :                                         },
     719            2 :                                 })
     720            2 :                         })
     721              :                 }
     722              :         })
     723            2 :         return nil
     724              : }
     725              : 
     726              : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
     727              : // record.LogWriter.
     728              : //
     729              : // recordQueue is popped from only when some work requests a sync (and
     730              : // successfully syncs). In the worst case, if no syncs are requested, we could
     731              : // queue all the records needed to fill up a memtable in the recordQueue. This
     732              : // can have two negative effects: (a) in the case of failover, we need to
     733              : // replay all the data in the current mutable memtable, which takes more time,
     734              : // (b) the memory usage is proportional to the size of the memtable. We ignore
      735            2 : // these negatives since (a) users like CockroachDB regularly sync, and (b)
     736              : // the default memtable size is only 64MB.
     737            2 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
     738            2 :         if err != nil {
     739            1 :                 // Don't pop anything since we can retry after switching to a new
     740            1 :                 // LogWriter.
     741            1 :                 return
     742            1 :         }
     743              :         // NB: harmless after Close returns since numSyncsPopped will be 0.
     744            2 :         numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
     745            2 :         if ww.opts.queueSemChan != nil {
     746            2 :                 for i := 0; i < numSyncsPopped; i++ {
     747            2 :                         <-ww.opts.queueSemChan
     748            2 :                 }
     749              :         }
     750              : }
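
A minimal sketch of the buffered-channel semaphore pattern that queueSemChan
participates in here: the commit pipeline acquires a slot by sending, and each
popped sync releases one by receiving, which is what the loop above does.

    package main

    import "fmt"

    func main() {
            // queueSemChan bounds the number of sync requests that are queued but
            // not yet synced.
            queueSemChan := make(chan struct{}, 4)
            // Producer side (commit pipeline): acquire a slot per sync request.
            for i := 0; i < 3; i++ {
                    queueSemChan <- struct{}{}
            }
            // Consumer side (doneSyncCallback): release one slot per popped sync.
            numSyncsPopped := 3
            for i := 0; i < numSyncsPopped; i++ {
                    <-queueSemChan
            }
            fmt.Println(len(queueSemChan)) // 0: all slots released
    }
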
     751              : 
     752              : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
     753            2 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
     754            2 :         r := ww.recorderForCurDir()
     755            2 :         if r == nil {
     756            1 :                 return 0, nil
     757            1 :         }
     758            2 :         return r.ongoingLatencyOrError()
     759              : }
     760              : 
     761              : // For internal use and testing.
     762            2 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
     763            2 :         ww.mu.Lock()
     764            2 :         defer ww.mu.Unlock()
     765            2 :         if ww.mu.closed {
     766            1 :                 return nil
     767            1 :         }
     768            2 :         return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
     769              : }
     770              : 
     771              : // Close implements Writer.
     772              : //
      773              : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
     774              : // called after Close is called, and there is also a possibility that they get
     775              : // called after Close returns and before failoverMonitor knows that the
     776              : // failoverWriter is closed.
     777              : //
     778              : // doneSyncCallback can be called anytime after Close returns since there
     779              : // could be stuck writes that finish arbitrarily later.
     780              : //
     781              : // See the long comment about Close behavior where failoverWriter is declared.
     782            2 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
     783            2 :         offset, err := ww.closeInternal()
     784            2 :         ww.opts.writerClosed(ww.getLog())
     785            2 :         return offset, err
     786            2 : }
     787              : 
     788            2 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
     789            2 :         logicalOffset = ww.logicalOffset.offset
     790            2 :         // [0, closeCalledCount) have had LogWriter.Close called (though may not
     791            2 :         // have finished) or the LogWriter will never be non-nil. Either way, they
     792            2 :         // have been "processed".
     793            2 :         closeCalledCount := LogNameIndex(0)
     794            2 :         // lastWriterState is the state for the writer currently considered last:
     795            2 :         // we are waiting either for its LogWriter.Close to finish or for its
     796            2 :         // creation to fail. Which writer is considered last can change. All state
     797            2 :         // is protected by ww.mu.
     798            2 :         type lastWriterState struct {
     799            2 :                 index   LogNameIndex
     800            2 :                 closed  bool
     801            2 :                 err     error
     802            2 :                 metrics record.LogWriterMetrics
     803            2 :         }
     804            2 :         var lastWriter lastWriterState
     805            2 :         lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
     806            2 :         ww.mu.Lock()
     807            2 :         defer ww.mu.Unlock()
     808            2 :         numWriters := ww.mu.nextWriterIndex
     809            2 :         // Every iteration starts and ends with the mutex held.
     810            2 :         //
     811            2 :         // Invariant: numWriters >= 1.
     812            2 :         //
     813            2 :         // We will loop until we have closed the lastWriter (and use
     814            2 :         // lastWriter.err). We also need to call close on all LogWriters
     815            2 :         // that will not close themselves, i.e., those that have already been
     816            2 :         // created and installed in failoverWriter.writers (this set may change
     817            2 :         // while failoverWriter.Close runs).
     818            2 :         for !lastWriter.closed || numWriters > lastWriter.index+1 {
     819            2 :                 if numWriters > closeCalledCount {
     820            2 :                         // lastWriter.index may or may not have advanced. If it has advanced, we
     821            2 :                         // need to reinitialize lastWriterState. If it hasn't advanced, and
     822            2 :                         // numWriters > closeCalledCount, we know that we haven't called close
     823            2 :                         // on it, so nothing in lastWriterState needs to be retained. For
     824            2 :                         // simplicity, we overwrite in both cases.
     825            2 :                         lastWriter = lastWriterState{
     826            2 :                                 index: numWriters - 1,
     827            2 :                         }
     828            2 :                         // Try to process [closeCalledCount, numWriters). Will surely process
     829            2 :                         // [closeCalledCount, numWriters-1), since those writers are either done
     830            2 :                         // initializing, or will close themselves. The writer at numWriters-1 we
     831            2 :                         // can only process if it is done initializing, else we will iterate
     832            2 :                         // again.
     833            2 :                         for i := closeCalledCount; i < numWriters; i++ {
     834            2 :                                 w := ww.mu.writers[i].w
     835            2 :                                 cErr := ww.mu.writers[i].createError
     836            2 :                                 // Is the current index the last writer? If yes, this is also the last
     837            2 :                                 // loop iteration.
     838            2 :                                 isLastWriter := i == lastWriter.index
     839            2 :                                 if w != nil {
     840            2 :                                         // Can close it, so extend closeCalledCount.
     841            2 :                                         closeCalledCount = i + 1
     842            2 :                                         size := uint64(w.Size())
     843            2 :                                         if ww.mu.writers[i].approxFileSize < size {
     844            2 :                                                 ww.mu.writers[i].approxFileSize = size
     845            2 :                                         }
     846            2 :                                         if isLastWriter {
     847            2 :                                                 // We may care about its error and when it finishes closing.
     848            2 :                                                 index := i
     849            2 :                                                 ww.opts.stopper.runAsync(func() {
     850            2 :                                                         // Last writer(s) (since new writers can be created and become
     851            2 :                                                         // last, as we iterate) are guaranteed to have seen the last
     852            2 :                                                         // record (since it was queued before Close was called). It is
     853            2 :                                                         // possible that a writer got created after the last record was
     854            2 :                                                         // dequeued and before this fact was realized by Close. In that
     855            2 :                                                         // case we will harmlessly tell it that it synced that last
     856            2 :                                                         // record, though it has already been written and synced by
     857            2 :                                                         // another writer.
     858            2 :                                                         err := w.CloseWithLastQueuedRecord(lastRecordIndex)
     859            2 :                                                         ww.mu.Lock()
     860            2 :                                                         defer ww.mu.Unlock()
     861            2 :                                                         if lastWriter.index == index {
     862            2 :                                                                 lastWriter.closed = true
     863            2 :                                                                 lastWriter.err = err
     864            2 :                                                                 lastWriter.metrics = w.Metrics()
     865            2 :                                                                 ww.mu.cond.Signal()
     866            2 :                                                         }
     867              :                                                 })
     868            2 :                                         } else {
     869            2 :                                                 // Don't care about the returned error since all the records we
     870            2 :                                                 // relied on this writer for were already successfully written.
     871            2 :                                                 ww.opts.stopper.runAsync(func() {
     872            2 :                                                         _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
     873            2 :                                                 })
     874              :                                         }
     875            2 :                                 } else if cErr != nil {
     876            1 :                                         // Have processed it, so extend closeCalledCount.
     877            1 :                                         closeCalledCount = i + 1
     878            1 :                                         if isLastWriter {
     879            1 :                                                 lastWriter.closed = true
     880            1 :                                                 lastWriter.err = cErr
     881            1 :                                                 lastWriter.metrics = record.LogWriterMetrics{}
     882            1 :                                         }
     883              :                                         // Else, ignore.
     884            2 :                                 } else {
     885            2 :                                         if !isLastWriter {
     886            2 :                                                 // Not last writer, so will close itself.
     887            2 :                                                 closeCalledCount = i + 1
     888            2 :                                         }
     889              :                                         // Else, last writer, so we may have to close it.
     890              :                                 }
     891              :                         }
     892              :                 }
     893            2 :                 if !lastWriter.closed {
     894            2 :                         // Waiting either for the last writer's creation, for its close to
     895            2 :                         // finish, or for something else to become the last writer.
     896            2 :                         //
     897            2 :                         // It is possible that what we think of as the last writer (lastWriter)
     898            2 :                         // closes itself while ww.mu is no longer held here, and a new LogWriter
     899            2 :                         // is created too. All the records are synced, but the real last writer
     900            2 :                         // may still be writing some records. Specifically, consider the
     901            2 :                         // following sequence while this wait does not hold the mutex:
     902            2 :                         //
     903            2 :                         // - recordQueue has an entry, with index 50, that does not require a
     904            2 :                         //   sync.
     905            2 :                         // - Last writer created at index 10 and entry 50 is handed to it.
     906            2 :                         // - lastWriter.index is still 9 and it closes itself and signals this
     907            2 :                         //   cond. It has written entry 50 and synced (since close syncs).
     908            2 :                         // - The wait completes.
     909            2 :                         //
     910            2 :                         // Now the writer at index 10 will never be closed and will never sync.
     911            2 :                         // A crash can cause some part of what it writes to be lost. Note that
     912            2 :                         // there is no data loss, but there are some unfortunate consequences:
     913            2 :                         //
     914            2 :                         // - We never closed a file descriptor.
     915            2 :                         // - virtualWALReader.NextRecord can return an error on finding a
     916            2 :                         //   malformed chunk in the last writer (at index 10) instead of
     917            2 :                         //   swallowing the error. This can cause DB.Open to fail.
     918            2 :                         //
     919            2 :                         // To avoid this, we grab the latest value of numWriters on reacquiring
     920            2 :                         // the mutex, and will continue looping until the writer at index 10 is
     921            2 :                         // closed (or writer at index 11 is created).
     922            2 :                         ww.mu.cond.Wait()
     923            2 :                         numWriters = ww.mu.nextWriterIndex
     924            2 :                 }
     925              :         }
     926            2 :         if ww.mu.writers[lastWriter.index].w != nil {
     927            2 :                 // This permits log recycling.
     928            2 :                 ww.mu.writers[lastWriter.index].synchronouslyClosed = true
     929            2 :         }
     930            2 :         err = lastWriter.err
     931            2 :         ww.mu.metrics = lastWriter.metrics
     932            2 :         ww.mu.closed = true
     933            2 :         n, m := ww.q.popAll(err)
     934            2 :         if err == nil && (n > 0 || m > 0) {
     935            0 :                 panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
     936              :         }
     937            2 :         return logicalOffset, err
     938              : }
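
The wait at the bottom of the loop above is the standard condition-variable
pattern: re-read the shared state (here numWriters) after every wakeup, since
what counts as the last writer can change while the mutex is released. A
stripped-down sketch of that pattern; the state fields and goroutine below are
illustrative, not the real failoverWriter bookkeeping:

package main

import (
	"fmt"
	"sync"
)

// closeState mimics the parts of failoverWriter.mu consulted by the close
// loop: how many writers exist and whether the last one has finished closing.
type closeState struct {
	mu         sync.Mutex
	cond       *sync.Cond
	numWriters int
	lastClosed bool
}

func main() {
	s := &closeState{numWriters: 1}
	s.cond = sync.NewCond(&s.mu)

	// Stand-in for LogWriter.Close completing asynchronously, with a newer
	// writer appearing in between.
	go func() {
		s.mu.Lock()
		s.numWriters = 2 // a new writer is installed while Close is waiting
		s.cond.Signal()
		s.mu.Unlock()

		s.mu.Lock()
		s.lastClosed = true // the (new) last writer finishes closing
		s.cond.Signal()
		s.mu.Unlock()
	}()

	s.mu.Lock()
	defer s.mu.Unlock()
	// Re-check the condition on every wakeup: being signaled does not mean the
	// writer we were waiting on is still the last one.
	for !s.lastClosed {
		s.cond.Wait()
		fmt.Println("woke; numWriters =", s.numWriters)
	}
	fmt.Println("last writer closed")
}
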
     939              : 
     940              : // Metrics implements Writer.
     941            2 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
     942            2 :         ww.mu.Lock()
     943            2 :         defer ww.mu.Unlock()
     944            2 :         return ww.mu.metrics
     945            2 : }
     946              : 
     947              : // getLog can be called at any time, including after Close returns.
     948            2 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
     949            2 :         ww.mu.Lock()
     950            2 :         defer ww.mu.Unlock()
     951            2 :         ll := logicalLogWithSizesEtc{
     952            2 :                 num: ww.opts.wn,
     953            2 :         }
     954            2 :         for i := range ww.mu.writers {
     955            2 :                 if ww.mu.writers[i].w != nil {
     956            2 :                         ll.segments = append(ll.segments, segmentWithSizeEtc{
     957            2 :                                 segment: segment{
     958            2 :                                         logNameIndex: LogNameIndex(i),
     959            2 :                                         dir:          ww.mu.writers[i].dir,
     960            2 :                                 },
     961            2 :                                 approxFileSize:      ww.mu.writers[i].approxFileSize,
     962            2 :                                 synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
     963            2 :                         })
     964            2 :                 }
     965              :         }
     966            2 :         return ll
     967              : }
     968              : 
     969              : // latencyAndErrorRecorder records ongoing write and sync operations and errors
     970              : // in those operations. record.LogWriter cannot continue functioning after any
     971              : // error, so all errors are considered permanent.
     972              : //
     973              : // writeStart/writeEnd are used directly when creating a file. After the file
     974              : // is successfully created, setWriter turns latencyAndErrorRecorder into an
     975              : // implementation of writerSyncerCloser that will record for the Write and
     976              : // Sync methods.
     977              : type latencyAndErrorRecorder struct {
     978              :         ts                    timeSource
     979              :         ongoingOperationStart atomic.Int64
     980              :         error                 atomic.Pointer[error]
     981              :         writerSyncerCloser
     982              : }
     983              : 
     984              : type writerSyncerCloser interface {
     985              :         io.Writer
     986              :         io.Closer
     987              :         Sync() error
     988              : }
     989              : 
     990            2 : func (r *latencyAndErrorRecorder) writeStart() {
     991            2 :         r.ongoingOperationStart.Store(r.ts.now().UnixNano())
     992            2 : }
     993              : 
     994            2 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
     995            2 :         if err != nil {
     996            1 :                 ptr := &err
     997            1 :                 r.error.Store(ptr)
     998            1 :         }
     999            2 :         r.ongoingOperationStart.Store(0)
    1000              : }
    1001              : 
    1002            2 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
    1003            2 :         r.writerSyncerCloser = w
    1004            2 : }
    1005              : 
    1006            2 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
    1007            2 :         startTime := r.ongoingOperationStart.Load()
    1008            2 :         var latency time.Duration
    1009            2 :         if startTime != 0 {
    1010            2 :                 l := r.ts.now().UnixNano() - startTime
    1011            2 :                 if l < 0 {
    1012            0 :                         l = 0
    1013            0 :                 }
    1014            2 :                 latency = time.Duration(l)
    1015              :         }
    1016            2 :         errPtr := r.error.Load()
    1017            2 :         var err error
    1018            2 :         if errPtr != nil {
    1019            1 :                 err = *errPtr
    1020            1 :         }
    1021            2 :         return latency, err
    1022              : }
    1023              : 
    1024              : // Sync implements writerSyncerCloser.
    1025            2 : func (r *latencyAndErrorRecorder) Sync() error {
    1026            2 :         r.writeStart()
    1027            2 :         err := r.writerSyncerCloser.Sync()
    1028            2 :         r.writeEnd(err)
    1029            2 :         return err
    1030            2 : }
    1031              : 
    1032              : // Write implements io.Writer.
    1033            2 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
    1034            2 :         r.writeStart()
    1035            2 :         n, err = r.writerSyncerCloser.Write(p)
    1036            2 :         r.writeEnd(err)
    1037            2 :         return n, err
    1038            2 : }
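
The recorder keeps the start time of the single in-flight operation in an
atomic and derives the ongoing latency on demand; any error is stored and
treated as permanent. A self-contained sketch of the same pattern that uses the
wall clock directly (the real type injects a timeSource so tests can control
time); opRecorder and its methods are illustrative names:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// opRecorder mirrors the latencyAndErrorRecorder pattern: the start time of at
// most one in-flight operation, plus a sticky error.
type opRecorder struct {
	startNanos atomic.Int64
	err        atomic.Pointer[error]
}

// begin records the start of an operation, like writeStart above.
func (r *opRecorder) begin() { r.startNanos.Store(time.Now().UnixNano()) }

// end clears the in-flight marker and records any error, like writeEnd above.
func (r *opRecorder) end(err error) {
	if err != nil {
		r.err.Store(&err) // errors are considered permanent
	}
	r.startNanos.Store(0)
}

// ongoing returns how long the current operation has been running (zero if
// none is in flight) and any error recorded so far.
func (r *opRecorder) ongoing() (time.Duration, error) {
	var latency time.Duration
	if start := r.startNanos.Load(); start != 0 {
		latency = time.Duration(time.Now().UnixNano() - start)
	}
	var err error
	if p := r.err.Load(); p != nil {
		err = *p
	}
	return latency, err
}

func main() {
	var r opRecorder
	r.begin()
	time.Sleep(5 * time.Millisecond)
	latency, err := r.ongoing()
	fmt.Println(latency > 0, err) // true <nil>: an operation is in flight, no error yet
	r.end(nil)
}
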
        

Generated by: LCOV version 2.0-1