LCOV - code coverage report
Current view: top level - pebble/wal - failover_writer.go (source / functions)
Test:         2024-06-20 08:16Z 71d59d67 - tests only.lcov
Date:         2024-06-20 08:17:14

                 Hit     Total    Coverage
Lines:           521     538      96.8 %
Functions:       0       0        -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "io"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/record"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/prometheus/client_golang/prometheus"
      18             : )
      19             : 
      20             : // recordQueueEntry is an entry in recordQueue.
      21             : type recordQueueEntry struct {
      22             :         p                   []byte
      23             :         opts                SyncOptions
      24             :         refCount            RefCount
      25             :         writeStartUnixNanos int64
      26             : }
      27             : 
      28             : type poppedEntry struct {
      29             :         opts                SyncOptions
      30             :         refCount            RefCount
      31             :         writeStartUnixNanos int64
      32             : }
      33             : 
      34             : const initialBufferLen = 8192
      35             : 
      36             : // recordQueue is a variable-size single-producer multiple-consumer queue. It
      37             : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
      38             : // to grow the size, since there is no upper bound on the number of queued
      39             : // records (which are all the records that are not synced, and will need to be
      40             : // written again in case of failover). Additionally, it needs a mutex to
      41             : // atomically grab a snapshot of the queued records and provide them to a new
      42             : // LogWriter that is being switched to.
      43             : type recordQueue struct {
      44             :         // Only held for reading for all pop operations and most push operations.
      45             :         // Held for writing when buffer needs to be grown or when switching to a new
      46             :         // writer.
      47             :         mu sync.RWMutex
      48             : 
      49             :         // queue is [tail, head). tail is the oldest entry and head is the index for
      50             :         // the next entry.
      51             :         //
      52             :         // Consumers: atomically read and write tail in pop. This is not the usual
      53             :         // kind of queue consumer since they already know the index that they are
      54             :         // popping exists, hence don't need to look at head.
      55             :         //
      56             :         // Producer: atomically reads tail in push. Writes to head.
      57             :         //
      58             :         // Based on the above we only need tail to be atomic. However, the producer
      59             :         // also populates entries in buffer, whose values need to be seen by the
      60             :         // consumers when doing a pop, which means they need to synchronize using a
      61             :         // release and acquire memory barrier pair, where the push does the release
      62             :         // and the pop does the acquire. For this reason we make head also atomic
      63             :         // and merge head and tail into a single atomic, so that the store of head
      64             :         // in push and the load of tail in pop accomplishes this release-acquire
      65             :         // pair.
      66             :         //
      67             :         // We initially implemented competition between multiple consumers solely
      68             :         // via atomic read-write of the tail using compare-and-swap (CAS). Since the
      69             :         // atomic read-write to tail in pop releases those buffer entries for reuse
      70             :         // to the producer, the consumer needs to grab the contents of
      71             :         // recordQueueEntry that it needs to do callbacks etc. (specifically the
      72             :         // contents corresponding to poppedEntry), *before* it succeeds with the
      73             :         // atomic read-write. This introduced a false data race in the Golang data
      74             :         // race detector
      75             :         // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
      76             :         // Consider the case where the queue is [10,20), and consumer C1 is trying
      77             :         // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
      78             :         // interleaving can happen:
      79             :         //
      80             :         // [C1] reads head=10, tail=20
      81             :         // [C2] reads head=10, tail=20
      82             :         // [C1] reads buffer contents [10,12) and makes local copy
      83             :         // [C1] CAS to make the queue [12,20)
      84             :         // [C2] reads buffer contents [10,14) and makes local copy, concurrently
      85             :         //      with producer writing to 10, 11. *
      86             :         // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
      87             :         //      in CAS. C2 only uses the contents it read into the local copy for
      88             :         //      [12,14).
      89             :         //
      90             :         // * is a false data race since C2 is later deciding what contents it should
      91             :         // use from among the contents it read, based on what indices it
      92             :         // successfully popped. Unfortunately, we don't have a way to annotate the
      93             :         // code to tell the data race detector to ignore this false positive. So we
      94             :         // need to strengthen the synchronization to prevent such false positives.
      95             :         // We observe that usually a consumer will be popping a batch of entries
      96             :         // (based on a single successful fsync), and the number of consumers will be
      97             :         // small (usually 1). In comparison, producers can be highly concurrent (due
      98             :         // to workload concurrency). We don't want consumers to compete for a mutex
      99             :         // with producers, but we can afford to have multiple consumers compete for
     100             :         // a mutex. So we fix this false data race by using consumerMu to force
     101             :         // single-threaded popping.
     102             :         //
     103             :         // An alternative would be to pass the information contained in poppedEntry
     104             :         // to the LogWriter, so that it can pass it back when popping (so we don't
     105             :         // have to retrieve it from the recordQueue.buffer). We would still need
     106             :         // recordQueue.buffer, since writer switching needs those entries to be
     107             :         // replayed. We don't consider this solution for the same reason we replaced
     108             :         // record.pendingSyncsWithSyncQueue with
     109             :         // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
     110             :         // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
     111             :         // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
     112             :         // switched to LogWriter2, to which we replayed the same records and filled
     113             :         // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
     114             :         // the commit pipeline can send another 4096 entries, while LogWriter2 is
     115             :         // still blocked on trying to write and sync the previous 4096 entries. This
     116             :         // will overflow the queue in LogWriter2.
     117             :         //
     118             :         // All updates to headTail hold mu at least for reading. So when mu is held
     119             :         // for writing, there is a guarantee that headTail is not being updated.
     120             :         //
     121             :         // head is most-significant 32 bits and tail is least-significant 32 bits.
     122             :         headTail atomic.Uint64
     123             : 
     124             :         consumerMu sync.Mutex
     125             : 
     126             :         // Access to buffer requires at least RLock.
     127             :         buffer []recordQueueEntry
     128             : 
     129             :         lastTailObservedByProducer uint32
     130             : 
     131             :         // Read requires RLock.
     132             :         writer *record.LogWriter
     133             : 
     134             :         // When writer != nil, this is the return value of the last call to
      135             :         // SyncRecordGeneralized. It is updated in (a) push, called by WriteRecord, using
     136             :         // only RLock (since WriteRecord is externally synchronized), (b)
     137             :         // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
     138             :         lastLogSize int64
     139             : 
     140             :         failoverWriteAndSyncLatency prometheus.Histogram
     141             : }
     142             : 
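
The headTail comment above is dense, so the following is a compact, standalone model of the same pattern, assuming a fixed-size buffer with no growth and no writer switching (both of which the real recordQueue handles): the single producer fills a slot and then publishes it with one atomic Add to the head half, and consumers, serialized by a consumer mutex, copy entries before advancing the tail half with a plain Add. This is an illustrative sketch, not code from failover_writer.go.

package main

import (
        "fmt"
        "sync"
        "sync/atomic"
)

const headTailBits = 32

// miniQueue models only the headTail mechanism: head in the upper 32 bits and
// tail in the lower 32 bits of a single atomic word.
type miniQueue struct {
        headTail   atomic.Uint64
        consumerMu sync.Mutex
        buffer     [8]int
}

func unpack(ht uint64) (head, tail uint32) {
        const mask = 1<<headTailBits - 1
        return uint32((ht >> headTailBits) & mask), uint32(ht & mask)
}

// push is single-producer: write the slot, then publish it by bumping head.
// The Add is the release that makes the buffer write visible to poppers.
func (q *miniQueue) push(v int) (index uint32) {
        h, _ := unpack(q.headTail.Load())
        q.buffer[int(h)%len(q.buffer)] = v
        q.headTail.Add(1 << headTailBits)
        return h
}

// popUpTo pops [tail, index]. consumerMu serializes consumers, so tail cannot
// change underneath us and a plain Add (no CAS) suffices to advance it.
func (q *miniQueue) popUpTo(index uint32) []int {
        q.consumerMu.Lock()
        defer q.consumerMu.Unlock()
        _, t := unpack(q.headTail.Load()) // the acquire paired with push's Add
        if index < t {
                return nil // already popped by an earlier call
        }
        popped := make([]int, 0, index-t+1)
        for i := t; i <= index; i++ {
                popped = append(popped, q.buffer[int(i)%len(q.buffer)])
        }
        q.headTail.Add(uint64(index - t + 1))
        return popped
}

func main() {
        var q miniQueue
        for i := 1; i <= 3; i++ {
                q.push(10 * i)
        }
        fmt.Println(q.popUpTo(1)) // [10 20]
        fmt.Println(q.popUpTo(2)) // [30]
        fmt.Println(q.popUpTo(2)) // []: index 2 was already popped
}
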
     143           1 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
     144           1 :         *q = recordQueue{
     145           1 :                 buffer:                      make([]recordQueueEntry, initialBufferLen),
     146           1 :                 failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
     147           1 :         }
     148           1 : }
     149             : 
     150             : // NB: externally synchronized, i.e., no concurrent push calls.
     151             : func (q *recordQueue) push(
     152             :         p []byte,
     153             :         opts SyncOptions,
     154             :         refCount RefCount,
     155             :         writeStartUnixNanos int64,
     156             :         latestLogSizeInWriteRecord int64,
     157             :         latestWriterInWriteRecord *record.LogWriter,
     158           1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
     159           1 :         ht := q.headTail.Load()
     160           1 :         h, t := unpackHeadTail(ht)
     161           1 :         n := int(h - t)
     162           1 :         m := len(q.buffer)
     163           1 :         if m == n {
     164           1 :                 // Full
     165           1 :                 m = 2 * n
     166           1 :                 newBuffer := make([]recordQueueEntry, m)
     167           1 :                 for i := int(t); i < int(h); i++ {
     168           1 :                         newBuffer[i%m] = q.buffer[i%n]
     169           1 :                 }
     170           1 :                 q.mu.Lock()
     171           1 :                 q.buffer = newBuffer
     172           1 :                 q.mu.Unlock()
     173             :         }
     174           1 :         q.mu.RLock()
     175           1 :         q.buffer[int(h)%m] = recordQueueEntry{
     176           1 :                 p:                   p,
     177           1 :                 opts:                opts,
     178           1 :                 refCount:            refCount,
     179           1 :                 writeStartUnixNanos: writeStartUnixNanos,
     180           1 :         }
      181           1 :         // Reclaim memory for consumed entries. We couldn't do that in pop,
      182           1 :         // since advancing the tail there immediately transfers ownership of
      183           1 :         // those buffer slots to the producer.
     184           1 :         for i := q.lastTailObservedByProducer; i < t; i++ {
     185           1 :                 q.buffer[int(i)%m] = recordQueueEntry{}
     186           1 :         }
     187           1 :         q.lastTailObservedByProducer = t
     188           1 :         q.headTail.Add(1 << headTailBits)
     189           1 :         writer = q.writer
     190           1 :         if writer == latestWriterInWriteRecord {
     191           1 :                 // WriteRecord has written to this writer since the switch.
     192           1 :                 q.lastLogSize = latestLogSizeInWriteRecord
     193           1 :         }
     194             :         // Else writer is a new writer that was switched to, so ignore the
     195             :         // latestLogSizeInWriteRecord.
     196             : 
     197           1 :         lastLogSize = q.lastLogSize
     198           1 :         q.mu.RUnlock()
     199           1 :         return h, writer, lastLogSize
     200             : }
     201             : 
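
An aside on the growth path in push above: a logical index i always lives at slot i%len(buffer), so when the buffer doubles, each live index in [tail, head) must be re-copied from its old slot i%n to its new slot i%m. A tiny standalone illustration with made-up entries:

package main

import "fmt"

func main() {
        // Queue is [4, 8) and the old buffer of length n=4 is full. Logical
        // index i is stored at slot i%n, so logical 4..7 occupy slots 0..3.
        n := 4
        old := []string{"e4", "e5", "e6", "e7"}
        t, h := 4, 8

        // Double to m=8 and re-home each live entry at i%m.
        m := 2 * n
        grown := make([]string, m)
        for i := t; i < h; i++ {
                grown[i%m] = old[i%n]
        }
        fmt.Printf("%q\n", grown) // ["" "" "" "" "e4" "e5" "e6" "e7"]
}
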
     202           1 : func (q *recordQueue) length() int {
     203           1 :         ht := q.headTail.Load()
     204           1 :         h, t := unpackHeadTail(ht)
     205           1 :         return int(h - t)
     206           1 : }
     207             : 
     208             : // Pops all entries. Must be called only after the last push returns.
     209           1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
     210           1 :         ht := q.headTail.Load()
     211           1 :         h, t := unpackHeadTail(ht)
     212           1 :         n := int(h - t)
     213           1 :         if n == 0 {
     214           1 :                 return 0, 0
     215           1 :         }
     216           1 :         return n, q.pop(h-1, err)
     217             : }
     218             : 
     219             : // Pops all entries up to and including index. The remaining queue is
     220             : // [index+1, head).
     221             : //
     222             : // NB: we could slightly simplify to only have the latest writer be able to
     223             : // pop. This would avoid the CAS below, but it seems better to reduce the
     224             : // amount of queued work regardless of who has successfully written it.
     225           1 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
     226           1 :         nowUnixNanos := time.Now().UnixNano()
     227           1 :         var buf [512]poppedEntry
     228           1 :         tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
     229           1 :                 ht := q.headTail.Load()
     230           1 :                 _, t = unpackHeadTail(ht)
     231           1 :                 tail := int(t)
     232           1 :                 numEntriesToPop = int(index) - tail + 1
     233           1 :                 return t, numEntriesToPop
     234           1 :         }
     235           1 :         q.consumerMu.Lock()
     236           1 :         // numEntriesToPop is a function of index and tail. The value of tail cannot
     237           1 :         // change since consumerMu is held.
     238           1 :         tail, numEntriesToPop := tailEntriesToPop()
     239           1 :         if numEntriesToPop <= 0 {
     240           1 :                 q.consumerMu.Unlock()
     241           1 :                 return 0
     242           1 :         }
     243           1 :         var b []poppedEntry
     244           1 :         if numEntriesToPop <= len(buf) {
     245           1 :                 b = buf[:numEntriesToPop]
     246           1 :         } else {
     247           1 :                 // Do allocation before acquiring the mutex.
     248           1 :                 b = make([]poppedEntry, numEntriesToPop)
     249           1 :         }
     250           1 :         q.mu.RLock()
     251           1 :         n := len(q.buffer)
     252           1 :         for i := 0; i < numEntriesToPop; i++ {
     253           1 :                 // Grab the popped entries before incrementing tail, since that will
     254           1 :                 // release those buffer slots to the producer.
     255           1 :                 idx := (i + int(tail)) % n
     256           1 :                 b[i] = poppedEntry{
     257           1 :                         opts:                q.buffer[idx].opts,
     258           1 :                         refCount:            q.buffer[idx].refCount,
     259           1 :                         writeStartUnixNanos: q.buffer[idx].writeStartUnixNanos,
     260           1 :                 }
     261           1 :         }
     262             :         // Since tail cannot change, we don't need to do a compare-and-swap.
     263           1 :         q.headTail.Add(uint64(numEntriesToPop))
     264           1 :         q.mu.RUnlock()
     265           1 :         q.consumerMu.Unlock()
     266           1 :         addLatencySample := false
     267           1 :         var maxLatencyNanos int64
     268           1 :         for i := 0; i < numEntriesToPop; i++ {
     269           1 :                 // Now that we've synced the entry, we can unref it to signal that we
     270           1 :                 // will not read the written byte slice again.
     271           1 :                 if b[i].refCount != nil {
     272           1 :                         b[i].refCount.Unref()
     273           1 :                 }
     274           1 :                 if b[i].opts.Done != nil {
     275           1 :                         numSyncsPopped++
     276           1 :                         if err != nil {
     277           1 :                                 *b[i].opts.Err = err
     278           1 :                         }
     279           1 :                         b[i].opts.Done.Done()
     280           1 :                         latency := nowUnixNanos - b[i].writeStartUnixNanos
     281           1 :                         if !addLatencySample {
     282           1 :                                 addLatencySample = true
     283           1 :                                 maxLatencyNanos = latency
     284           1 :                         } else if maxLatencyNanos < latency {
     285           0 :                                 maxLatencyNanos = latency
     286           0 :                         }
     287             :                 }
     288             :         }
     289           1 :         if addLatencySample {
     290           1 :                 if maxLatencyNanos < 0 {
     291           0 :                         maxLatencyNanos = 0
     292           0 :                 }
     293           1 :                 q.failoverWriteAndSyncLatency.Observe(float64(maxLatencyNanos))
     294             :         }
     295           1 :         return numSyncsPopped
     296             : }
     297             : 
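
Note that pop records at most one histogram sample per popped batch: the maximum write-to-sync latency among the entries that requested a sync, clamped at zero. A standalone sketch of that bookkeeping with made-up timestamps:

package main

import (
        "fmt"
        "time"
)

func main() {
        now := time.Now().UnixNano()
        // Made-up write-start times for the popped batch; only entries that
        // requested a sync (opts.Done != nil) contribute a latency.
        writeStarts := []int64{
                now - 3*int64(time.Millisecond),
                now - 9*int64(time.Millisecond),
                now - 1*int64(time.Millisecond),
        }
        var maxLatencyNanos int64
        addLatencySample := false
        for _, start := range writeStarts {
                latency := now - start
                if !addLatencySample || latency > maxLatencyNanos {
                        addLatencySample = true
                        maxLatencyNanos = latency
                }
        }
        if addLatencySample {
                if maxLatencyNanos < 0 {
                        maxLatencyNanos = 0 // clamp, as pop does
                }
                fmt.Printf("observe %v\n", time.Duration(maxLatencyNanos)) // observe 9ms
        }
}
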
     298             : func (q *recordQueue) snapshotAndSwitchWriter(
     299             :         writer *record.LogWriter,
     300             :         snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
     301           1 : ) {
     302           1 :         q.mu.Lock()
     303           1 :         defer q.mu.Unlock()
     304           1 :         q.writer = writer
     305           1 :         h, t := unpackHeadTail(q.headTail.Load())
     306           1 :         n := h - t
     307           1 :         if n > 0 {
     308           1 :                 m := uint32(len(q.buffer))
     309           1 :                 b := make([]recordQueueEntry, n)
     310           1 :                 for i := t; i < h; i++ {
     311           1 :                         b[i-t] = q.buffer[i%m]
     312           1 :                 }
     313           1 :                 q.lastLogSize = snapshotFunc(t, b)
     314             :         }
     315             : }
     316             : 
     317             : // getLastIndex is used by failoverWriter.Close.
     318           1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
     319           1 :         h, _ := unpackHeadTail(q.headTail.Load())
     320           1 :         return int64(h) - 1
     321           1 : }
     322             : 
     323             : const headTailBits = 32
     324             : 
     325           1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
     326           1 :         const mask = 1<<headTailBits - 1
     327           1 :         head = uint32((ht >> headTailBits) & mask)
     328           1 :         tail = uint32(ht & mask)
     329           1 :         return head, tail
     330           1 : }
     331             : 
     332             : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
     333             : // chosen value. Setting this to 2 will not simplify the code. We make this a
      334             : // constant since we want a fixed size array for failoverWriter.writers.
     335             : const maxPhysicalLogs = 10
     336             : 
     337             : // failoverWriter is the implementation of Writer in failover mode. No Writer
     338             : // method blocks for IO, except for Close.
     339             : //
     340             : // Loosely speaking, Close blocks until all records are successfully written
     341             : // and synced to some log writer. Monitoring of log writer latency and errors
     342             : // continues after Close is called, which means failoverWriter can be switched
     343             : // to a new log writer after Close is called, to unblock Close.
     344             : //
     345             : // More precisely, Close does not block if there is an error in creating or
     346             : // closing the latest LogWriter when close was called. This is because errors
     347             : // are considered indicative of misconfiguration, and the user of
     348             : // failoverWriter can dampen switching when observing errors (e.g. see
     349             : // failoverMonitor), so close does not assume any liveness of calls to
     350             : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
     351             : // an error on Writer.Close as fatal, this does mean that failoverWriter has
     352             : // limited ability to mask errors (its primary task is to mask high latency).
     353             : type failoverWriter struct {
     354             :         opts failoverWriterOpts
     355             :         q    recordQueue
     356             :         mu   struct {
     357             :                 sync.Mutex
     358             :                 // writers is protected by mu, except for updates to the
     359             :                 // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
     360             :                 // protection by mu is for handling concurrent calls to switchToNewDir,
     361             :                 // Close, and getLog.
     362             :                 writers [maxPhysicalLogs]logWriterAndRecorder
     363             : 
     364             :                 // cond is signaled when the latest LogWriter is set in writers (or there
     365             :                 // is a creation error), or when the latest LogWriter is successfully
     366             :                 // closed. It is waited on in Close. We don't use channels and select
     367             :                 // since what Close is waiting on is dynamic based on the local state in
     368             :                 // Close, so using Cond is simpler.
     369             :                 cond *sync.Cond
     370             :                 // nextWriterIndex is advanced before creating the *LogWriter. That is, a
     371             :                 // slot is reserved by taking the current value of nextWriterIndex and
     372             :                 // incrementing it, and then the *LogWriter for that slot is created. When
     373             :                 // newFailoverWriter returns, nextWriterIndex = 1.
     374             :                 //
     375             :                 // The latest *LogWriter is (will be) at nextWriterIndex-1.
     376             :                 //
     377             :                 // INVARIANT: nextWriterIndex <= len(writers)
     378             :                 nextWriterIndex LogNameIndex
     379             :                 closed          bool
     380             :                 // metrics is initialized in Close. Currently we just use the metrics from
     381             :                 // the latest writer after it is closed, since in the common case with
     382             :                 // only one writer, that writer's flush loop will have finished and the
     383             :                 // metrics will be current. With multiple writers, these metrics can be
     384             :                 // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
     385             :                 // which can be high for a writer that was switched away from, and
     386             :                 // therefore not indicative of overall work being done by the
     387             :                 // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
     388             :                 // inaccurate once there is no more work being given to a writer. We could
     389             :                 // add a method to LogWriter to stop sampling metrics when it is not the
     390             :                 // latest writer. Then we could aggregate all these metrics across all
     391             :                 // writers.
     392             :                 //
     393             :                 // Note that CockroachDB does not use these metrics in any meaningful way.
     394             :                 //
     395             :                 // TODO(sumeer): do the improved solution outlined above.
     396             :                 metrics record.LogWriterMetrics
     397             :         }
     398             :         // State for computing logical offset. The cumulative offset state is in
     399             :         // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
     400             :         // compute the delta from the size returned by this LogWriter now, and the
     401             :         // size returned by this LogWriter in the previous call to
     402             :         // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
     403             :         // have happened from WriteRecord, or asynchronously during a switch. So
     404             :         // that previous call state requires synchronization and is maintained in
     405             :         // recordQueue. The offset is incremented by this delta without any
     406             :         // synchronization, since we rely on external synchronization (like the
     407             :         // standaloneWriter).
     408             :         logicalOffset struct {
     409             :                 latestWriterInWriteRecord  *record.LogWriter
     410             :                 latestLogSizeInWriteRecord int64
     411             :                 offset                     int64
     412             :                 // Transitions once from false => true when there is a non-nil writer.
     413             :                 notEstimatedOffset bool
     414             :         }
     415             :         psiForWriteRecordBacking record.PendingSyncIndex
     416             :         psiForSwitchBacking      record.PendingSyncIndex
     417             : }
     418             : 
     419             : type logWriterAndRecorder struct {
      420             :         // This may never become non-nil: if, when the LogWriter was finally created,
     421             :         // it was no longer the latest writer. Additionally, if there was an error
     422             :         // in creating the writer, w will remain nil and createError will be set.
     423             :         w *record.LogWriter
     424             :         // createError is set if there is an error creating the writer. This is
     425             :         // useful in Close since we need to know when the work for creating the
     426             :         // latest writer is done, whether it resulted in success or not.
     427             :         createError error
     428             :         r           latencyAndErrorRecorder
     429             : 
     430             :         // dir, approxFileSize, synchronouslyClosed are kept for initializing
     431             :         // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
     432             :         // returned by logCreator. When failoverWriter.Close is called,
     433             :         // approxFileSize and synchronouslyClosed may be updated.
     434             :         dir                 Dir
     435             :         approxFileSize      uint64
     436             :         synchronouslyClosed bool
     437             : }
     438             : 
     439             : var _ Writer = &failoverWriter{}
     440             : 
     441             : var _ switchableWriter = &failoverWriter{}
     442             : 
     443             : type failoverWriterOpts struct {
     444             :         wn     NumWAL
     445             :         logger base.Logger
     446             :         timeSource
     447             :         jobID int
     448             :         logCreator
     449             : 
     450             :         // Options that feed into SyncingFileOptions.
     451             :         noSyncOnClose   bool
     452             :         bytesPerSync    int
     453             :         preallocateSize func() int
     454             : 
     455             :         // Options for record.LogWriter.
     456             :         minSyncInterval func() time.Duration
     457             :         fsyncLatency    prometheus.Histogram
     458             :         queueSemChan    chan struct{}
     459             :         stopper         *stopper
     460             : 
     461             :         failoverWriteAndSyncLatency prometheus.Histogram
     462             :         writerClosed                func(logicalLogWithSizesEtc)
     463             : 
     464             :         writerCreatedForTest chan<- struct{}
     465             : }
     466             : 
     467             : func simpleLogCreator(
     468             :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     469           1 : ) (f vfs.File, initialFileSize uint64, err error) {
     470           1 :         filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
     471           1 :         // Create file.
     472           1 :         r.writeStart()
     473           1 :         f, err = dir.FS.Create(filename, "pebble-wal")
     474           1 :         r.writeEnd(err)
     475           1 :         return f, 0, err
     476           1 : }
     477             : 
     478             : type logCreator func(
     479             :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     480             : ) (f vfs.File, initialFileSize uint64, err error)
     481             : 
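
Since logCreator is an injection point, a caller or test could supply something other than simpleLogCreator. The following is a hedged sketch, not from the Pebble source, that assumes it lives in this package: it injects a configurable number of creation failures and then delegates to simpleLogCreator, using only the signature above and the already-imported errors package.

// flakyLogCreatorForTest is hypothetical: it fails the first *failures
// creation attempts and then delegates to simpleLogCreator.
func flakyLogCreatorForTest(failures *int) logCreator {
        return func(
                dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
        ) (vfs.File, uint64, error) {
                if *failures > 0 {
                        *failures--
                        return nil, 0, errors.New("injected WAL creation failure")
                }
                return simpleLogCreator(dir, wn, li, r, jobID)
        }
}
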
     482             : func newFailoverWriter(
     483             :         opts failoverWriterOpts, initialDir dirAndFileHandle,
     484           1 : ) (*failoverWriter, error) {
     485           1 :         ww := &failoverWriter{
     486           1 :                 opts: opts,
     487           1 :         }
     488           1 :         ww.q.init(opts.failoverWriteAndSyncLatency)
     489           1 :         ww.mu.cond = sync.NewCond(&ww.mu)
     490           1 :         // The initial record.LogWriter creation also happens via a
      491           1 :         // switchToNewDir since we don't want it to block newFailoverWriter.
     492           1 :         err := ww.switchToNewDir(initialDir)
     493           1 :         if err != nil {
     494           0 :                 // Switching limit cannot be exceeded when creating.
     495           0 :                 panic(err)
     496             :         }
     497           1 :         return ww, nil
     498             : }
     499             : 
     500             : // WriteRecord implements Writer.
     501             : func (ww *failoverWriter) WriteRecord(
     502             :         p []byte, opts SyncOptions, ref RefCount,
     503           1 : ) (logicalOffset int64, err error) {
     504           1 :         if ref != nil {
     505           1 :                 ref.Ref()
     506           1 :         }
     507           1 :         var writeStartUnixNanos int64
     508           1 :         if opts.Done != nil {
     509           1 :                 writeStartUnixNanos = time.Now().UnixNano()
     510           1 :         }
     511           1 :         recordIndex, writer, lastLogSize := ww.q.push(
     512           1 :                 p,
     513           1 :                 opts,
     514           1 :                 ref,
     515           1 :                 writeStartUnixNanos,
     516           1 :                 ww.logicalOffset.latestLogSizeInWriteRecord,
     517           1 :                 ww.logicalOffset.latestWriterInWriteRecord,
     518           1 :         )
     519           1 :         if writer == nil {
     520           1 :                 // Don't have a record.LogWriter yet, so use an estimate. This estimate
     521           1 :                 // will get overwritten.
     522           1 :                 ww.logicalOffset.offset += int64(len(p))
     523           1 :                 return ww.logicalOffset.offset, nil
     524           1 :         }
     525             :         // INVARIANT: writer != nil.
     526           1 :         notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
     527           1 :         if !notEstimatedOffset {
     528           1 :                 ww.logicalOffset.notEstimatedOffset = true
     529           1 :         }
     530           1 :         ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     531           1 :         if opts.Done != nil {
     532           1 :                 ww.psiForWriteRecordBacking.Index = int64(recordIndex)
     533           1 :         }
     534           1 :         ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
     535           1 :         ww.logicalOffset.latestWriterInWriteRecord = writer
     536           1 :         if notEstimatedOffset {
     537           1 :                 delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
     538           1 :                 ww.logicalOffset.offset += delta
     539           1 :         } else {
     540           1 :                 // Overwrite the estimate. This is a best-effort improvement in that it is
     541           1 :                 // accurate for the common case where writer is the first LogWriter.
     542           1 :                 // Consider a failover scenario where there was no LogWriter for the first
     543           1 :                 // 10 records, so they are all accumulated as an estimate. Then the first
     544           1 :                 // LogWriter successfully writes and syncs the first 5 records and gets
     545           1 :                 // stuck. A switch happens to a second LogWriter that is handed the
     546           1 :                 // remaining 5 records, and the 11th record arrives via a WriteRecord.
     547           1 :                 // The transition from !notEstimatedOffset to notEstimatedOffset will
     548           1 :                 // happen on this 11th record, and the logic here will use the length of
     549           1 :                 // the second LogWriter, that does not reflect the full length.
     550           1 :                 //
     551           1 :                 // TODO(sumeer): try to make this more correct, without adding much more
     552           1 :                 // complexity, and without adding synchronization.
     553           1 :                 ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
     554           1 :         }
     555           1 :         return ww.logicalOffset.offset, err
     556             : }
     557             : 
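
To make the offset bookkeeping in WriteRecord concrete, here is a standalone sketch with made-up sizes: before any LogWriter exists, the offset is estimated from payload lengths; the first record written to a real LogWriter overwrites the estimate with that writer's reported size; after that, the offset advances by the delta between consecutive reported sizes.

package main

import "fmt"

func main() {
        var offset int64
        notEstimatedOffset := false // set once a record is written to a real LogWriter
        var lastLogSize int64       // writer's size after the previous call

        // 1) No LogWriter yet: estimate by payload length.
        for _, p := range []string{"rec-1", "rec-2"} {
                offset += int64(len(p))
        }
        fmt.Println("estimated:", offset) // estimated: 10

        // 2) A LogWriter exists. Suppose SyncRecordGeneralized reports these
        //    cumulative sizes (framing overhead included) for the next records.
        for _, latestLogSize := range []int64{64, 96, 140} {
                if notEstimatedOffset {
                        offset += latestLogSize - lastLogSize // advance by the delta
                } else {
                        notEstimatedOffset = true
                        offset = latestLogSize // best-effort overwrite of the estimate
                }
                lastLogSize = latestLogSize
        }
        fmt.Println("offset:", offset) // offset: 140
}
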
     558             : // switchToNewDir starts switching to dir. It implements switchableWriter. All
     559             : // work is async, and a non-nil error is returned only if the switching limit
     560             : // is exceeded.
     561           1 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
     562           1 :         ww.mu.Lock()
      563           1 :         // Can have a late switchToNewDir call if the failoverMonitor has not yet
     564           1 :         // been told that the writer is closed. Ignore.
     565           1 :         if ww.mu.closed {
     566           1 :                 ww.mu.Unlock()
     567           1 :                 if ww.opts.writerCreatedForTest != nil {
     568           1 :                         ww.opts.writerCreatedForTest <- struct{}{}
     569           1 :                 }
     570           1 :                 return nil
     571             :         }
     572             :         // writerIndex is the slot for this writer.
     573           1 :         writerIndex := ww.mu.nextWriterIndex
     574           1 :         if int(writerIndex) == len(ww.mu.writers) {
     575           1 :                 ww.mu.Unlock()
     576           1 :                 return errors.Errorf("exceeded switching limit")
     577           1 :         }
     578           1 :         ww.mu.writers[writerIndex].dir = dir.Dir
     579           1 :         ww.mu.nextWriterIndex++
     580           1 :         ww.mu.Unlock()
     581           1 : 
     582           1 :         // Creation is async.
     583           1 :         ww.opts.stopper.runAsync(func() {
     584           1 :                 recorderAndWriter := &ww.mu.writers[writerIndex].r
     585           1 :                 recorderAndWriter.ts = ww.opts.timeSource
     586           1 :                 file, initialFileSize, err := ww.opts.logCreator(
     587           1 :                         dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
     588           1 :                 ww.mu.writers[writerIndex].approxFileSize = initialFileSize
     589           1 :                 // handleErrFunc is called when err != nil. It handles the multiple IO error
     590           1 :                 // cases below.
     591           1 :                 handleErrFunc := func(err error) {
     592           1 :                         if file != nil {
     593           1 :                                 file.Close()
     594           1 :                         }
     595           1 :                         ww.mu.Lock()
     596           1 :                         defer ww.mu.Unlock()
     597           1 :                         ww.mu.writers[writerIndex].createError = err
     598           1 :                         ww.mu.cond.Signal()
     599           1 :                         if ww.opts.writerCreatedForTest != nil {
     600           1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     601           1 :                         }
     602             :                 }
     603           1 :                 if err != nil {
     604           1 :                         handleErrFunc(err)
     605           1 :                         return
     606           1 :                 }
     607             :                 // Sync dir.
     608           1 :                 recorderAndWriter.writeStart()
     609           1 :                 err = dir.Sync()
     610           1 :                 recorderAndWriter.writeEnd(err)
     611           1 :                 if err != nil {
     612           1 :                         handleErrFunc(err)
     613           1 :                         return
     614           1 :                 }
     615             :                 // Wrap in a syncingFile.
     616           1 :                 syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
     617           1 :                         NoSyncOnClose:   ww.opts.noSyncOnClose,
     618           1 :                         BytesPerSync:    ww.opts.bytesPerSync,
     619           1 :                         PreallocateSize: ww.opts.preallocateSize(),
     620           1 :                 })
     621           1 :                 // Wrap in the latencyAndErrorRecorder.
     622           1 :                 recorderAndWriter.setWriter(syncingFile)
     623           1 : 
     624           1 :                 // Using NumWAL as the DiskFileNum is fine since it is used only as
     625           1 :                 // EOF trailer for safe log recycling. Even though many log files can
     626           1 :                 // map to a single NumWAL, a file used for NumWAL n at index m will
     627           1 :                 // never get recycled for NumWAL n at a later index (since recycling
     628           1 :                 // happens when n as a whole is obsolete).
     629           1 :                 w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
     630           1 :                         record.LogWriterConfig{
     631           1 :                                 WALMinSyncInterval:        ww.opts.minSyncInterval,
     632           1 :                                 WALFsyncLatency:           ww.opts.fsyncLatency,
     633           1 :                                 QueueSemChan:              ww.opts.queueSemChan,
     634           1 :                                 ExternalSyncQueueCallback: ww.doneSyncCallback,
     635           1 :                         })
     636           1 :                 closeWriter := func() bool {
     637           1 :                         ww.mu.Lock()
     638           1 :                         defer ww.mu.Unlock()
     639           1 :                         if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
     640           1 :                                 // Not the latest writer or the writer was closed while this async
     641           1 :                                 // creation was ongoing.
     642           1 :                                 if ww.opts.writerCreatedForTest != nil {
     643           1 :                                         ww.opts.writerCreatedForTest <- struct{}{}
     644           1 :                                 }
     645           1 :                                 return true
     646             :                         }
     647             :                         // Latest writer.
     648           1 :                         ww.mu.writers[writerIndex].w = w
     649           1 :                         ww.mu.cond.Signal()
     650           1 :                         // NB: snapshotAndSwitchWriter does not block on IO, since
     651           1 :                         // SyncRecordGeneralized does no IO.
     652           1 :                         ww.q.snapshotAndSwitchWriter(w,
     653           1 :                                 func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
     654           1 :                                         for i := range entries {
     655           1 :                                                 ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
     656           1 :                                                 if entries[i].opts.Done != nil {
     657           1 :                                                         ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
     658           1 :                                                 }
     659           1 :                                                 var err error
     660           1 :                                                 logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
     661           1 :                                                 if err != nil {
     662           0 :                                                         // TODO(sumeer): log periodically. The err will also surface via
     663           0 :                                                         // the latencyAndErrorRecorder, so if a switch is possible, it
     664           0 :                                                         // will be done.
     665           0 :                                                         ww.opts.logger.Errorf("%s", err)
     666           0 :                                                 }
     667             :                                         }
     668           1 :                                         return logSize
     669             :                                 })
     670           1 :                         if ww.opts.writerCreatedForTest != nil {
     671           1 :                                 ww.opts.writerCreatedForTest <- struct{}{}
     672           1 :                         }
     673           1 :                         return false
     674             :                 }()
     675           1 :                 if closeWriter {
     676           1 :                         // Never wrote anything to this writer so don't care about the
     677           1 :                         // returned error.
     678           1 :                         ww.opts.stopper.runAsync(func() {
     679           1 :                                 _ = w.Close()
     680           1 :                                 // TODO(sumeer): consider deleting this file too, since
     681           1 :                                 // failoverWriter.Close may not wait for it. This is going to be
     682           1 :                                 // extremely rare, so the risk of garbage empty files piling up is
     683           1 :                                 // extremely low. Say failover happens daily and of those cases we
     684           1 :                                 // have to be very unlucky and the close happens while a failover was
     685           1 :                                 // ongoing and the previous LogWriter successfully wrote everything
     686           1 :                                 // (say 1% probability if we want to be pessimistic). A garbage file
     687           1 :                                 // every 100 days. Restarts will delete that garbage.
     688           1 :                         })
     689             :                 }
     690             :         })
     691           1 :         return nil
     692             : }
     693             : 
     694             : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
     695             : // record.LogWriter.
     696             : //
     697             : // recordQueue is popped from only when some work requests a sync (and
     698             : // successfully syncs). In the worst case, if no syncs are requested, we could
     699             : // queue all the records needed to fill up a memtable in the recordQueue. This
     700             : // can have two negative effects: (a) in the case of failover, we need to
     701             : // replay all the data in the current mutable memtable, which takes more time,
     702             : // (b) the memory usage is proportional to the size of the memtable. We ignore
     703             : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
     704             : // the default memtable size is only 64MB.
     705           1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
     706           1 :         if err != nil {
     707           1 :                 // Don't pop anything since we can retry after switching to a new
     708           1 :                 // LogWriter.
     709           1 :                 return
     710           1 :         }
     711             :         // NB: harmless after Close returns since numSyncsPopped will be 0.
     712           1 :         numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
     713           1 :         if ww.opts.queueSemChan != nil {
     714           1 :                 for i := 0; i < numSyncsPopped; i++ {
     715           1 :                         <-ww.opts.queueSemChan
     716           1 :                 }
     717             :         }
     718             : }
     719             : 
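
The queueSemChan draining above behaves like releasing a counting semaphore: one slot is held per outstanding sync request (acquired upstream, presumably by the code that enqueues the record for sync), and doneSyncCallback releases one slot per popped sync. A standalone sketch of just the channel accounting, with made-up counts:

package main

import "fmt"

func main() {
        // Capacity bounds how many sync requests can be outstanding at once.
        queueSemChan := make(chan struct{}, 4)

        // Acquiring side: three sync requests each take a slot.
        for i := 0; i < 3; i++ {
                queueSemChan <- struct{}{}
        }
        fmt.Println("outstanding syncs:", len(queueSemChan)) // 3

        // doneSyncCallback side: a successful sync popped 3 entries with
        // Done != nil, so it releases 3 slots.
        numSyncsPopped := 3
        for i := 0; i < numSyncsPopped; i++ {
                <-queueSemChan
        }
        fmt.Println("outstanding syncs:", len(queueSemChan)) // 0
}
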
     720             : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
     721           1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
     722           1 :         r := ww.recorderForCurDir()
     723           1 :         if r == nil {
     724           0 :                 return 0, nil
     725           0 :         }
     726           1 :         return r.ongoingLatencyOrError()
     727             : }
     728             : 
     729             : // For internal use and testing.
     730           1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
     731           1 :         ww.mu.Lock()
     732           1 :         defer ww.mu.Unlock()
     733           1 :         if ww.mu.closed {
     734           0 :                 return nil
     735           0 :         }
     736           1 :         return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
     737             : }
     738             : 
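
For context on how the two switchableWriter methods implemented here fit together, below is a hedged sketch of a monitor loop; the real failoverMonitor lives elsewhere and additionally dampens switching when it observes errors, so the probe interval, the threshold, and the overall shape are assumptions for illustration only.

// monitorSketch is hypothetical: it probes the ongoing latency/error for the
// current dir and asks the writer to switch to a secondary dir when either an
// error is observed or the latency exceeds a threshold.
func monitorSketch(w switchableWriter, secondary dirAndFileHandle, stop <-chan struct{}) {
        const probeInterval = 100 * time.Millisecond
        const latencyThreshold = 100 * time.Millisecond
        ticker := time.NewTicker(probeInterval)
        defer ticker.Stop()
        for {
                select {
                case <-stop:
                        return
                case <-ticker.C:
                        latency, err := w.ongoingLatencyOrErrorForCurDir()
                        if err != nil || latency > latencyThreshold {
                                // Ignore the returned error: it is non-nil only
                                // when the switching limit has been exceeded.
                                _ = w.switchToNewDir(secondary)
                        }
                }
        }
}
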
     739             : // Close implements Writer.
     740             : //
      741             : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
     742             : // called after Close is called, and there is also a possibility that they get
     743             : // called after Close returns and before failoverMonitor knows that the
     744             : // failoverWriter is closed.
     745             : //
     746             : // doneSyncCallback can be called anytime after Close returns since there
     747             : // could be stuck writes that finish arbitrarily later.
     748             : //
     749             : // See the long comment about Close behavior where failoverWriter is declared.
     750           1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
     751           1 :         offset, err := ww.closeInternal()
     752           1 :         ww.opts.writerClosed(ww.getLog())
     753           1 :         return offset, err
     754           1 : }
     755             : 
     756           1 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
     757           1 :         logicalOffset = ww.logicalOffset.offset
     758           1 :         // [0, closeCalledCount) have had LogWriter.Close called (though may not
     759           1 :         // have finished) or the LogWriter will never be non-nil. Either way, they
     760           1 :         // have been "processed".
     761           1 :         closeCalledCount := LogNameIndex(0)
     762           1 :         // lastWriterState is the state for the last writer, for which we are
     763           1 :         // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
     764           1 :         // What is considered the last writer can change. All state is protected by
     765           1 :         // ww.mu.
     766           1 :         type lastWriterState struct {
     767           1 :                 index   LogNameIndex
     768           1 :                 closed  bool
     769           1 :                 err     error
     770           1 :                 metrics record.LogWriterMetrics
     771           1 :         }
     772           1 :         var lastWriter lastWriterState
     773           1 :         lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
     774           1 :         ww.mu.Lock()
     775           1 :         defer ww.mu.Unlock()
     776           1 :         // Every iteration starts and ends with the mutex held.
     777           1 :         //
     778           1 :         // Invariant: ww.mu.nextWriterIndex >= 1.
     779           1 :         //
     780           1 :         // We will loop until we have closed the lastWriter (and use
     781           1 :         // lastWriter.err). We also need to call close on all LogWriters
     782           1 :         // that will not close themselves, i.e., those that have already been
     783           1 :         // created and installed in failoverWriter.writers (this set may change
     784           1 :         // while failoverWriter.Close runs).
     785           1 :         for !lastWriter.closed {
     786           1 :                 numWriters := ww.mu.nextWriterIndex
     787           1 :                 if numWriters > closeCalledCount {
     788           1 :                         // INVARIANT: numWriters > closeCalledCount.
     789           1 :                         lastWriter = lastWriterState{
     790           1 :                                 index: numWriters - 1,
     791           1 :                         }
     792           1 :                         // Try to process [closeCalledCount, numWriters). Will surely process
     793           1 :                         // [closeCalledCount, numWriters-1), since those writers are either done
     794           1 :                         // initializing, or will close themselves. The writer at numWriters-1 we
     795           1 :                         // can only process if it is done initializing, else we will iterate
     796           1 :                         // again.
     797           1 :                         for i := closeCalledCount; i < numWriters; i++ {
     798           1 :                                 w := ww.mu.writers[i].w
     799           1 :                                 cErr := ww.mu.writers[i].createError
      800           1 :                 // Is the current index the last writer? If yes, this is also the last
     801           1 :                                 // loop iteration.
     802           1 :                                 isLastWriter := i == lastWriter.index
     803           1 :                                 if w != nil {
     804           1 :                                         // Can close it, so extend closeCalledCount.
     805           1 :                                         closeCalledCount = i + 1
     806           1 :                                         size := uint64(w.Size())
     807           1 :                                         if ww.mu.writers[i].approxFileSize < size {
     808           1 :                                                 ww.mu.writers[i].approxFileSize = size
     809           1 :                                         }
     810           1 :                                         if isLastWriter {
     811           1 :                                                 // We may care about its error and when it finishes closing.
     812           1 :                                                 index := i
     813           1 :                                                 ww.opts.stopper.runAsync(func() {
     814           1 :                                                         // Last writer(s) (since new writers can be created and become
     815           1 :                                                         // last, as we iterate) are guaranteed to have seen the last
     816           1 :                                                         // record (since it was queued before Close was called). It is
     817           1 :                                                         // possible that a writer got created after the last record was
     818           1 :                                                         // dequeued and before this fact was realized by Close. In that
     819           1 :                                                         // case we will harmlessly tell it that it synced that last
     820           1 :                                                         // record, though it has already been written and synced by
     821           1 :                                                         // another writer.
     822           1 :                                                         err := w.CloseWithLastQueuedRecord(lastRecordIndex)
     823           1 :                                                         ww.mu.Lock()
     824           1 :                                                         defer ww.mu.Unlock()
     825           1 :                                                         if lastWriter.index == index {
     826           1 :                                                                 lastWriter.closed = true
     827           1 :                                                                 lastWriter.err = err
     828           1 :                                                                 lastWriter.metrics = w.Metrics()
     829           1 :                                                                 ww.mu.cond.Signal()
     830           1 :                                                         }
     831             :                                                 })
     832           1 :                                         } else {
     833           1 :                                                 // Don't care about the returned error since all the records we
     834           1 :                                                 // relied on this writer for were already successfully written.
     835           1 :                                                 ww.opts.stopper.runAsync(func() {
     836           1 :                                                         _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
     837           1 :                                                 })
     838             :                                         }
     839           1 :                                 } else if cErr != nil {
     840           1 :                                         // Have processed it, so extend closeCalledCount.
     841           1 :                                         closeCalledCount = i + 1
     842           1 :                                         if isLastWriter {
     843           1 :                                                 lastWriter.closed = true
     844           1 :                                                 lastWriter.err = cErr
     845           1 :                                                 lastWriter.metrics = record.LogWriterMetrics{}
     846           1 :                                         }
     847             :                                         // Else, ignore.
     848           1 :                                 } else {
     849           1 :                                         if !isLastWriter {
     850           1 :                                                 // Not last writer, so will close itself.
     851           1 :                                                 closeCalledCount = i + 1
     852           1 :                                         }
     853             :                                         // Else, last writer, so we may have to close it.
     854             :                                 }
     855             :                         }
     856             :                 }
     857           1 :                 if !lastWriter.closed {
     858           1 :                         // We are waiting either for the last writer to be created, for its close
     859           1 :                         // to finish, or for something else to become the last writer.
     860           1 :                         ww.mu.cond.Wait()
     861           1 :                 }
     862             :         }
     863           1 :         if ww.mu.writers[lastWriter.index].w != nil {
     864           1 :                 // This permits log recycling.
     865           1 :                 ww.mu.writers[lastWriter.index].synchronouslyClosed = true
     866           1 :         }
     867           1 :         err = lastWriter.err
     868           1 :         ww.mu.metrics = lastWriter.metrics
     869           1 :         ww.mu.closed = true
     870           1 :         _, _ = ww.q.popAll(err)
     871           1 :         return logicalOffset, err
     872             : }
     873             : 
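                     : // The Close loop above relies on a standard sync.Cond hand-off: the goroutine
                     : // that finishes closing a writer updates shared state under ww.mu and Signals,
                     : // while Close Waits until lastWriter.closed is set. A minimal, self-contained
                     : // sketch of that pattern (hypothetical names, not part of this package):
                     : //
                     : //      type closer struct {
                     : //              mu     sync.Mutex
                     : //              cond   *sync.Cond
                     : //              closed bool
                     : //              err    error
                     : //      }
                     : //
                     : //      func newCloser() *closer {
                     : //              c := &closer{}
                     : //              c.cond = sync.NewCond(&c.mu)
                     : //              return c
                     : //      }
                     : //
                     : //      // finish is called by the goroutine that completed the close.
                     : //      func (c *closer) finish(err error) {
                     : //              c.mu.Lock()
                     : //              defer c.mu.Unlock()
                     : //              c.closed = true
                     : //              c.err = err
                     : //              c.cond.Signal()
                     : //      }
                     : //
                     : //      // waitForClose blocks until finish has run and returns its error.
                     : //      func (c *closer) waitForClose() error {
                     : //              c.mu.Lock()
                     : //              defer c.mu.Unlock()
                     : //              for !c.closed {
                     : //                      c.cond.Wait()
                     : //              }
                     : //              return c.err
                     : //      }
                     : 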
     874             : // Metrics implements writer.
     875           1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
     876           1 :         ww.mu.Lock()
     877           1 :         defer ww.mu.Unlock()
     878           1 :         return ww.mu.metrics
     879           1 : }
     880             : 
     881             : // getLog can be called at any time, including after Close returns.
     882           1 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
     883           1 :         ww.mu.Lock()
     884           1 :         defer ww.mu.Unlock()
     885           1 :         ll := logicalLogWithSizesEtc{
     886           1 :                 num: ww.opts.wn,
     887           1 :         }
     888           1 :         for i := range ww.mu.writers {
     889           1 :                 if ww.mu.writers[i].w != nil {
     890           1 :                         ll.segments = append(ll.segments, segmentWithSizeEtc{
     891           1 :                                 segment: segment{
     892           1 :                                         logNameIndex: LogNameIndex(i),
     893           1 :                                         dir:          ww.mu.writers[i].dir,
     894           1 :                                 },
     895           1 :                                 approxFileSize:      ww.mu.writers[i].approxFileSize,
     896           1 :                                 synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
     897           1 :                         })
     898           1 :                 }
     899             :         }
     900           1 :         return ll
     901             : }
     902             : 
     903             : // latencyAndErrorRecorder records ongoing write and sync operations and errors
     904             : // in those operations. record.LogWriter cannot continue functioning after any
     905             : // error, so all errors are considered permanent.
     906             : //
     907             : // writeStart/writeEnd are used directly when creating a file. After the file
     908             : // is successfully created, setWriter turns latencyAndErrorRecorder into an
     909             : // implementation of writerSyncerCloser that records latency and errors for
     910             : // the Write and Sync methods.
     911             : type latencyAndErrorRecorder struct {
     912             :         ts                    timeSource
     913             :         ongoingOperationStart atomic.Int64
     914             :         error                 atomic.Pointer[error]
     915             :         writerSyncerCloser
     916             : }
     917             : 
     918             : type writerSyncerCloser interface {
     919             :         io.Writer
     920             :         io.Closer
     921             :         Sync() error
     922             : }
     923             : 
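                     : // Sketch (illustrative only, not part of the source) of the call pattern
                     : // described in the type comment above. The createTheFile helper, the ts
                     : // timeSource value, and the payload p are stand-ins, since the exact file
                     : // creation call is assumed here:
                     : //
                     : //      r := &latencyAndErrorRecorder{ts: ts}
                     : //      r.writeStart()
                     : //      f, err := createTheFile() // stand-in for the actual file creation
                     : //      r.writeEnd(err)
                     : //      if err == nil {
                     : //              r.setWriter(f)
                     : //              _, _ = r.Write(p) // records latency/errors, then delegates to f
                     : //              _ = r.Sync()
                     : //      }
                     : 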
     924           1 : func (r *latencyAndErrorRecorder) writeStart() {
     925           1 :         r.ongoingOperationStart.Store(r.ts.now().UnixNano())
     926           1 : }
     927             : 
     928           1 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
     929           1 :         if err != nil {
     930           1 :                 ptr := &err
     931           1 :                 r.error.Store(ptr)
     932           1 :         }
     933           1 :         r.ongoingOperationStart.Store(0)
     934             : }
     935             : 
     936           1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
     937           1 :         r.writerSyncerCloser = w
     938           1 : }
     939             : 
     940           1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
     941           1 :         startTime := r.ongoingOperationStart.Load()
     942           1 :         var latency time.Duration
     943           1 :         if startTime != 0 {
     944           1 :                 l := r.ts.now().UnixNano() - startTime
     945           1 :                 if l < 0 {
     946           0 :                         l = 0
     947           0 :                 }
     948           1 :                 latency = time.Duration(l)
     949             :         }
     950           1 :         errPtr := r.error.Load()
     951           1 :         var err error
     952           1 :         if errPtr != nil {
     953           1 :                 err = *errPtr
     954           1 :         }
     955           1 :         return latency, err
     956             : }
     957             : 
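                     : // Sketch (hypothetical, not part of the source): a monitor could poll the
                     : // recorder to detect a stalled or failed write; unhealthyThreshold is an
                     : // assumed name:
                     : //
                     : //      latency, err := r.ongoingLatencyOrError()
                     : //      if err != nil || latency > unhealthyThreshold {
                     : //              // Consider switching to another WAL directory.
                     : //      }
                     : 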
     958             : // Sync implements writerSyncerCloser.
     959           1 : func (r *latencyAndErrorRecorder) Sync() error {
     960           1 :         r.writeStart()
     961           1 :         err := r.writerSyncerCloser.Sync()
     962           1 :         r.writeEnd(err)
     963           1 :         return err
     964           1 : }
     965             : 
     966             : // Write implements io.Writer.
     967           1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
     968           1 :         r.writeStart()
     969           1 :         n, err = r.writerSyncerCloser.Write(p)
     970           1 :         r.writeEnd(err)
     971           1 :         return n, err
     972           1 : }

Generated by: LCOV version 1.14