LCOV - code coverage report
Current view: top level - pebble/wal - failover_writer.go (source / functions) Hit Total Coverage
Test: 2025-01-06 08:17Z fca2fd50 - tests only.lcov Lines: 557 571 97.5 %
Date: 2025-01-06 08:19:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "io"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/crlib/crtime"
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/prometheus/client_golang/prometheus"
      19             : )
      20             : 
// recordQueueEntry is an entry in recordQueue. It holds everything push was
// given for one record: the payload, plus the state pop needs in order to
// release the record.
type recordQueueEntry struct {
	// p is the record's byte slice, as passed to push.
	p []byte
	// opts is the SyncOptions passed to push. When opts.Done is non-nil, pop
	// stores a non-nil error in *opts.Err and then calls opts.Done.Done().
	opts SyncOptions
	// refCount, when non-nil, is unreffed by pop.
	refCount RefCount
	// writeStart is the time passed to push; pop subtracts it from the pop
	// time to compute a latency sample.
	writeStart crtime.Mono
}
      28             : 
// poppedEntry is the subset of recordQueueEntry (everything except the
// payload p) that pop copies out of the buffer before advancing tail, so that
// it can unref and signal completion after the buffer slot has been handed
// back to the producer.
type poppedEntry struct {
	opts       SyncOptions
	refCount   RefCount
	writeStart crtime.Mono
}
      34             : 
// initialBufferLen is the number of entries that recordQueue.buffer is
// allocated with in recordQueue.init. push doubles the buffer each time it
// finds it full.
const initialBufferLen = 8192
      36             : 
