// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package wal

import (
        "fmt"
        "io"
        "math/rand/v2"
        "os"
        "sync"
        "time"

        "github.com/cockroachdb/errors"
        "github.com/cockroachdb/pebble/internal/base"
        "github.com/cockroachdb/pebble/internal/invariants"
        "github.com/cockroachdb/pebble/vfs"
)

// dirProber probes the primary dir until it is confirmed to be healthy. If
// there aren't enough recent samples, the dir is deemed unhealthy. It is
// only used for failback to the primary.
type dirProber struct {
        fs vfs.FS
        // The full path of the file to use for the probe. The probe is destructive
        // in that it deletes and (re)creates the file.
        filename string
        // The probing interval, when enabled.
        interval time.Duration
        timeSource
        // buf holds the random bytes that are written during the probe.
        buf [100 << 10]byte
        // enabled is signaled to enable and disable probing. The initial state of
        // the prober is disabled.
        enabled chan bool
        mu      struct {
                sync.Mutex
                // Circular buffer of history samples.
                history [probeHistoryLength]time.Duration
                // The history is in [firstProbeIndex, nextProbeIndex).
                firstProbeIndex int
                nextProbeIndex  int
        }
        iterationForTesting chan<- struct{}
}

const probeHistoryLength = 128

// A deliberately large value (24 hours), used to mark failed probes and
// missing samples.
const failedProbeDuration = 24 * 60 * 60 * time.Second

// init takes a stopper in order to connect the dirProber's long-running
// goroutines with the stopper's wait group, but the dirProber has its own
// stop() method that should be invoked to trigger the shutdown.
func (p *dirProber) init(
        fs vfs.FS,
        filename string,
        interval time.Duration,
        stopper *stopper,
        ts timeSource,
        notifyIterationForTesting chan<- struct{},
) {
        *p = dirProber{
                fs:                  fs,
                filename:            filename,
                interval:            interval,
                timeSource:          ts,
                enabled:             make(chan bool),
                iterationForTesting: notifyIterationForTesting,
        }
        // Random bytes for writing, to defeat any FS compression optimization.
        for i := range p.buf {
                p.buf[i] = byte(rand.Uint32())
        }
        // dirProber has an explicit stop() method instead of listening on
        // stopper.shouldQuiesce. This structure helps negotiate the shutdown
        // between failoverMonitor and dirProber. If the dirProber independently
        // listened to shouldQuiesce, it might exit before the failover monitor. The
        // [enable|disable]Probing methods require sending on a channel expecting
        // that the dirProber goroutine will receive on the same channel. If the
        // dirProber noticed the quiescence and exited first, the failoverMonitor
        // could deadlock waiting for a goroutine that no longer exists.
        //
        // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
        //   - the failoverMonitor listens to stopper.shouldQuiesce.
        //   - when the failoverMonitor is exiting, it will no longer attempt to
        //   interact with the dirProber. Only then does it invoke dirProber.stop.
        stopper.runAsync(p.probeLoop)
}
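
// For illustration, a minimal lifecycle sketch (the dir path and interval
// here are hypothetical, and vfs.Default stands in for whatever FS the
// caller uses): the owner initializes the prober once, toggles probing as
// needed, and calls stop() only after it will no longer touch the prober.
//
//      stopper := newStopper()
//      var p dirProber
//      p.init(vfs.Default, "/wal-primary/probe-file", time.Second, stopper, defaultTime{}, nil)
//      p.enableProbing()  // start sampling the primary dir
//      p.disableProbing() // pause sampling and reset the history
//      p.stop()           // terminate probeLoop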

func (p *dirProber) probeLoop() {
        ticker := p.timeSource.newTicker(p.interval)
        ticker.stop()
        tickerCh := ticker.ch()
        shouldContinue := true
        var enabled bool
        for shouldContinue {
                select {
                case <-tickerCh:
                        if !enabled {
                                // Could have a tick waiting before we disabled it. Ignore.
                                continue
                        }
                        probeDur := func() time.Duration {
                                // Delete, create, write, sync.
                                start := p.timeSource.now()
                                _ = p.fs.Remove(p.filename)
                                f, err := p.fs.Create(p.filename, "pebble-wal")
                                if err != nil {
                                        return failedProbeDuration
                                }
                                defer f.Close()
                                n, err := f.Write(p.buf[:])
                                if err != nil {
                                        return failedProbeDuration
                                }
                                if n != len(p.buf) {
                                        panic("invariant violation")
                                }
                                err = f.Sync()
                                if err != nil {
                                        return failedProbeDuration
                                }
                                return p.timeSource.now().Sub(start)
                        }()
                        p.mu.Lock()
                        nextIndex := p.mu.nextProbeIndex % probeHistoryLength
                        p.mu.history[nextIndex] = probeDur
                        p.mu.nextProbeIndex++
                        numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
                        if numSamples > probeHistoryLength {
                                // Length has exceeded capacity, i.e., overwritten the first probe.
                                p.mu.firstProbeIndex++
                        }
                        p.mu.Unlock()

                case enabled, shouldContinue = <-p.enabled:
                        if !enabled || !shouldContinue {
                                ticker.stop()
                                p.mu.Lock()
                                p.mu.firstProbeIndex = 0
                                p.mu.nextProbeIndex = 0
                                p.mu.Unlock()
                        } else {
                                ticker.reset(p.interval)
                        }
                }
                if p.iterationForTesting != nil {
                        p.iterationForTesting <- struct{}{}
                }
        }
}
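
// A worked example of the ring-buffer arithmetic above, with
// probeHistoryLength = 128: after the 130th probe completes, nextProbeIndex
// is 130 and firstProbeIndex is 2, so the window holds the 128 most recent
// samples, and the sample for probe i (0-indexed) lives at history[i%128].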

func (p *dirProber) enableProbing() {
        p.enabled <- true
}

func (p *dirProber) disableProbing() {
        p.enabled <- false
}

func (p *dirProber) stop() {
        close(p.enabled)
}

func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
        p.mu.Lock()
        defer p.mu.Unlock()
        numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
        samplesNeeded := int((interval + p.interval - 1) / p.interval)
        if samplesNeeded == 0 {
                panic("interval is too short")
        } else if samplesNeeded > probeHistoryLength {
                panic("interval is too long")
        }
        if samplesNeeded > numSamples {
                // Not enough samples, so assume not yet healthy.
                return failedProbeDuration, failedProbeDuration
        }
        offset := numSamples - samplesNeeded
        var sum, max time.Duration
        for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
                sampleDur := p.mu.history[i%probeHistoryLength]
                sum += sampleDur
                if max < sampleDur {
                        max = sampleDur
                }
        }
        mean := sum / time.Duration(samplesNeeded)
        return mean, max
}
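
// The samplesNeeded computation above is a ceiling division. For example,
// with a probe interval of 1s, a requested interval of 2.5s needs
// ceil(2.5s / 1s) = 3 samples; until the history holds at least 3 samples,
// getMeanMax pessimistically reports failedProbeDuration for both the mean
// and the max.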

type dirIndex int

const (
        primaryDirIndex dirIndex = iota
        secondaryDirIndex
        numDirIndices
)

type dirAndFileHandle struct {
        Dir
        vfs.File
}

// switchableWriter is a subset of failoverWriter needed by failoverMonitor.
type switchableWriter interface {
        switchToNewDir(dirAndFileHandle) error
        ongoingLatencyOrErrorForCurDir() (time.Duration, error)
}
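
// A stub satisfying switchableWriter, sketched the way a test might feed
// latencies and errors to failoverMonitor (stubWriter is a hypothetical
// name, not one of the package's test helpers):
//
//      type stubWriter struct {
//              latency time.Duration
//              err     error
//      }
//
//      func (w *stubWriter) switchToNewDir(dirAndFileHandle) error { return nil }
//
//      func (w *stubWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
//              return w.latency, w.err
//      }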

type failoverMonitorOptions struct {
        // The primary and secondary dir.
        dirs [numDirIndices]dirAndFileHandle

        FailoverOptions
        stopper *stopper
}

// failoverMonitor monitors the latency and error observed by the
// switchableWriter, and does failover by switching the dir. It also monitors
// the primary dir for failback.
type failoverMonitor struct {
        opts   failoverMonitorOptions
        prober dirProber
        mu     struct {
                sync.Mutex
                // dirIndex and lastFailBackTime are only modified by monitorLoop. They
                // are protected by the mutex for concurrent reads.

                // dirIndex is the directory the writer (if non-nil) should use, and
                // the directory a new writer will use when the writer becomes non-nil.
                dirIndex
                // The time at which the monitor last failed back to the primary. This is
                // only relevant when dirIndex == primaryDirIndex.
                lastFailBackTime time.Time
                // The current failoverWriter, exposed via a narrower interface.
                writer switchableWriter

                // Stats.
                dirSwitchCount              int64
                lastAccumulateIntoDurations time.Time
                primaryWriteDuration        time.Duration
                secondaryWriteDuration      time.Duration
        }
}

func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
        m := &failoverMonitor{
                opts: opts,
        }
        m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
        m.prober.init(opts.dirs[primaryDirIndex].FS,
                opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
                opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
        opts.stopper.runAsync(func() {
                m.monitorLoop(opts.stopper.shouldQuiesce())
        })
        return m
}

// noWriter is called when the previous writer is closed.
func (m *failoverMonitor) noWriter() {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.mu.writer = nil
        m.accumulateDurationLocked(m.opts.timeSource.now())
}

// writerCreateFunc is allowed to return nil if there is an error. It is not
// the responsibility of failoverMonitor to handle that error, so the error
// should not be an IO error (which failoverMonitor is interested in).
func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.mu.writer != nil {
                panic("previous writer not closed")
        }
        m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
}

func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.mu.dirIndex == secondaryDirIndex {
                return true
        }
        intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
        return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
}
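
// For example, assuming ElevatedWriteStallThresholdLag is one minute: the
// write stall threshold stays elevated for as long as the secondary is in
// use, and for one more minute after the last failback to the primary
// (lastFailBackTime is only set when switching back to the primary).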

func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
        dur := now.Sub(m.mu.lastAccumulateIntoDurations)
        if invariants.Enabled && dur < 0 {
                panic(errors.AssertionFailedf("time regressed: last accumulated %s; now is %s",
                        m.mu.lastAccumulateIntoDurations, now))
        }
        m.mu.lastAccumulateIntoDurations = now
        if m.mu.dirIndex == primaryDirIndex {
                m.mu.primaryWriteDuration += dur
                return
        }
        m.mu.secondaryWriteDuration += dur
}

func (m *failoverMonitor) stats() FailoverStats {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.accumulateDurationLocked(m.opts.timeSource.now())
        return FailoverStats{
                DirSwitchCount:         m.mu.dirSwitchCount,
                PrimaryWriteDuration:   m.mu.primaryWriteDuration,
                SecondaryWriteDuration: m.mu.secondaryWriteDuration,
        }
}

// lastWriterInfo is state maintained in the monitorLoop for the latest
// switchable writer. It is mainly used to dampen the switching.
type lastWriterInfo struct {
        writer                 switchableWriter
        numSwitches            int
        ongoingLatencyAtSwitch time.Duration
        errorCounts            [numDirIndices]int
}

func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
        ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
        if m.opts.monitorIterationForTesting != nil {
                m.opts.monitorIterationForTesting <- struct{}{}
        }
        tickerCh := ticker.ch()
        dirIndex := primaryDirIndex
        var lastWriter lastWriterInfo
        for {
                select {
                case <-shouldQuiesce:
                        ticker.stop()
                        m.prober.stop()
                        return
                case <-tickerCh:
                        writerOngoingLatency, writerErr := func() (time.Duration, error) {
                                m.mu.Lock()
                                defer m.mu.Unlock()
                                if m.mu.writer != lastWriter.writer {
                                        lastWriter = lastWriterInfo{writer: m.mu.writer}
                                }
                                if lastWriter.writer == nil {
                                        return 0, nil
                                }
                                return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
                        }()
                        switchDir := false
                        // Arbitrary value.
                        const highSecondaryErrorCountThreshold = 2
                        // We don't consider a switch if we are currently using the primary dir
                        // and the secondary dir has accumulated enough errors; in that case it
                        // is more likely that someone has misconfigured the secondary, e.g.
                        // wrong permissions or not enough disk space. We only remember the
                        // error history in the context of the lastWriter, since an operator
                        // can fix the underlying misconfiguration.
                        unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()

                        if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
                                dirIndex == primaryDirIndex) && failoverEnabled {
                                // Switching heuristics. Subject to change based on real world experience.
                                if writerErr != nil {
                                        // An error causes an immediate switch, since a LogWriter with an
                                        // error is useless.
                                        lastWriter.errorCounts[dirIndex]++
                                        switchDir = true
                                } else if writerOngoingLatency > unhealthyThreshold {
                                        // Arbitrary value.
                                        const switchImmediatelyCountThreshold = 2
                                        // High latency. Switch immediately if the number of switches that
                                        // have been done is below the threshold, since that gives us an
                                        // observation of both dirs. Once above that threshold, decay the
                                        // switch rate by increasing the observed latency needed for a
                                        // switch.
                                        if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
                                                writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
                                                switchDir = true
                                                lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
                                        }
                                        // Else high latency, but not high enough yet to motivate switch.
                                } else if dirIndex == secondaryDirIndex {
                                        // The writer looks healthy. We can still switch if the writer is
                                        // using the secondary dir and the primary is healthy again.
                                        primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
                                        if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
                                                primaryMax < m.opts.HealthyProbeLatencyThreshold {
                                                switchDir = true
                                        }
                                }
                        }
                        if switchDir {
                                lastWriter.numSwitches++
                                if dirIndex == secondaryDirIndex {
                                        // Switching back to primary, so don't need to probe to see if
                                        // primary is healthy.
                                        m.prober.disableProbing()
                                        dirIndex = primaryDirIndex
                                } else {
                                        m.prober.enableProbing()
                                        dirIndex = secondaryDirIndex
                                }
                                dir := m.opts.dirs[dirIndex]
                                m.mu.Lock()
                                now := m.opts.timeSource.now()
                                m.accumulateDurationLocked(now)
                                m.mu.dirIndex = dirIndex
                                m.mu.dirSwitchCount++
                                if dirIndex == primaryDirIndex {
                                        m.mu.lastFailBackTime = now
                                }
                                if m.mu.writer != nil {
                                        m.mu.writer.switchToNewDir(dir)
                                }
                                m.mu.Unlock()
                        }
                }
                if m.opts.monitorStateForTesting != nil {
                        m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
                }
                if m.opts.monitorIterationForTesting != nil {
                        m.opts.monitorIterationForTesting <- struct{}{}
                }
        }
}
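
// The per-tick switching decision above can be summarized by the following
// pure predicate. This is a simplified sketch: the real loop additionally
// suppresses switching when the secondary has accumulated errors, updates
// lastWriter state, and consults the prober before failing back from the
// secondary.
//
//      func shouldSwitch(writerErr error, latency, unhealthyThreshold time.Duration,
//              numSwitches int, latencyAtLastSwitch time.Duration) bool {
//              if writerErr != nil {
//                      return true // a LogWriter with an error is useless
//              }
//              if latency > unhealthyThreshold {
//                      // Switch immediately for the first two switches; after that,
//                      // require the ongoing latency to double since the last switch.
//                      return numSwitches < 2 || latency > 2*latencyAtLastSwitch
//              }
//              return false
//      }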

type logicalLogWithSizesEtc struct {
        num      NumWAL
        segments []segmentWithSizeEtc
}

type segmentWithSizeEtc struct {
        segment
        approxFileSize      uint64
        synchronouslyClosed bool
}

type failoverManager struct {
        opts Options
        // initialObsolete holds the set of DeletableLogs that formed the logs
        // passed into Init. The initialObsolete logs are all obsolete. Once
        // returned via Manager.Obsolete, initialObsolete is cleared. The
        // initialObsolete logs are stored separately from mu.queue because they may
        // include logs that were NOT created by the standalone manager, and
        // multiple physical log files may form one logical WAL.
        initialObsolete []DeletableLog

        // TODO(jackson/sumeer): read-path etc.

        dirHandles [numDirIndices]vfs.File
        stopper    *stopper
        monitor    *failoverMonitor
        mu         struct {
                sync.Mutex
                closedWALs []logicalLogWithSizesEtc
                ww         *failoverWriter
        }
        recycler LogRecycler
        // Due to async creation of files in failoverWriter, multiple goroutines can
        // concurrently try to get a file from the recycler. This mutex protects the
        // logRecycler.{Peek,Pop} pair.
        recyclerPeekPopMu sync.Mutex
}

var _ Manager = &failoverManager{}

// TODO(sumeer):
// - log deletion: if record.LogWriter did not close yet, the cleaner may
//   get an error when deleting or renaming (only under windows?).

// init implements Manager.
func (wm *failoverManager) init(o Options, initial Logs) error {
        if o.timeSource == nil {
                o.timeSource = defaultTime{}
        }
        o.FailoverOptions.EnsureDefaults()

        // Synchronously ensure that we're able to write to the secondary before we
        // proceed. An operator doesn't want to encounter an issue writing to the
        // secondary the first time there's a need to failover. We write a bit of
        // metadata to a file in the secondary's directory.
        f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
        if err != nil {
                return errors.Newf("failed to write to WAL secondary dir: %v", err)
        }
        if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
                o.Primary.Dirname,
                time.Now(),
        )); err != nil {
                return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
        }
        if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
                return err
        }

        stopper := newStopper()
        var dirs [numDirIndices]dirAndFileHandle
        for i, dir := range []Dir{o.Primary, o.Secondary} {
                dirs[i].Dir = dir
                f, err := dir.FS.OpenDir(dir.Dirname)
                if err != nil {
                        return err
                }
                dirs[i].File = f
        }
        fmOpts := failoverMonitorOptions{
                dirs:            dirs,
                FailoverOptions: o.FailoverOptions,
                stopper:         stopper,
        }
        monitor := newFailoverMonitor(fmOpts)
        *wm = failoverManager{
                opts:       o,
                dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
                stopper:    stopper,
                monitor:    monitor,
        }
        wm.recycler.Init(o.MaxNumRecyclableLogs)
        for _, ll := range initial {
                if wm.recycler.MinRecycleLogNum() <= ll.Num {
                        wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
                }
                var err error
                wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
                if err != nil {
                        return err
                }
        }
        return nil
}
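
// The failover_source file written above ends up containing two lines like
// the following (path and timestamp are illustrative; time.Time's default
// String format is used):
//
//      primary: /mnt/data1/pebble/wal
//      process start: 2024-01-02 15:04:05.999999999 +0000 UTC m=+0.000123456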

// List implements Manager.
func (wm *failoverManager) List() Logs {
        wm.mu.Lock()
        defer wm.mu.Unlock()
        n := len(wm.mu.closedWALs)
        if wm.mu.ww != nil {
                n++
        }
        wals := make(Logs, n)
        setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
                segments := make([]segment, len(llse.segments))
                for j := range llse.segments {
                        segments[j] = llse.segments[j].segment
                }
                wals[index] = LogicalLog{
                        Num:      llse.num,
                        segments: segments,
                }
        }
        for i, llse := range wm.mu.closedWALs {
                setLogicalLog(i, llse)
        }
        if wm.mu.ww != nil {
                setLogicalLog(n-1, wm.mu.ww.getLog())
        }
        return wals
}

// Obsolete implements Manager.
func (wm *failoverManager) Obsolete(
        minUnflushedNum NumWAL, noRecycle bool,
) (toDelete []DeletableLog, err error) {
        wm.mu.Lock()
        defer wm.mu.Unlock()

        // If this is the first call to Obsolete after Open, we may have deletable
        // logs outside the queue.
        toDelete, wm.initialObsolete = wm.initialObsolete, nil

        i := 0
        for ; i < len(wm.mu.closedWALs); i++ {
                ll := wm.mu.closedWALs[i]
                if ll.num >= minUnflushedNum {
                        break
                }
                // Recycle only the primary at logNameIndex=0, if there was no failover,
                // and synchronously closed. It may not be safe to recycle a file that is
                // still being written to. And recycling when there was a failover may
                // fill up the recycler with smaller log files. The restriction regarding
                // logNameIndex=0 is because logRecycler.Peek only exposes the
                // DiskFileNum, and we need to use that to construct the path -- we could
                // remove this restriction by changing the logRecycler interface, but we
                // don't bother.
                canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
                        ll.segments[0].logNameIndex == 0 &&
                        ll.segments[0].dir == wm.opts.Primary
                if !canRecycle || !wm.recycler.Add(base.FileInfo{
                        FileNum:  base.DiskFileNum(ll.num),
                        FileSize: ll.segments[0].approxFileSize,
                }) {
                        for _, s := range ll.segments {
                                toDelete = append(toDelete, DeletableLog{
                                        FS:             s.dir.FS,
                                        Path:           s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
                                        NumWAL:         ll.num,
                                        ApproxFileSize: s.approxFileSize,
                                })
                        }
                }
        }
        wm.mu.closedWALs = wm.mu.closedWALs[i:]
        return toDelete, nil
}
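
// For example, if closedWALs holds logical WALs numbered 7, 8, and 9 and
// minUnflushedNum is 9, then WALs 7 and 8 are either recycled or appended to
// toDelete, and WAL 9 remains queued for a later Obsolete call.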

// Create implements Manager.
func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
        func() {
                wm.mu.Lock()
                defer wm.mu.Unlock()
                if wm.mu.ww != nil {
                        panic("previous wal.Writer not closed")
                }
        }()
        fwOpts := failoverWriterOpts{
                wn:                          wn,
                logger:                      wm.opts.Logger,
                timeSource:                  wm.opts.timeSource,
                jobID:                       jobID,
                logCreator:                  wm.logCreator,
                noSyncOnClose:               wm.opts.NoSyncOnClose,
                bytesPerSync:                wm.opts.BytesPerSync,
                preallocateSize:             wm.opts.PreallocateSize,
                minSyncInterval:             wm.opts.MinSyncInterval,
                fsyncLatency:                wm.opts.FsyncLatency,
                queueSemChan:                wm.opts.QueueSemChan,
                stopper:                     wm.stopper,
                failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
                writerClosed:                wm.writerClosed,
                writerCreatedForTest:        wm.opts.logWriterCreatedForTesting,
        }
        var err error
        var ww *failoverWriter
        writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
                ww, err = newFailoverWriter(fwOpts, dir)
                if err != nil {
                        return nil
                }
                return ww
        }
        wm.monitor.newWriter(writerCreateFunc)
        if ww != nil {
                wm.mu.Lock()
                defer wm.mu.Unlock()
                wm.mu.ww = ww
        }
        return ww, err
}
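
// Note that writerCreateFunc runs while monitor.newWriter holds the
// failoverMonitor mutex, so the writer is constructed against the currently
// selected dirIndex; a concurrent dir switch cannot slip in between choosing
// the dir and installing the writer.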

// ElevateWriteStallThresholdForFailover implements Manager.
func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
        return wm.monitor.elevateWriteStallThresholdForFailover()
}

func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
        wm.monitor.noWriter()
        wm.mu.Lock()
        defer wm.mu.Unlock()
        wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
        wm.mu.ww = nil
}

// Stats implements Manager.
func (wm *failoverManager) Stats() Stats {
        obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
        failoverStats := wm.monitor.stats()
        failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
        wm.mu.Lock()
        defer wm.mu.Unlock()
        var liveFileCount int
        var liveFileSize uint64
        updateStats := func(segments []segmentWithSizeEtc) {
                for _, s := range segments {
                        liveFileCount++
                        liveFileSize += s.approxFileSize
                }
        }
        for _, llse := range wm.mu.closedWALs {
                updateStats(llse.segments)
        }
        if wm.mu.ww != nil {
                updateStats(wm.mu.ww.getLog().segments)
        }
        for i := range wm.initialObsolete {
                if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
                        obsoleteLogsCount++
                }
                obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
        }
        return Stats{
                ObsoleteFileCount: obsoleteLogsCount,
                ObsoleteFileSize:  obsoleteLogSize,
                LiveFileCount:     liveFileCount,
                LiveFileSize:      liveFileSize,
                Failover:          failoverStats,
        }
}

// Close implements Manager.
func (wm *failoverManager) Close() error {
        wm.stopper.stop()
        // Since all goroutines are stopped, we can close the dirs.
        var err error
        for _, f := range wm.dirHandles {
                err = firstError(err, f.Close())
        }
        return err
}

// RecyclerForTesting implements Manager.
func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
        return nil
}

// logCreator implements the logCreator func type.
func (wm *failoverManager) logCreator(
        dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (logFile vfs.File, initialFileSize uint64, err error) {
        logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
        isPrimary := dir == wm.opts.Primary
        // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
        considerRecycle := li == 0 && isPrimary
        createInfo := CreateInfo{
                JobID:       jobID,
                Path:        logFilename,
                IsSecondary: !isPrimary,
                Num:         wn,
                Err:         nil,
        }
        defer func() {
                createInfo.Err = err
                if wm.opts.EventListener != nil {
                        wm.opts.EventListener.LogCreated(createInfo)
                }
        }()
        if considerRecycle {
                // Try to use a recycled log file. Recycling log files is an important
                // performance optimization as it is faster to sync a file that has
                // already been written, than one which is being written for the first
                // time. This is due to the need to sync file metadata when a file is
                // being written for the first time. Note this is true even if file
                // preallocation is performed (e.g. fallocate).
                var recycleLog base.FileInfo
                var recycleOK bool
                func() {
                        wm.recyclerPeekPopMu.Lock()
                        defer wm.recyclerPeekPopMu.Unlock()
                        recycleLog, recycleOK = wm.recycler.Peek()
                        if recycleOK {
                                if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
                                        panic(err)
                                }
                        }
                }()
                if recycleOK {
                        createInfo.RecycledFileNum = recycleLog.FileNum
                        recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
                        r.writeStart()
                        logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
                        r.writeEnd(err)
                        // TODO(sumeer): should we fatal since primary dir? At some point it is
                        // better to fatal instead of continuing to failover.
                        // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
                        if err != nil {
                                // TODO(sumeer): we have popped from the logRecycler, which is
                                // arguably correct, since we don't want to keep trying to reuse a log
                                // that causes some error. But the original or new file may exist, and
                                // no one will clean it up unless the process restarts.
                                return nil, 0, err
                        }
                        // Figure out the recycled WAL size. This Stat is necessary because
                        // ReuseForWrite's contract allows for removing the old file and
                        // creating a new one. We don't know whether the WAL was actually
                        // recycled.
                        //
                        // TODO(jackson): Adding a boolean to the ReuseForWrite return value
                        // indicating whether or not the file was actually reused would allow us
                        // to skip the stat and use recycleLog.FileSize.
                        var finfo os.FileInfo
                        finfo, err = logFile.Stat()
                        if err != nil {
                                logFile.Close()
                                return nil, 0, err
                        }
                        initialFileSize = uint64(finfo.Size())
                        return logFile, initialFileSize, nil
                }
        }
        // Did not recycle.
        //
        // Create file.
        r.writeStart()
        logFile, err = dir.FS.Create(logFilename, "pebble-wal")
        r.writeEnd(err)
        return logFile, 0, err
}

type stopper struct {
        quiescer chan struct{} // Closed when quiescing
        wg       sync.WaitGroup
}

func newStopper() *stopper {
        return &stopper{
                quiescer: make(chan struct{}),
        }
}

func (s *stopper) runAsync(f func()) {
        s.wg.Add(1)
        go func() {
                f()
                s.wg.Done()
        }()
}

// shouldQuiesce returns a channel which will be closed when stop() has been
// invoked and outstanding goroutines should begin to quiesce.
func (s *stopper) shouldQuiesce() <-chan struct{} {
        return s.quiescer
}

func (s *stopper) stop() {
        close(s.quiescer)
        s.wg.Wait()
}
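
// Typical stopper usage, as a minimal sketch: goroutines launched via
// runAsync watch shouldQuiesce and return when it closes, and stop() then
// blocks until all of them have finished.
//
//      s := newStopper()
//      s.runAsync(func() {
//              for {
//                      select {
//                      case <-s.shouldQuiesce():
//                              return
//                      case <-time.After(time.Second):
//                              // periodic work
//                      }
//              }
//      })
//      s.stop() // closes quiescer, then waits on the WaitGroup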

// timeSource and tickerI are extracted from CockroachDB's timeutil, with
// removal of support for Timer, and added support in the manual
// implementation for reset.

// timeSource is used to interact with the clock and tickers. Abstracts
// time.Now and time.NewTicker for testing.
type timeSource interface {
        now() time.Time
        newTicker(duration time.Duration) tickerI
}

// tickerI is an interface wrapping time.Ticker.
type tickerI interface {
        reset(duration time.Duration)
        stop()
        ch() <-chan time.Time
}

// defaultTime is a timeSource using the time package.
type defaultTime struct{}

var _ timeSource = defaultTime{}

func (defaultTime) now() time.Time {
        return time.Now()
}

func (defaultTime) newTicker(duration time.Duration) tickerI {
        return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ tickerI = &defaultTicker{}

func (t *defaultTicker) reset(duration time.Duration) {
        (*time.Ticker)(t).Reset(duration)
}

func (t *defaultTicker) stop() {
        (*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
        return (*time.Ticker)(t).C
}
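
// A manual tickerI implementation for tests might look like the following
// sketch (hypothetical names; the package's actual test fakes live
// elsewhere). A test fires a tick by sending on the channel:
//
//      type manualTicker struct {
//              c chan time.Time
//      }
//
//      func (t *manualTicker) reset(duration time.Duration) {}
//      func (t *manualTicker) stop()                        {}
//      func (t *manualTicker) ch() <-chan time.Time         { return t.c }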

// Make lint happy.
var _ = (*failoverMonitor).noWriter
var _ = (*failoverManager).writerClosed
var _ = (&stopper{}).shouldQuiesce