// recordQueue is a variable-size single-producer multiple-consumer queue. It
// is not lock-free, but most operations only need mu.RLock. It needs a mutex
// to grow the size, since there is no upper bound on the number of queued
// records (which are all the records that are not synced, and will need to be
// written again in case of failover). Additionally, it needs a mutex to
// atomically grab a snapshot of the queued records and provide them to a new
// LogWriter that is being switched to.
type recordQueue struct {
	// Only held for reading for all pop operations and most push operations.
	// Held for writing when buffer needs to be grown or when switching to a new
	// writer.
	mu sync.RWMutex

	// queue is [tail, head). tail is the oldest entry and head is the index for
	// the next entry.
	//
	// Consumers: atomically read and write tail in pop. This is not the usual
	// kind of queue consumer since they already know the index that they are
	// popping exists, hence don't need to look at head.
	//
	// Producer: atomically reads tail in push. Writes to head.
	//
	// Based on the above we only need tail to be atomic. However, the producer
	// also populates entries in buffer, whose values need to be seen by the
	// consumers when doing a pop, which means they need to synchronize using a
	// release and acquire memory barrier pair, where the push does the release
	// and the pop does the acquire. For this reason we make head also atomic
	// and merge head and tail into a single atomic, so that the store of head
	// in push and the load of tail in pop accomplishes this release-acquire
	// pair.
	//
	// We initially implemented competition between multiple consumers solely
	// via atomic read-write of the tail using compare-and-swap (CAS). Since the
	// atomic read-write to tail in pop releases those buffer entries for reuse
	// to the producer, the consumer needs to grab the contents of
	// recordQueueEntry that it needs to do callbacks etc. (specifically the
	// contents corresponding to poppedEntry), *before* it succeeds with the
	// atomic read-write. This introduced a false data race in the Golang data
	// race detector
	// https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
	// Consider the case where the queue is [10,20), and consumer C1 is trying
	// to pop [10,12) and consumer C2 is trying to pop [10,14). The following
	// interleaving can happen:
	//
	// [C1] reads tail=10, head=20
	// [C2] reads tail=10, head=20
	// [C1] reads buffer contents [10,12) and makes local copy
	// [C1] CAS to make the queue [12,20)
	// [C2] reads buffer contents [10,14) and makes local copy, concurrently
	//      with producer writing to 10, 11. *
	// [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
	//      in CAS. C2 only uses the contents it read into the local copy for
	//      [12,14).
	//
	// * is a false data race since C2 is later deciding what contents it should
	// use from among the contents it read, based on what indices it
	// successfully popped. Unfortunately, we don't have a way to annotate the
	// code to tell the data race detector to ignore this false positive. So we
	// need to strengthen the synchronization to prevent such false positives.
	// We observe that usually a consumer will be popping a batch of entries
	// (based on a single successful fsync), and the number of consumers will be
	// small (usually 1). In comparison, producers can be highly concurrent (due
	// to workload concurrency). We don't want consumers to compete for a mutex
	// with producers, but we can afford to have multiple consumers compete for
	// a mutex. So we fix this false data race by using consumerMu to force
	// single-threaded popping.
	//
	// An alternative would be to pass the information contained in poppedEntry
	// to the LogWriter, so that it can pass it back when popping (so we don't
	// have to retrieve it from the recordQueue.buffer). We would still need
	// recordQueue.buffer, since writer switching needs those entries to be
	// replayed. We don't consider this solution for the same reason we replaced
	// record.pendingSyncsWithSyncQueue with
	// record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
	// -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
	// say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
	// switched to LogWriter2, to which we replayed the same records and filled
	// up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
	// the commit pipeline can send another 4096 entries, while LogWriter2 is
	// still blocked on trying to write and sync the previous 4096 entries. This
	// will overflow the queue in LogWriter2.
	//
	// All updates to headTail hold mu at least for reading. So when mu is held
	// for writing, there is a guarantee that headTail is not being updated.
	//
	// head is most-significant 32 bits and tail is least-significant 32 bits.
	headTail atomic.Uint64

	// consumerMu is held by pop while it reads tail, copies out the entries
	// being popped, and advances tail. This forces single-threaded popping
	// (see the discussion on headTail).
	consumerMu sync.Mutex

	// Access to buffer requires at least RLock.
	buffer []recordQueueEntry

	// lastTailObservedByProducer is the value of tail seen by the most recent
	// push. Each push zeroes the buffer slots for indices in
	// [lastTailObservedByProducer, tail) to reclaim the memory of consumed
	// entries, and then advances this field to tail. Only accessed by push.
	lastTailObservedByProducer uint32

	// Read requires RLock.
	writer *record.LogWriter

	// When writer != nil, this is the return value of the last call to
	// SyncRecordGeneralized. It is updated in (a) WriteRecord calls push, using
	// only RLock (since WriteRecord is externally synchronized), (b)
	// snapshotAndSwitchWriter, using Lock. (b) excludes (a).
	lastLogSize int64

	// failoverWriteAndSyncLatency receives at most one sample per call to
	// pop: the maximum of (pop time - writeStart) over the popped entries that
	// requested a sync (opts.Done != nil).
	failoverWriteAndSyncLatency prometheus.Histogram
}
     143             : 
     144           1 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
     145           1 :         *q = recordQueue{
     146           1 :                 buffer:                      make([]recordQueueEntry, initialBufferLen),
     147           1 :                 failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
     148           1 :         }
     149           1 : }
     150             : 
     151             : // NB: externally synchronized, i.e., no concurrent push calls.
     152             : func (q *recordQueue) push(
     153             :         p []byte,
     154             :         opts SyncOptions,
     155             :         refCount RefCount,
     156             :         writeStart crtime.Mono,
     157             :         latestLogSizeInWriteRecord int64,
     158             :         latestWriterInWriteRecord *record.LogWriter,
     159           1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
     160           1 :         ht := q.headTail.Load()
     161           1 :         h, t := unpackHeadTail(ht)
     162           1 :         n := int(h - t)
     163           1 :         m := len(q.buffer)
     164           1 :         if m == n {
     165           1 :                 // Full
     166           1 :                 m = 2 * n
     167           1 :                 newBuffer := make([]recordQueueEntry, m)
     168           1 :                 for i := int(t); i < int(h); i++ {
     169           1 :                         newBuffer[i%m] = q.buffer[i%n]
     170           1 :                 }
     171           1 :                 q.mu.Lock()
     172           1 :                 q.buffer = newBuffer
     173           1 :                 q.mu.Unlock()
     174             :         }
     175           1 :         q.mu.RLock()
     176           1 :         q.buffer[int(h)%m] = recordQueueEntry{
     177           1 :                 p:          p,
     178           1 :                 opts:       opts,
     179           1 :                 refCount:   refCount,
     180           1 :                 writeStart: writeStart,
     181           1 :         }
     182           1 :         // Reclaim memory for consumed entries. We couldn't do that in pop since
     183           1 :         // multiple consumers are popping using CAS and that immediately transfers
     184           1 :         // ownership to the producer.
     185           1 :         for i := q.lastTailObservedByProducer; i < t; i++ {
     186           1 :                 q.buffer[int(i)%m] = recordQueueEntry{}
     187           1 :         }
     188           1 :         q.lastTailObservedByProducer = t
     189           1 :         q.headTail.Add(1 << headTailBits)
     190           1 :         writer = q.writer
     191           1 :         if writer == latestWriterInWriteRecord {
     192           1 :                 // WriteRecord has written to this writer since the switch.
     193           1 :                 q.lastLogSize = latestLogSizeInWriteRecord
     194           1 :         }
     195             :         // Else writer is a new writer that was switched to, so ignore the
     196             :         // latestLogSizeInWriteRecord.
     197             : 
     198           1 :         lastLogSize = q.lastLogSize
     199           1 :         q.mu.RUnlock()
     200           1 :         return h, writer, lastLogSize
     201             : }
     202             : 
     203           1 : func (q *recordQueue) length() int {
     204           1 :         ht := q.headTail.Load()
     205           1 :         h, t := unpackHeadTail(ht)
     206           1 :         return int(h - t)
     207           1 : }
     208             : 
     209             : // Pops all entries. Must be called only after the last push returns.
     210           1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
     211           1 :         ht := q.headTail.Load()
     212           1 :         h, t := unpackHeadTail(ht)
     213           1 :         n := int(h - t)
     214           1 :         if n == 0 {
     215           1 :                 return 0, 0
     216           1 :         }
     217           1 :         return n, q.pop(h-1, err)
     218             : }
     219             : 
     220             : // Pops all entries up to and including index. The remaining queue is
     221             : // [index+1, head).
     222             : //
     223             : // NB: we could slightly simplify to only have the latest writer be able to
     224             : // pop. This would avoid the CAS below, but it seems better to reduce the
     225             : // amount of queued work regardless of who has successfully written it.
     226           1 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
     227           1 :         now := crtime.NowMono()
     228           1 :         var buf [512]poppedEntry
     229           1 :         tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
     230           1 :                 ht := q.headTail.Load()
     231           1 :                 _, t = unpackHeadTail(ht)
     232           1 :                 tail := int(t)
     233           1 :                 numEntriesToPop = int(index) - tail + 1
     234           1 :                 return t, numEntriesToPop
     235           1 :         }
     236           1 :         q.consumerMu.Lock()
     237           1 :         // numEntriesToPop is a function of index and tail. The value of tail cannot
     238           1 :         // change since consumerMu is held.
     239           1 :         tail, numEntriesToPop := tailEntriesToPop()
     240           1 :         if numEntriesToPop <= 0 {
     241           1 :                 q.consumerMu.Unlock()
     242           1 :                 return 0
     243           1 :         }
     244           1 :         var b []poppedEntry
     245           1 :         if numEntriesToPop <= len(buf) {
     246           1 :                 b = buf[:numEntriesToPop]
     247           1 :         } else {
     248           1 :                 // Do allocation before acquiring the mutex.
     249           1 :                 b = make([]poppedEntry, numEntriesToPop)
     250           1 :         }
     251           1 :         q.mu.RLock()
     252           1 :         n := len(q.buffer)
     253           1 :         for i := 0; i < numEntriesToPop; i++ {
     254           1 :                 // Grab the popped entries before incrementing tail, since that will
     255           1 :                 // release those buffer slots to the producer.
     256           1 :                 idx := (i + int(tail)) % n
     257           1 :                 b[i] = poppedEntry{
     258           1 :                         opts:       q.buffer[idx].opts,
     259           1 :                         refCount:   q.buffer[idx].refCount,
     260           1 :                         writeStart: q.buffer[idx].writeStart,
     261           1 :                 }
     262           1 :         }
     263             :         // Since tail cannot change, we don't need to do a compare-and-swap.
     264           1 :         q.headTail.Add(uint64(numEntriesToPop))
     265           1 :         q.mu.RUnlock()
     266           1 :         q.consumerMu.Unlock()
     267           1 :         addLatencySample := false
     268           1 :         var maxLatency time.Duration
     269           1 :         for i := 0; i < numEntriesToPop; i++ {
     270           1 :                 // Now that we've synced the entry, we can unref it to signal that we
     271           1 :                 // will not read the written byte slice again.
     272           1 :                 if b[i].refCount != nil {
     273           1 :                         b[i].refCount.Unref()
     274           1 :                 }
     275           1 :                 if b[i].opts.Done != nil {
     276           1 :                         numSyncsPopped++
     277           1 :                         if err != nil {
     278           1 :                                 *b[i].opts.Err = err
     279           1 :                         }
     280           1 :                         b[i].opts.Done.Done()
     281           1 :                         latency := now.Sub(b[i].writeStart)
     282           1 :                         if !addLatencySample {
     283           1 :                                 addLatencySample = true
     284           1 :                                 maxLatency = latency
     285           1 :                         } else if maxLatency < latency {
     286           0 :                                 maxLatency = latency
     287           0 :                         }
     288             :                 }
     289             :         }
     290           1 :         if addLatencySample {
     291           1 :                 if maxLatency < 0 {
     292           0 :                         maxLatency = 0
     293           0 :                 }
     294           1 :                 q.failoverWriteAndSyncLatency.Observe(float64(maxLatency))
     295             :         }
     296           1 :         return numSyncsPopped
     297             : }
     298             : 
     299             : func (q *recordQueue) snapshotAndSwitchWriter(
     300             :         writer *record.LogWriter,
     301             :         snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
     302           1 : ) {
     303           1 :         q.mu.Lock()
     304           1 :         defer q.mu.Unlock()
     305           1 :         q.writer = writer
     306           1 :         h, t := unpackHeadTail(q.headTail.Load())
     307           1 :         n := h - t
     308           1 :         if n > 0 {
     309           1 :                 m := uint32(len(q.buffer))
     310           1 :                 b := make([]recordQueueEntry, n)
     311           1 :                 for i := t; i < h; i++ {
     312           1 :                         b[i-t] = q.buffer[i%m]
     313           1 :                 }
     314           1 :                 q.lastLogSize = snapshotFunc(t, b)
     315             :         }
     316             : }
     317             : 
     318             : // getLastIndex is used by failoverWriter.Close.
     319           1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
     320           1 :         h, _ := unpackHeadTail(q.headTail.Load())
     321           1 :         return int64(h) - 1
     322           1 : }
     323             : 
     324             : const headTailBits = 32
     325             : 
     326           1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
     327           1 :         const mask = 1<<headTailBits - 1
     328           1 :         head = uint32((ht >> headTailBits) & mask)
     329           1 :         tail = uint32(ht & mask)
     330           1 :         return head, tail
     331           1 : }
     332             : 
// maxPhysicalLogs is the maximum number of physical log files when writing a
// virtual WAL. Arbitrarily chosen value. Setting this to 2 will not simplify
// the code. We make this a constant since we want a fixed size array for
// failoverWriter.mu.writers.
const maxPhysicalLogs = 10
     337             : 
// failoverWriter is the implementation of Writer in failover mode. No Writer
// method blocks for IO, except for Close.
//
// Loosely speaking, Close blocks until all records are successfully written
// and synced to some log writer. Monitoring of log writer latency and errors
// continues after Close is called, which means failoverWriter can be switched
// to a new log writer after Close is called, to unblock Close.
//
// More precisely, Close does not block if there is an error in creating or
// closing the latest LogWriter when close was called. This is because errors
// are considered indicative of misconfiguration, and the user of
// failoverWriter can dampen switching when observing errors (e.g. see
// failoverMonitor), so close does not assume any liveness of calls to
// switchToNewDir when such errors occur. Since the caller (see db.go) treats
// an error on Writer.Close as fatal, this does mean that failoverWriter has
// limited ability to mask errors (its primary task is to mask high latency).
type failoverWriter struct {
	// opts are the options given to newFailoverWriter.
	opts failoverWriterOpts
	// q holds the records that have been pushed but not yet popped, so that
	// they can be replayed to a new LogWriter when switching (see
	// recordQueue).
	q  recordQueue
	mu struct {
		sync.Mutex
		// writers is protected by mu, except for updates to the
		// latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
		// protection by mu is for handling concurrent calls to switchToNewDir,
		// Close, and getLog.
		writers [maxPhysicalLogs]logWriterAndRecorder

		// cond is signaled when the latest LogWriter is set in writers (or there
		// is a creation error), or when the latest LogWriter is successfully
		// closed. It is waited on in Close. We don't use channels and select
		// since what Close is waiting on is dynamic based on the local state in
		// Close, so using Cond is simpler.
		cond *sync.Cond
		// nextWriterIndex is advanced before creating the *LogWriter. That is, a
		// slot is reserved by taking the current value of nextWriterIndex and
		// incrementing it, and then the *LogWriter for that slot is created. When
		// newFailoverWriter returns, nextWriterIndex = 1.
		//
		// The latest *LogWriter is (will be) at nextWriterIndex-1.
		//
		// INVARIANT: nextWriterIndex <= len(writers)
		nextWriterIndex LogNameIndex
		closed          bool
		// metrics is initialized in Close. Currently we just use the metrics from
		// the latest writer after it is closed, since in the common case with
		// only one writer, that writer's flush loop will have finished and the
		// metrics will be current. With multiple writers, these metrics can be
		// quite inaccurate. The WriteThroughput metric includes an IdleDuration,
		// which can be high for a writer that was switched away from, and
		// therefore not indicative of overall work being done by the
		// failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
		// inaccurate once there is no more work being given to a writer. We could
		// add a method to LogWriter to stop sampling metrics when it is not the
		// latest writer. Then we could aggregate all these metrics across all
		// writers.
		//
		// Note that CockroachDB does not use these metrics in any meaningful way.
		//
		// TODO(sumeer): do the improved solution outlined above.
		metrics record.LogWriterMetrics
	}
	// State for computing logical offset. The cumulative offset state is in
	// offset. Each time we call SyncRecordGeneralized from WriteRecord, we
	// compute the delta from the size returned by this LogWriter now, and the
	// size returned by this LogWriter in the previous call to
	// SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
	// have happened from WriteRecord, or asynchronously during a switch. So
	// that previous call state requires synchronization and is maintained in
	// recordQueue. The offset is incremented by this delta without any
	// synchronization, since we rely on external synchronization (like the
	// standaloneWriter).
	logicalOffset struct {
		latestWriterInWriteRecord  *record.LogWriter
		latestLogSizeInWriteRecord int64
		offset                     int64
		// Transitions once from false => true when there is a non-nil writer.
		notEstimatedOffset bool
	}
	// NOTE(review): presumably backing storage for the record.PendingSyncIndex
	// values used by WriteRecord and by the writer-switch path respectively;
	// their uses are outside this view -- confirm.
	psiForWriteRecordBacking record.PendingSyncIndex
	psiForSwitchBacking      record.PendingSyncIndex
}
     419             : 
// logWriterAndRecorder is the element type of failoverWriter.mu.writers: the
// state kept for one physical log file slot of the failoverWriter.
type logWriterAndRecorder struct {
	// This may never become non-nil, if when the LogWriter was finally created,
	// it was no longer the latest writer. Additionally, if there was an error
	// in creating the writer, w will remain nil and createError will be set.
	w *record.LogWriter
	// createError is set if there is an error creating the writer. This is
	// useful in Close since we need to know when the work for creating the
	// latest writer is done, whether it resulted in success or not.
	createError error
	// r is the latencyAndErrorRecorder for this slot. Per the comment on
	// failoverWriter.mu.writers, updates to it are not protected by
	// failoverWriter.mu.
	r latencyAndErrorRecorder

	// dir, approxFileSize, synchronouslyClosed are kept for initializing
	// segmentWithSizeEtc. The approxFileSize is initially set to whatever is
	// returned by logCreator. When failoverWriter.Close is called,
	// approxFileSize and synchronouslyClosed may be updated.
	dir                 Dir
	approxFileSize      uint64
	synchronouslyClosed bool
}
     439             : 
// Compile-time assertions that *failoverWriter implements Writer and
// switchableWriter.
var _ Writer = &failoverWriter{}

var _ switchableWriter = &failoverWriter{}
     443             : 
// failoverWriterOpts are the options passed to newFailoverWriter, which
// stores them in failoverWriter.opts.
type failoverWriterOpts struct {
	wn     NumWAL
	logger base.Logger
	timeSource
	jobID int
	logCreator

	// Options that feed into SyncingFileOptions.
	noSyncOnClose   bool
	bytesPerSync    int
	preallocateSize func() int

	// Options for record.LogWriter.
	minSyncInterval func() time.Duration
	fsyncLatency    prometheus.Histogram
	queueSemChan    chan struct{}
	stopper         *stopper

	// failoverWriteAndSyncLatency is handed to recordQueue.init by
	// newFailoverWriter.
	failoverWriteAndSyncLatency prometheus.Histogram
	writerClosed                func(logicalLogWithSizesEtc)

	// NOTE(review): by its name and type, presumably a test-only notification
	// channel for writer creation; the send site is outside this view --
	// confirm.
	writerCreatedForTest chan<- struct{}
}
     467             : 
     468             : func simpleLogCreator(
     469             :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     470           1 : ) (f vfs.File, initialFileSize uint64, err error) {
     471           1 :         filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
     472           1 :         // Create file.
     473           1 :         r.writeStart()
     474           1 :         f, err = dir.FS.Create(filename, "pebble-wal")
     475           1 :         r.writeEnd(err)
     476           1 :         return f, 0, err
     477           1 : }
     478             : 
// logCreator creates the log file for WAL wn at log name index li in dir, and
// returns the file along with its initial size. switchToNewDir calls it
// asynchronously, records initialFileSize as the writer's approximate file
// size, and on a non-nil err closes f if f is non-nil. r is the recorder for
// the writer being created; simpleLogCreator uses it to bracket the create
// call with writeStart/writeEnd.
type logCreator func(
	dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (f vfs.File, initialFileSize uint64, err error)
     482             : 
     483             : func newFailoverWriter(
     484             :         opts failoverWriterOpts, initialDir dirAndFileHandle,
     485           1 : ) (*failoverWriter, error) {
     486           1 :         ww := &failoverWriter{
     487           1 :                 opts: opts,
     488           1 :         }
     489           1 :         ww.q.init(opts.failoverWriteAndSyncLatency)
     490           1 :         ww.mu.cond = sync.NewCond(&ww.mu)
     491           1 :         // The initial record.LogWriter creation also happens via a
     492           1 :         // switchToNewWriter since we don't want it to block newFailoverWriter.
     493           1 :         err := ww.switchToNewDir(initialDir)
     494           1 :         if err != nil {
     495           0 :                 // Switching limit cannot be exceeded when creating.
     496           0 :                 panic(err)
     497             :         }
     498           1 :         return ww, nil
     499             : }
     500             : 
     501             : // WriteRecord implements Writer.
     502             : func (ww *failoverWriter) WriteRecord(
     503             :         p []byte, opts SyncOptions, ref RefCount,
     504           1 : ) (logicalOffset int64, err error) {
     505           1 :         if ref != nil {
     506           1 :                 ref.Ref()
     507           1 :         }
     508           1 :         var writeStart crtime.Mono
     509           1 :         if opts.Done != nil {
     510           1 :                 writeStart = crtime.NowMono()
     511           1 :         }
     512           1 :         recordIndex, writer, lastLogSize := ww.q.push(
     513           1 :                 p,
     514           1 :                 opts,
     515           1 :                 ref,
     516           1 :                 writeStart,
     517           1 :                 ww.logicalOffset.latestLogSizeInWriteRecord,
     518           1 :                 ww.logicalOffset.latestWriterInWriteRecord,
     519           1 :         )
     520           1 :         if writer == nil {
     521           1 :                 // Don't have a record.LogWriter yet, so use an estimate. This estimate
     522           1 :                 // will get overwritten.
     523           1 :                 ww.logicalOffset.offset += int64(len(p))
     524           1 :                 return ww.logicalOffset.offset, nil
     525           1 :         }
     526             :         // INVARIANT: writer != nil.
     527           1 :         notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
     528           1 :         if !notEstimatedOffset {
     529           1 :                 ww.logicalOffset.notEstimatedOffset = true
     530           1 :         }
     531           1 :         ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     532           1 :         if opts.Done != nil {
     533           1 :                 ww.psiForWriteRecordBacking.Index = int64(recordIndex)
     534           1 :         }
     535           1 :         ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
     536           1 :         ww.logicalOffset.latestWriterInWriteRecord = writer
     537           1 :         if notEstimatedOffset {
     538           1 :                 delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
     539           1 :                 ww.logicalOffset.offset += delta
     540           1 :         } else {
     541           1 :                 // Overwrite the estimate. This is a best-effort improvement in that it is
     542           1 :                 // accurate for the common case where writer is the first LogWriter.
     543           1 :                 // Consider a failover scenario where there was no LogWriter for the first
     544           1 :                 // 10 records, so they are all accumulated as an estimate. Then the first
     545           1 :                 // LogWriter successfully writes and syncs the first 5 records and gets
     546           1 :                 // stuck. A switch happens to a second LogWriter that is handed the
     547           1 :                 // remaining 5 records, and the 11th record arrives via a WriteRecord.
     548           1 :                 // The transition from !notEstimatedOffset to notEstimatedOffset will
     549           1 :                 // happen on this 11th record, and the logic here will use the length of
     550           1 :                 // the second LogWriter, that does not reflect the full length.
     551           1 :                 //
     552           1 :                 // TODO(sumeer): try to make this more correct, without adding much more
     553           1 :                 // complexity, and without adding synchronization.
     554           1 :                 ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
     555           1 :         }
     556           1 :         return ww.logicalOffset.offset, err
     557             : }
     558             : 
// switchToNewDir starts switching to dir. It implements switchableWriter. All
// work is async, and a non-nil error is returned only if the switching limit
// is exceeded.
//
// Synchronously, under ww.mu, it reserves the next slot in ww.mu.writers for
// the new writer. The file creation, directory sync and LogWriter
// construction then run on ww.opts.stopper. The new LogWriter is installed
// (and handed the queued records) only if it is still the latest writer and
// the failoverWriter has not been closed; otherwise it is closed and
// discarded.
func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
	ww.mu.Lock()
	// Can have a late switchToNewDir call if the failoverMonitor has not yet
	// been told that the writer is closed. Ignore.
	if ww.mu.closed {
		ww.mu.Unlock()
		if ww.opts.writerCreatedForTest != nil {
			ww.opts.writerCreatedForTest <- struct{}{}
		}
		return nil
	}
	// writerIndex is the slot for this writer.
	writerIndex := ww.mu.nextWriterIndex
	if int(writerIndex) == len(ww.mu.writers) {
		// All slots have been used.
		ww.mu.Unlock()
		return errors.Errorf("exceeded switching limit")
	}
	ww.mu.writers[writerIndex].dir = dir.Dir
	ww.mu.nextWriterIndex++
	ww.mu.Unlock()

	// Creation is async.
	ww.opts.stopper.runAsync(func() {
		recorderAndWriter := &ww.mu.writers[writerIndex].r
		recorderAndWriter.ts = ww.opts.timeSource
		file, initialFileSize, err := ww.opts.logCreator(
			dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
		// NB: written without holding ww.mu. closeInternal only reads
		// approxFileSize for a slot after observing a non-nil w, which is
		// published under ww.mu below.
		// NOTE(review): confirm that any other readers are ordered similarly.
		ww.mu.writers[writerIndex].approxFileSize = initialFileSize
		// handleErrFunc is called when err != nil. It handles the multiple IO error
		// cases below. It closes the file (if any), records the error as the
		// slot's createError, and wakes up a waiter on ww.mu.cond.
		handleErrFunc := func(err error) {
			if file != nil {
				file.Close()
			}
			ww.mu.Lock()
			defer ww.mu.Unlock()
			ww.mu.writers[writerIndex].createError = err
			ww.mu.cond.Signal()
			if ww.opts.writerCreatedForTest != nil {
				ww.opts.writerCreatedForTest <- struct{}{}
			}
		}
		if err != nil {
			handleErrFunc(err)
			return
		}
		// Sync dir.
		recorderAndWriter.writeStart()
		err = dir.Sync()
		recorderAndWriter.writeEnd(err)
		if err != nil {
			handleErrFunc(err)
			return
		}
		// Wrap in a syncingFile.
		syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
			NoSyncOnClose:   ww.opts.noSyncOnClose,
			BytesPerSync:    ww.opts.bytesPerSync,
			PreallocateSize: ww.opts.preallocateSize(),
		})
		// Wrap in the latencyAndErrorRecorder.
		recorderAndWriter.setWriter(syncingFile)

		// Using NumWAL as the DiskFileNum is fine since it is used only as
		// EOF trailer for safe log recycling. Even though many log files can
		// map to a single NumWAL, a file used for NumWAL n at index m will
		// never get recycled for NumWAL n at a later index (since recycling
		// happens when n as a whole is obsolete).
		w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
			record.LogWriterConfig{
				WALMinSyncInterval:        ww.opts.minSyncInterval,
				WALFsyncLatency:           ww.opts.fsyncLatency,
				QueueSemChan:              ww.opts.queueSemChan,
				ExternalSyncQueueCallback: ww.doneSyncCallback,
			})
		// closeWriter is the result of the immediately-invoked function below:
		// true if w was not installed and must be closed here, false if w was
		// installed as the current writer.
		closeWriter := func() bool {
			ww.mu.Lock()
			defer ww.mu.Unlock()
			if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
				// Not the latest writer or the writer was closed while this async
				// creation was ongoing.
				if ww.opts.writerCreatedForTest != nil {
					ww.opts.writerCreatedForTest <- struct{}{}
				}
				return true
			}
			// Latest writer.
			ww.mu.writers[writerIndex].w = w
			ww.mu.cond.Signal()
			// NB: snapshotAndSwitchWriter does not block on IO, since
			// SyncRecordGeneralized does no IO.
			ww.q.snapshotAndSwitchWriter(w,
				func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
					// Hand every queued entry to the new writer. entries[i] has
					// record index firstIndex+i; only entries that requested a
					// sync carry that index, the rest use NoSyncIndex.
					for i := range entries {
						ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
						if entries[i].opts.Done != nil {
							ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
						}
						var err error
						logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
						if err != nil {
							// TODO(sumeer): log periodically. The err will also surface via
							// the latencyAndErrorRecorder, so if a switch is possible, it
							// will be done.
							ww.opts.logger.Errorf("%s", err)
						}
					}
					// The size reported for the last entry handed over (zero if
					// there were no entries).
					return logSize
				})
			if ww.opts.writerCreatedForTest != nil {
				ww.opts.writerCreatedForTest <- struct{}{}
			}
			return false
		}()
		if closeWriter {
			// Never wrote anything to this writer so don't care about the
			// returned error.
			ww.opts.stopper.runAsync(func() {
				_ = w.Close()
				// TODO(sumeer): consider deleting this file too, since
				// failoverWriter.Close may not wait for it. This is going to be
				// extremely rare, so the risk of garbage empty files piling up is
				// extremely low. Say failover happens daily and of those cases we
				// have to be very unlucky and the close happens while a failover was
				// ongoing and the previous LogWriter successfully wrote everything
				// (say 1% probability if we want to be pessimistic). A garbage file
				// every 100 days. Restarts will delete that garbage.
			})
		}
	})
	return nil
}
     694             : 
     695             : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
     696             : // record.LogWriter.
     697             : //
     698             : // recordQueue is popped from only when some work requests a sync (and
     699             : // successfully syncs). In the worst case, if no syncs are requested, we could
     700             : // queue all the records needed to fill up a memtable in the recordQueue. This
     701             : // can have two negative effects: (a) in the case of failover, we need to
     702             : // replay all the data in the current mutable memtable, which takes more time,
     703             : // (b) the memory usage is proportional to the size of the memtable. We ignore
     704             : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
     705             : // the default memtable size is only 64MB.
     706           1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
     707           1 :         if err != nil {
     708           1 :                 // Don't pop anything since we can retry after switching to a new
     709           1 :                 // LogWriter.
     710           1 :                 return
     711           1 :         }
     712             :         // NB: harmless after Close returns since numSyncsPopped will be 0.
     713           1 :         numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
     714           1 :         if ww.opts.queueSemChan != nil {
     715           1 :                 for i := 0; i < numSyncsPopped; i++ {
     716           1 :                         <-ww.opts.queueSemChan
     717           1 :                 }
     718             :         }
     719             : }
     720             : 
     721             : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
     722           1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
     723           1 :         r := ww.recorderForCurDir()
     724           1 :         if r == nil {
     725           1 :                 return 0, nil
     726           1 :         }
     727           1 :         return r.ongoingLatencyOrError()
     728             : }
     729             : 
     730             : // For internal use and testing.
     731           1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
     732           1 :         ww.mu.Lock()
     733           1 :         defer ww.mu.Unlock()
     734           1 :         if ww.mu.closed {
     735           1 :                 return nil
     736           1 :         }
     737           1 :         return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
     738             : }
     739             : 
     740             : // Close implements Writer.
     741             : //
     742             : // NB: getOngoingLatencyOrErrorForLatestWriter and switchToNewDir can be
     743             : // called after Close is called, and there is also a possibility that they get
     744             : // called after Close returns and before failoverMonitor knows that the
     745             : // failoverWriter is closed.
     746             : //
     747             : // doneSyncCallback can be called anytime after Close returns since there
     748             : // could be stuck writes that finish arbitrarily later.
     749             : //
     750             : // See the long comment about Close behavior where failoverWriter is declared.
     751           1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
     752           1 :         offset, err := ww.closeInternal()
     753           1 :         ww.opts.writerClosed(ww.getLog())
     754           1 :         return offset, err
     755           1 : }
     756             : 
// closeInternal does the work of Close, other than notifying
// opts.writerClosed.
//
// It returns the logical offset as of entry, and the error of the last
// writer: either the error from closing its LogWriter, or its creation error
// if it was never created. It blocks until that last writer has been closed
// (or has failed creation), starting an asynchronous close of every other
// LogWriter that has been installed. Before returning it marks the
// failoverWriter closed, records the last writer's metrics, and empties the
// record queue. It panics if err is nil but the queue still held records or
// syncs.
func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
	logicalOffset = ww.logicalOffset.offset
	// [0, closeCalledCount) have had LogWriter.Close called (though may not
	// have finished) or the LogWriter will never be non-nil. Either way, they
	// have been "processed".
	closeCalledCount := LogNameIndex(0)
	// lastWriterState is the state for the last writer, for which we are
	// waiting for LogWriter.Close to finish or for creation to be unsuccessful.
	// What is considered the last writer can change. All state is protected by
	// ww.mu.
	type lastWriterState struct {
		index   LogNameIndex
		closed  bool
		err     error
		metrics record.LogWriterMetrics
	}
	var lastWriter lastWriterState
	// Captured before taking ww.mu; this is what the last writer is told is
	// the final queued record when it is closed.
	lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
	ww.mu.Lock()
	defer ww.mu.Unlock()
	numWriters := ww.mu.nextWriterIndex
	// Every iteration starts and ends with the mutex held.
	//
	// Invariant: numWriters >= 1.
	//
	// We will loop until we have closed the lastWriter (and use
	// lastWriter.err). We also need to call close on all LogWriters
	// that will not close themselves, i.e., those that have already been
	// created and installed in failoverWriter.writers (this set may change
	// while failoverWriter.Close runs).
	for !lastWriter.closed || numWriters > lastWriter.index+1 {
		if numWriters > closeCalledCount {
			// lastWriter.index may or may not have advanced. If it has advanced, we
			// need to reinitialize lastWriterState. If it hasn't advanced, and
			// numWriters > closeCalledCount, we know that we haven't called close
			// on it, so nothing in lastWriterState needs to be retained. For
			// simplicity, we overwrite in both cases.
			lastWriter = lastWriterState{
				index: numWriters - 1,
			}
			// Try to process [closeCalledCount, numWriters). Will surely process
			// [closeCalledCount, numWriters-1), since those writers are either done
			// initializing, or will close themselves. The writer at numWriters-1 we
			// can only process if it is done initializing, else we will iterate
			// again.
			for i := closeCalledCount; i < numWriters; i++ {
				w := ww.mu.writers[i].w
				cErr := ww.mu.writers[i].createError
				// Is the current index the last writer. If yes, this is also the last
				// loop iteration.
				isLastWriter := i == lastWriter.index
				if w != nil {
					// Can close it, so extend closeCalledCount.
					closeCalledCount = i + 1
					// approxFileSize only ever grows here: keep the larger of the
					// existing value and the LogWriter's reported size.
					size := uint64(w.Size())
					if ww.mu.writers[i].approxFileSize < size {
						ww.mu.writers[i].approxFileSize = size
					}
					if isLastWriter {
						// We may care about its error and when it finishes closing.
						index := i
						ww.opts.stopper.runAsync(func() {
							// Last writer(s) (since new writers can be created and become
							// last, as we iterate) are guaranteed to have seen the last
							// record (since it was queued before Close was called). It is
							// possible that a writer got created after the last record was
							// dequeued and before this fact was realized by Close. In that
							// case we will harmlessly tell it that it synced that last
							// record, though it has already been written and synced by
							// another writer.
							err := w.CloseWithLastQueuedRecord(lastRecordIndex)
							ww.mu.Lock()
							defer ww.mu.Unlock()
							// Only report the result if this writer is still considered
							// the last writer; otherwise it has been superseded and its
							// outcome is ignored.
							if lastWriter.index == index {
								lastWriter.closed = true
								lastWriter.err = err
								lastWriter.metrics = w.Metrics()
								ww.mu.cond.Signal()
							}
						})
					} else {
						// Don't care about the returned error since all the records we
						// relied on this writer for were already successfully written.
						ww.opts.stopper.runAsync(func() {
							_ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
						})
					}
				} else if cErr != nil {
					// Have processed it, so extend closeCalledCount.
					closeCalledCount = i + 1
					if isLastWriter {
						// Creation failed, so there is no LogWriter to close: the
						// creation error becomes the last writer's error.
						lastWriter.closed = true
						lastWriter.err = cErr
						lastWriter.metrics = record.LogWriterMetrics{}
					}
					// Else, ignore.
				} else {
					// Neither created nor failed yet: creation is still in progress.
					if !isLastWriter {
						// Not last writer, so will close itself.
						closeCalledCount = i + 1
					}
					// Else, last writer, so we may have to close it.
				}
			}
		}
		if !lastWriter.closed {
			// Either waiting for creation of last writer or waiting for the close
			// to finish, or something else to become the last writer.
			//
			// It is possible that what we think of as the last writer (lastWriter)
			// closes itself while ww.mu is no longer held here, and a new LogWriter
			// is created too. All the records are synced, but the real last writer
			// may still be writing some records. Specifically, consider the
			// following sequence while this wait does not hold the mutex:
			//
			// - recordQueue has an entry, with index 50, that does not require a
			//   sync.
			// - Last writer created at index 10 and entry 50 is handed to it.
			// - lastWriter.index is still 9 and it closes itself and signals this
			//   cond. It has written entry 50 and synced (since close syncs).
			// - The wait completes.
			//
			// Now the writer at index 10 will never be closed and will never sync.
			// A crash can cause some part of what it writes to be lost. Note that
			// there is no data loss, but there are some unfortunate consequences:
			//
			// - We never closed a file descriptor.
			// - virtualWALReader.NextRecord can return an error on finding a
			//   malformed chunk in the last writer (at index 10) instead of
			//   swallowing the error. This can cause DB.Open to fail.
			//
			// To avoid this, we grab the latest value of numWriters on reacquiring
			// the mutex, and will continue looping until the writer at index 10 is
			// closed (or writer at index 11 is created).
			ww.mu.cond.Wait()
			numWriters = ww.mu.nextWriterIndex
		}
	}
	if ww.mu.writers[lastWriter.index].w != nil {
		// This permits log recycling.
		ww.mu.writers[lastWriter.index].synchronouslyClosed = true
	}
	err = lastWriter.err
	ww.mu.metrics = lastWriter.metrics
	ww.mu.closed = true
	// Empty the record queue. If the last writer closed without error, every
	// record and sync should already have been popped.
	n, m := ww.q.popAll(err)
	if err == nil && (n > 0 || m > 0) {
		panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
	}
	return logicalOffset, err
}
     908             : 
     909             : // Metrics implements writer. It returns the current value of ww.mu.metrics, read while holding ww.mu.
     910           1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
     911           1 :         ww.mu.Lock()
     912           1 :         defer ww.mu.Unlock()
     913           1 :         return ww.mu.metrics // assigned from lastWriter.metrics at the end of the method above
     914           1 : }
     915             : 
     916             : // getLog can be called at any time, including after Close returns. It holds ww.mu while building a logicalLogWithSizesEtc with one segment per slot of ww.mu.writers whose w is non-nil.
     917           1 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
     918           1 :         ww.mu.Lock()
     919           1 :         defer ww.mu.Unlock()
     920           1 :         ll := logicalLogWithSizesEtc{
     921           1 :                 num: ww.opts.wn, // the log number comes from the writer's options
     922           1 :         }
     923           1 :         for i := range ww.mu.writers { // the slot index i doubles as the segment's LogNameIndex
     924           1 :                 if ww.mu.writers[i].w != nil { // slots with a nil w contribute no segment
     925           1 :                         ll.segments = append(ll.segments, segmentWithSizeEtc{
     926           1 :                                 segment: segment{
     927           1 :                                         logNameIndex: LogNameIndex(i),
     928           1 :                                         dir:          ww.mu.writers[i].dir,
     929           1 :                                 },
     930           1 :                                 approxFileSize:      ww.mu.writers[i].approxFileSize,
     931           1 :                                 synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
     932           1 :                         })
     933           1 :                 }
     934             :         }
     935           1 :         return ll
     936             : }
     937             : 
     938             : // latencyAndErrorRecorder records ongoing write and sync operations and errors
     939             : // in those operations. record.LogWriter cannot continue functioning after any
     940             : // error, so all errors are considered permanent.
     941             : //
     942             : // writeStart/writeEnd are used directly when creating a file. After the file
     943             : // is successfully created, setWriter turns latencyAndErrorRecorder into an
     944             : // implementation of writerSyncerCloser that will record for the Write and
     945             : // Sync methods. State is read back through ongoingLatencyOrError.
     946             : type latencyAndErrorRecorder struct {
     947             :         ts                    timeSource // supplies the current time via ts.now()
     948             :         ongoingOperationStart atomic.Int64 // UnixNano start of the in-flight operation; 0 when none is in flight
     949             :         error                 atomic.Pointer[error] // last non-nil error passed to writeEnd; nil pointer if none was recorded
     950             :         writerSyncerCloser // wrapped writer, installed by setWriter; Sync and Write delegate to it
     951             : }
     952             : 
     953             : type writerSyncerCloser interface { // writerSyncerCloser is the write/close/sync method set embedded by latencyAndErrorRecorder
     954             :         io.Writer
     955             :         io.Closer
     956             :         Sync() error // wrapped by latencyAndErrorRecorder.Sync, which records its latency and error
     957             : }
     958             : 
     959           1 : func (r *latencyAndErrorRecorder) writeStart() { // writeStart marks an operation as in flight
     960           1 :         r.ongoingOperationStart.Store(r.ts.now().UnixNano()) // start time kept as UnixNano so it fits the atomic.Int64
     961           1 : }
     962             : 
     963           1 : func (r *latencyAndErrorRecorder) writeEnd(err error) { // writeEnd records err (if non-nil) and marks the operation as finished
     964           1 :         if err != nil {
     965           1 :                 ptr := &err // the atomic.Pointer[error] field stores a *error, so take the address of the parameter
     966           1 :                 r.error.Store(ptr) // a nil err never overwrites a previously stored error
     967           1 :         }
     968           1 :         r.ongoingOperationStart.Store(0) // 0 is what ongoingLatencyOrError treats as "nothing in flight"
     969             : }
     970             : 
     971           1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) { // setWriter installs w as the embedded writer that Sync and Write delegate to
     972           1 :         r.writerSyncerCloser = w // NOTE(review): plain, non-atomic assignment; how it is ordered relative to Sync/Write callers is not visible here
     973           1 : }
     974             : 
     975           1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) { // returns the elapsed time of the in-flight operation (0 if none) and the recorded error (nil if none)
     976           1 :         startTime := r.ongoingOperationStart.Load()
     977           1 :         var latency time.Duration
     978           1 :         if startTime != 0 { // 0 means no operation is in flight, so latency stays 0
     979           1 :                 l := r.ts.now().UnixNano() - startTime
     980           1 :                 if l < 0 { // clamp a negative elapsed time to zero
     981           0 :                         l = 0
     982           0 :                 }
     983           1 :                 latency = time.Duration(l)
     984             :         }
     985           1 :         errPtr := r.error.Load() // loaded separately from the start time; the two reads are not one atomic snapshot
     986           1 :         var err error
     987           1 :         if errPtr != nil {
     988           1 :                 err = *errPtr
     989           1 :         }
     990           1 :         return latency, err
     991             : }
     992             : 
     993             : // Sync implements writerSyncerCloser. It brackets the wrapped writer's Sync with writeStart/writeEnd so its latency and error are recorded.
     994           1 : func (r *latencyAndErrorRecorder) Sync() error {
     995           1 :         r.writeStart()
     996           1 :         err := r.writerSyncerCloser.Sync() // delegates to the writer installed by setWriter
     997           1 :         r.writeEnd(err)
     998           1 :         return err
     999           1 : }
    1000             : 
    1001             : // Write implements io.Writer. It brackets the wrapped writer's Write with writeStart/writeEnd so its latency and error are recorded.
    1002           1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
    1003           1 :         r.writeStart()
    1004           1 :         n, err = r.writerSyncerCloser.Write(p) // delegates to the writer installed by setWriter
    1005           1 :         r.writeEnd(err)
    1006           1 :         return n, err // the wrapped writer's byte count and error are returned unchanged
    1007           1 : }

Generated by: LCOV version 1.14