LCOV - code coverage report
Current view: top level - pebble/wal - failover_manager.go (source / functions) Hit Total Coverage
Test: 2024-05-27 08:16Z 542915b2 - tests + meta.lcov Lines: 507 540 93.9 %
Date: 2024-05-27 08:16:58 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "os"
      11             :         "sync"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "golang.org/x/exp/rand"
      18             : )
      19             : 
      20             : // dirProber probes the primary dir, until it is confirmed to be healthy. If
      21             : // it doesn't have enough samples, it is deemed to be unhealthy. It is only
      22             : // used for failback to the primary.
      23             : type dirProber struct {
      24             :         fs vfs.FS
      25             :         // The full path of the file to use for the probe. The probe is destructive
      26             :         // in that it deletes and (re)creates the file.
      27             :         filename string
      28             :         // The probing interval, when enabled.
      29             :         interval time.Duration
      30             :         timeSource
      31             :         // buf holds the random bytes that are written during the probe.
      32             :         buf [100 << 10]byte
      33             :         // enabled is signaled to enable and disable probing. The initial state of
      34             :         // the prober is disabled.
      35             :         enabled chan bool
      36             :         mu      struct {
      37             :                 sync.Mutex
      38             :                 // Circular buffer of history samples.
      39             :                 history [probeHistoryLength]time.Duration
      40             :                 // The history is in [firstProbeIndex, nextProbeIndex).
      41             :                 firstProbeIndex int
      42             :                 nextProbeIndex  int
      43             :         }
      44             :         iterationForTesting chan<- struct{}
      45             : }
      46             : 
      47             : const probeHistoryLength = 128
      48             : 
      49             : // Large value.
      50             : const failedProbeDuration = 24 * 60 * 60 * time.Second
      51             : 
      52             : // init takes a stopper in order to connect the dirProber's long-running
      53             : // goroutines with the stopper's wait group, but the dirProber has its own
      54             : // stop() method that should be invoked to trigger the shutdown.
      55             : func (p *dirProber) init(
      56             :         fs vfs.FS,
      57             :         filename string,
      58             :         interval time.Duration,
      59             :         stopper *stopper,
      60             :         ts timeSource,
      61             :         notifyIterationForTesting chan<- struct{},
      62           2 : ) {
      63           2 :         *p = dirProber{
      64           2 :                 fs:                  fs,
      65           2 :                 filename:            filename,
      66           2 :                 interval:            interval,
      67           2 :                 timeSource:          ts,
      68           2 :                 enabled:             make(chan bool),
      69           2 :                 iterationForTesting: notifyIterationForTesting,
      70           2 :         }
      71           2 :         // Random bytes for writing, to defeat any FS compression optimization.
      72           2 :         _, err := rand.Read(p.buf[:])
      73           2 :         if err != nil {
      74           0 :                 panic(err)
      75             :         }
      76             :         // dirProber has an explicit stop() method instead of listening on
      77             :         // stopper.shouldQuiesce. This structure helps negotiate the shutdown
      78             :         // between failoverMonitor and dirProber. If the dirProber independently
      79             :         // listened to shouldQuiesce, it might exit before the failover monitor. The
      80             :         // [enable|disable]Probing methods require sending on a channel expecting
      81             :         // that the dirProber goroutine will receive on the same channel. If the
      82             :         // dirProber noticed the queiscence and exited first, the failoverMonitor
      83             :         // could deadlock waiting for a goroutine that no longer exists.
      84             :         //
      85             :         // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
      86             :         //   - the failoverMonitor listens to stopper.shouldQuiesce.
      87             :         //   - when the failoverMonitor is exiting, it will no longer attempt to
      88             :         //   interact with the dirProber. Only then does it invoke dirProber.stop.
      89           2 :         stopper.runAsync(p.probeLoop)
      90             : }
      91             : 
      92           2 : func (p *dirProber) probeLoop() {
      93           2 :         ticker := p.timeSource.newTicker(p.interval)
      94           2 :         ticker.stop()
      95           2 :         tickerCh := ticker.ch()
      96           2 :         shouldContinue := true
      97           2 :         var enabled bool
      98           2 :         for shouldContinue {
      99           2 :                 select {
     100           2 :                 case <-tickerCh:
     101           2 :                         if !enabled {
     102           2 :                                 // Could have a tick waiting before we disabled it. Ignore.
     103           2 :                                 continue
     104             :                         }
     105           2 :                         probeDur := func() time.Duration {
     106           2 :                                 // Delete, create, write, sync.
     107           2 :                                 start := p.timeSource.now()
     108           2 :                                 _ = p.fs.Remove(p.filename)
     109           2 :                                 f, err := p.fs.Create(p.filename, "pebble-wal")
     110           2 :                                 if err != nil {
     111           1 :                                         return failedProbeDuration
     112           1 :                                 }
     113           2 :                                 defer f.Close()
     114           2 :                                 n, err := f.Write(p.buf[:])
     115           2 :                                 if err != nil {
     116           0 :                                         return failedProbeDuration
     117           0 :                                 }
     118           2 :                                 if n != len(p.buf) {
     119           0 :                                         panic("invariant violation")
     120             :                                 }
     121           2 :                                 err = f.Sync()
     122           2 :                                 if err != nil {
     123           0 :                                         return failedProbeDuration
     124           0 :                                 }
     125           2 :                                 return p.timeSource.now().Sub(start)
     126             :                         }()
     127           2 :                         p.mu.Lock()
     128           2 :                         nextIndex := p.mu.nextProbeIndex % probeHistoryLength
     129           2 :                         p.mu.history[nextIndex] = probeDur
     130           2 :                         p.mu.nextProbeIndex++
     131           2 :                         numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
     132           2 :                         if numSamples > probeHistoryLength {
     133           1 :                                 // Length has exceeded capacity, i.e., overwritten the first probe.
     134           1 :                                 p.mu.firstProbeIndex++
     135           1 :                         }
     136           2 :                         p.mu.Unlock()
     137             : 
     138           2 :                 case enabled, shouldContinue = <-p.enabled:
     139           2 :                         if !enabled || !shouldContinue {
     140           2 :                                 ticker.stop()
     141           2 :                                 p.mu.Lock()
     142           2 :                                 p.mu.firstProbeIndex = 0
     143           2 :                                 p.mu.nextProbeIndex = 0
     144           2 :                                 p.mu.Unlock()
     145           2 :                         } else {
     146           2 :                                 ticker.reset(p.interval)
     147           2 :                         }
     148             :                 }
     149           2 :                 if p.iterationForTesting != nil {
     150           1 :                         p.iterationForTesting <- struct{}{}
     151           1 :                 }
     152             :         }
     153             : }
     154             : 
     155           2 : func (p *dirProber) enableProbing() {
     156           2 :         p.enabled <- true
     157           2 : }
     158             : 
     159           2 : func (p *dirProber) disableProbing() {
     160           2 :         p.enabled <- false
     161           2 : }
     162             : 
     163           2 : func (p *dirProber) stop() {
     164           2 :         close(p.enabled)
     165           2 : }
     166             : 
     167           2 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
     168           2 :         p.mu.Lock()
     169           2 :         defer p.mu.Unlock()
     170           2 :         numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
     171           2 :         samplesNeeded := int((interval + p.interval - 1) / p.interval)
     172           2 :         if samplesNeeded == 0 {
     173           0 :                 panic("interval is too short")
     174           2 :         } else if samplesNeeded > probeHistoryLength {
     175           0 :                 panic("interval is too long")
     176             :         }
     177           2 :         if samplesNeeded > numSamples {
     178           2 :                 // Not enough samples, so assume not yet healthy.
     179           2 :                 return failedProbeDuration, failedProbeDuration
     180           2 :         }
     181           2 :         offset := numSamples - samplesNeeded
     182           2 :         var sum, max time.Duration
     183           2 :         for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
     184           2 :                 sampleDur := p.mu.history[i%probeHistoryLength]
     185           2 :                 sum += sampleDur
     186           2 :                 if max < sampleDur {
     187           2 :                         max = sampleDur
     188           2 :                 }
     189             :         }
     190           2 :         mean := sum / time.Duration(samplesNeeded)
     191           2 :         return mean, max
     192             : }
     193             : 
     194             : type dirIndex int
     195             : 
     196             : const (
     197             :         primaryDirIndex dirIndex = iota
     198             :         secondaryDirIndex
     199             :         numDirIndices
     200             : )
     201             : 
     202             : type dirAndFileHandle struct {
     203             :         Dir
     204             :         vfs.File
     205             : }
     206             : 
     207             : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
     208             : type switchableWriter interface {
     209             :         switchToNewDir(dirAndFileHandle) error
     210             :         ongoingLatencyOrErrorForCurDir() (time.Duration, error)
     211             : }
     212             : 
     213             : type failoverMonitorOptions struct {
     214             :         // The primary and secondary dir.
     215             :         dirs [numDirIndices]dirAndFileHandle
     216             : 
     217             :         FailoverOptions
     218             :         stopper *stopper
     219             : }
     220             : 
     221             : // failoverMonitor monitors the latency and error observed by the
     222             : // switchableWriter, and does failover by switching the dir. It also monitors
     223             : // the primary dir for failback.
     224             : type failoverMonitor struct {
     225             :         opts   failoverMonitorOptions
     226             :         prober dirProber
     227             :         mu     struct {
     228             :                 sync.Mutex
     229             :                 // dirIndex and lastFailbackTime are only modified by monitorLoop. They
     230             :                 // are protected by the mutex for concurrent reads.
     231             : 
     232             :                 // dirIndex is the directory to use by writer (if non-nil), or when the
     233             :                 // writer becomes non-nil.
     234             :                 dirIndex
     235             :                 // The time at which the monitor last failed back to the primary. This is
     236             :                 // only relevant when dirIndex == primaryDirIndex.
     237             :                 lastFailBackTime time.Time
     238             :                 // The current failoverWriter, exposed via a narrower interface.
     239             :                 writer switchableWriter
     240             : 
     241             :                 // Stats.
     242             :                 dirSwitchCount              int64
     243             :                 lastAccumulateIntoDurations time.Time
     244             :                 primaryWriteDuration        time.Duration
     245             :                 secondaryWriteDuration      time.Duration
     246             :         }
     247             : }
     248             : 
     249           2 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
     250           2 :         m := &failoverMonitor{
     251           2 :                 opts: opts,
     252           2 :         }
     253           2 :         m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
     254           2 :         m.prober.init(opts.dirs[primaryDirIndex].FS,
     255           2 :                 opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
     256           2 :                 opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
     257           2 :         opts.stopper.runAsync(func() {
     258           2 :                 m.monitorLoop(opts.stopper.shouldQuiesce())
     259           2 :         })
     260           2 :         return m
     261             : }
     262             : 
     263             : // Called when previous writer is closed
     264           2 : func (m *failoverMonitor) noWriter() {
     265           2 :         now := m.opts.timeSource.now()
     266           2 :         m.mu.Lock()
     267           2 :         defer m.mu.Unlock()
     268           2 :         m.mu.writer = nil
     269           2 :         m.accumulateDurationLocked(now)
     270           2 : }
     271             : 
     272             : // writerCreateFunc is allowed to return nil, if there is an error. It is not
     273             : // the responsibility of failoverMonitor to handle that error. So this should
     274             : // not be due to an IO error (which failoverMonitor is interested in).
     275           2 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
     276           2 :         m.mu.Lock()
     277           2 :         defer m.mu.Unlock()
     278           2 :         if m.mu.writer != nil {
     279           0 :                 panic("previous writer not closed")
     280             :         }
     281           2 :         m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
     282             : }
     283             : 
     284           2 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
     285           2 :         m.mu.Lock()
     286           2 :         defer m.mu.Unlock()
     287           2 :         if m.mu.dirIndex == secondaryDirIndex {
     288           2 :                 return true
     289           2 :         }
     290           2 :         intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
     291           2 :         return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
     292             : }
     293             : 
     294           2 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
     295           2 :         dur := now.Sub(m.mu.lastAccumulateIntoDurations)
     296           2 :         m.mu.lastAccumulateIntoDurations = now
     297           2 :         if m.mu.dirIndex == primaryDirIndex {
     298           2 :                 m.mu.primaryWriteDuration += dur
     299           2 :                 return
     300           2 :         }
     301           2 :         m.mu.secondaryWriteDuration += dur
     302             : }
     303             : 
     304           2 : func (m *failoverMonitor) stats() FailoverStats {
     305           2 :         now := m.opts.timeSource.now()
     306           2 :         m.mu.Lock()
     307           2 :         defer m.mu.Unlock()
     308           2 :         m.accumulateDurationLocked(now)
     309           2 :         return FailoverStats{
     310           2 :                 DirSwitchCount:         m.mu.dirSwitchCount,
     311           2 :                 PrimaryWriteDuration:   m.mu.primaryWriteDuration,
     312           2 :                 SecondaryWriteDuration: m.mu.secondaryWriteDuration,
     313           2 :         }
     314           2 : }
     315             : 
     316             : // lastWriterInfo is state maintained in the monitorLoop for the latest
     317             : // switchable writer. It is mainly used to dampen the switching.
     318             : type lastWriterInfo struct {
     319             :         writer                 switchableWriter
     320             :         numSwitches            int
     321             :         ongoingLatencyAtSwitch time.Duration
     322             :         errorCounts            [numDirIndices]int
     323             : }
     324             : 
     325           2 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
     326           2 :         ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
     327           2 :         if m.opts.monitorIterationForTesting != nil {
     328           1 :                 m.opts.monitorIterationForTesting <- struct{}{}
     329           1 :         }
     330           2 :         tickerCh := ticker.ch()
     331           2 :         dirIndex := primaryDirIndex
     332           2 :         var lastWriter lastWriterInfo
     333           2 :         for {
     334           2 :                 select {
     335           2 :                 case <-shouldQuiesce:
     336           2 :                         ticker.stop()
     337           2 :                         m.prober.stop()
     338           2 :                         return
     339           2 :                 case <-tickerCh:
     340           2 :                         writerOngoingLatency, writerErr := func() (time.Duration, error) {
     341           2 :                                 m.mu.Lock()
     342           2 :                                 defer m.mu.Unlock()
     343           2 :                                 if m.mu.writer != lastWriter.writer {
     344           2 :                                         lastWriter = lastWriterInfo{writer: m.mu.writer}
     345           2 :                                 }
     346           2 :                                 if lastWriter.writer == nil {
     347           2 :                                         return 0, nil
     348           2 :                                 }
     349           2 :                                 return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
     350             :                         }()
     351           2 :                         switchDir := false
     352           2 :                         // Arbitrary value.
     353           2 :                         const highSecondaryErrorCountThreshold = 2
     354           2 :                         // We don't consider a switch if currently using the primary dir and the
     355           2 :                         // secondary dir has high enough errors. It is more likely that someone
     356           2 :                         // has misconfigured a secondary e.g. wrong permissions or not enough
     357           2 :                         // disk space. We only remember the error history in the context of the
     358           2 :                         // lastWriter since an operator can fix the underlying misconfiguration.
     359           2 :                         unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
     360           2 : 
     361           2 :                         if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
     362           2 :                                 dirIndex == primaryDirIndex) && failoverEnabled {
     363           2 :                                 // Switching heuristics. Subject to change based on real world experience.
     364           2 :                                 if writerErr != nil {
     365           1 :                                         // An error causes an immediate switch, since a LogWriter with an
     366           1 :                                         // error is useless.
     367           1 :                                         lastWriter.errorCounts[dirIndex]++
     368           1 :                                         switchDir = true
     369           2 :                                 } else if writerOngoingLatency > unhealthyThreshold {
     370           2 :                                         // Arbitrary value.
     371           2 :                                         const switchImmediatelyCountThreshold = 2
     372           2 :                                         // High latency. Switch immediately if the number of switches that
     373           2 :                                         // have been done is below the threshold, since that gives us an
     374           2 :                                         // observation of both dirs. Once above that threshold, decay the
     375           2 :                                         // switch rate by increasing the observed latency needed for a
     376           2 :                                         // switch.
     377           2 :                                         if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
     378           2 :                                                 writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
     379           2 :                                                 switchDir = true
     380           2 :                                                 lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
     381           2 :                                         }
     382             :                                         // Else high latency, but not high enough yet to motivate switch.
     383           2 :                                 } else if dirIndex == secondaryDirIndex {
     384           2 :                                         // The writer looks healthy. We can still switch if the writer is using the
     385           2 :                                         // secondary dir and the primary is healthy again.
     386           2 :                                         primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
     387           2 :                                         if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
     388           2 :                                                 primaryMax < m.opts.HealthyProbeLatencyThreshold {
     389           2 :                                                 switchDir = true
     390           2 :                                         }
     391             :                                 }
     392             :                         }
     393           2 :                         if switchDir {
     394           2 :                                 lastWriter.numSwitches++
     395           2 :                                 if dirIndex == secondaryDirIndex {
     396           2 :                                         // Switching back to primary, so don't need to probe to see if
     397           2 :                                         // primary is healthy.
     398           2 :                                         m.prober.disableProbing()
     399           2 :                                         dirIndex = primaryDirIndex
     400           2 :                                 } else {
     401           2 :                                         m.prober.enableProbing()
     402           2 :                                         dirIndex = secondaryDirIndex
     403           2 :                                 }
     404           2 :                                 dir := m.opts.dirs[dirIndex]
     405           2 :                                 now := m.opts.timeSource.now()
     406           2 :                                 m.mu.Lock()
     407           2 :                                 m.accumulateDurationLocked(now)
     408           2 :                                 m.mu.dirIndex = dirIndex
     409           2 :                                 m.mu.dirSwitchCount++
     410           2 :                                 if dirIndex == primaryDirIndex {
     411           2 :                                         m.mu.lastFailBackTime = now
     412           2 :                                 }
     413           2 :                                 if m.mu.writer != nil {
     414           2 :                                         m.mu.writer.switchToNewDir(dir)
     415           2 :                                 }
     416           2 :                                 m.mu.Unlock()
     417             :                         }
     418             :                 }
     419           2 :                 if m.opts.monitorStateForTesting != nil {
     420           1 :                         m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
     421           1 :                 }
     422           2 :                 if m.opts.monitorIterationForTesting != nil {
     423           1 :                         m.opts.monitorIterationForTesting <- struct{}{}
     424           1 :                 }
     425             :         }
     426             : }
     427             : 
     428             : type logicalLogWithSizesEtc struct {
     429             :         num      NumWAL
     430             :         segments []segmentWithSizeEtc
     431             : }
     432             : 
     433             : type segmentWithSizeEtc struct {
     434             :         segment
     435             :         approxFileSize      uint64
     436             :         synchronouslyClosed bool
     437             : }
     438             : 
     439             : type failoverManager struct {
     440             :         opts Options
     441             :         // initialObsolete holds the set of DeletableLogs that formed the logs
     442             :         // passed into Init. The initialObsolete logs are all obsolete. Once
     443             :         // returned via Manager.Obsolete, initialObsolete is cleared. The
     444             :         // initialObsolete logs are stored separately from mu.queue because they may
     445             :         // include logs that were NOT created by the standalone manager, and
     446             :         // multiple physical log files may form one logical WAL.
     447             :         initialObsolete []DeletableLog
     448             : 
     449             :         // TODO(jackson/sumeer): read-path etc.
     450             : 
     451             :         dirHandles [numDirIndices]vfs.File
     452             :         stopper    *stopper
     453             :         monitor    *failoverMonitor
     454             :         mu         struct {
     455             :                 sync.Mutex
     456             :                 closedWALs []logicalLogWithSizesEtc
     457             :                 ww         *failoverWriter
     458             :         }
     459             :         recycler LogRecycler
     460             :         // Due to async creation of files in failoverWriter, multiple goroutines can
     461             :         // concurrently try to get a file from the recycler. This mutex protects the
     462             :         // logRecycler.{Peek,Pop} pair.
     463             :         recyclerPeekPopMu sync.Mutex
     464             : }
     465             : 
     466             : var _ Manager = &failoverManager{}
     467             : 
     468             : // TODO(sumeer):
     469             : // - log deletion: if record.LogWriter did not close yet, the cleaner may
     470             : //   get an error when deleting or renaming (only under windows?).
     471             : 
     472             : // init implements Manager.
     473           2 : func (wm *failoverManager) init(o Options, initial Logs) error {
     474           2 :         if o.timeSource == nil {
     475           2 :                 o.timeSource = defaultTime{}
     476           2 :         }
     477           2 :         o.FailoverOptions.EnsureDefaults()
     478           2 : 
     479           2 :         // Synchronously ensure that we're able to write to the secondary before we
     480           2 :         // proceed. An operator doesn't want to encounter an issue writing to the
     481           2 :         // secondary the first time there's a need to failover. We write a bit of
     482           2 :         // metadata to a file in the secondary's directory.
     483           2 :         f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
     484           2 :         if err != nil {
     485           1 :                 return errors.Newf("failed to write to WAL secondary dir: %v", err)
     486           1 :         }
     487           2 :         if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
     488           2 :                 o.Primary.Dirname,
     489           2 :                 time.Now(),
     490           2 :         )); err != nil {
     491           0 :                 return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
     492           0 :         }
     493           2 :         if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
     494           0 :                 return err
     495           0 :         }
     496             : 
     497           2 :         stopper := newStopper()
     498           2 :         var dirs [numDirIndices]dirAndFileHandle
     499           2 :         for i, dir := range []Dir{o.Primary, o.Secondary} {
     500           2 :                 dirs[i].Dir = dir
     501           2 :                 f, err := dir.FS.OpenDir(dir.Dirname)
     502           2 :                 if err != nil {
     503           0 :                         return err
     504           0 :                 }
     505           2 :                 dirs[i].File = f
     506             :         }
     507           2 :         fmOpts := failoverMonitorOptions{
     508           2 :                 dirs:            dirs,
     509           2 :                 FailoverOptions: o.FailoverOptions,
     510           2 :                 stopper:         stopper,
     511           2 :         }
     512           2 :         monitor := newFailoverMonitor(fmOpts)
     513           2 :         *wm = failoverManager{
     514           2 :                 opts:       o,
     515           2 :                 dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
     516           2 :                 stopper:    stopper,
     517           2 :                 monitor:    monitor,
     518           2 :         }
     519           2 :         wm.recycler.Init(o.MaxNumRecyclableLogs)
     520           2 :         for _, ll := range initial {
     521           2 :                 if wm.recycler.MinRecycleLogNum() <= ll.Num {
     522           2 :                         wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
     523           2 :                 }
     524           2 :                 var err error
     525           2 :                 wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
     526           2 :                 if err != nil {
     527           0 :                         return err
     528           0 :                 }
     529             :         }
     530           2 :         return nil
     531             : }
     532             : 
     533             : // List implements Manager.
     534           2 : func (wm *failoverManager) List() (Logs, error) {
     535           2 :         wm.mu.Lock()
     536           2 :         defer wm.mu.Unlock()
     537           2 :         n := len(wm.mu.closedWALs)
     538           2 :         if wm.mu.ww != nil {
     539           2 :                 n++
     540           2 :         }
     541           2 :         wals := make(Logs, n)
     542           2 :         setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
     543           2 :                 segments := make([]segment, len(llse.segments))
     544           2 :                 for j := range llse.segments {
     545           2 :                         segments[j] = llse.segments[j].segment
     546           2 :                 }
     547           2 :                 wals[index] = LogicalLog{
     548           2 :                         Num:      llse.num,
     549           2 :                         segments: segments,
     550           2 :                 }
     551             :         }
     552           2 :         for i, llse := range wm.mu.closedWALs {
     553           2 :                 setLogicalLog(i, llse)
     554           2 :         }
     555           2 :         if wm.mu.ww != nil {
     556           2 :                 setLogicalLog(n-1, wm.mu.ww.getLog())
     557           2 :         }
     558           2 :         return wals, nil
     559             : }
     560             : 
     561             : // Obsolete implements Manager.
     562             : func (wm *failoverManager) Obsolete(
     563             :         minUnflushedNum NumWAL, noRecycle bool,
     564           2 : ) (toDelete []DeletableLog, err error) {
     565           2 :         wm.mu.Lock()
     566           2 :         defer wm.mu.Unlock()
     567           2 : 
     568           2 :         // If this is the first call to Obsolete after Open, we may have deletable
     569           2 :         // logs outside the queue.
     570           2 :         toDelete, wm.initialObsolete = wm.initialObsolete, nil
     571           2 : 
     572           2 :         i := 0
     573           2 :         for ; i < len(wm.mu.closedWALs); i++ {
     574           2 :                 ll := wm.mu.closedWALs[i]
     575           2 :                 if ll.num >= minUnflushedNum {
     576           2 :                         break
     577             :                 }
     578             :                 // Recycle only the primary at logNameIndex=0, if there was no failover,
     579             :                 // and synchronously closed. It may not be safe to recycle a file that is
     580             :                 // still being written to. And recycling when there was a failover may
     581             :                 // fill up the recycler with smaller log files. The restriction regarding
     582             :                 // logNameIndex=0 is because logRecycler.Peek only exposes the
     583             :                 // DiskFileNum, and we need to use that to construct the path -- we could
     584             :                 // remove this restriction by changing the logRecycler interface, but we
     585             :                 // don't bother.
     586           2 :                 canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
     587           2 :                         ll.segments[0].logNameIndex == 0 &&
     588           2 :                         ll.segments[0].dir == wm.opts.Primary
     589           2 :                 if !canRecycle || !wm.recycler.Add(base.FileInfo{
     590           2 :                         FileNum:  base.DiskFileNum(ll.num),
     591           2 :                         FileSize: ll.segments[0].approxFileSize,
     592           2 :                 }) {
     593           2 :                         for _, s := range ll.segments {
     594           2 :                                 toDelete = append(toDelete, DeletableLog{
     595           2 :                                         FS:             s.dir.FS,
     596           2 :                                         Path:           s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
     597           2 :                                         NumWAL:         ll.num,
     598           2 :                                         ApproxFileSize: s.approxFileSize,
     599           2 :                                 })
     600           2 :                         }
     601             :                 }
     602             :         }
     603           2 :         wm.mu.closedWALs = wm.mu.closedWALs[i:]
     604           2 :         return toDelete, nil
     605             : }
     606             : 
     607             : // Create implements Manager.
     608           2 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
     609           2 :         func() {
     610           2 :                 wm.mu.Lock()
     611           2 :                 defer wm.mu.Unlock()
     612           2 :                 if wm.mu.ww != nil {
     613           0 :                         panic("previous wal.Writer not closed")
     614             :                 }
     615             :         }()
     616           2 :         fwOpts := failoverWriterOpts{
     617           2 :                 wn:                          wn,
     618           2 :                 logger:                      wm.opts.Logger,
     619           2 :                 timeSource:                  wm.opts.timeSource,
     620           2 :                 jobID:                       jobID,
     621           2 :                 logCreator:                  wm.logCreator,
     622           2 :                 noSyncOnClose:               wm.opts.NoSyncOnClose,
     623           2 :                 bytesPerSync:                wm.opts.BytesPerSync,
     624           2 :                 preallocateSize:             wm.opts.PreallocateSize,
     625           2 :                 minSyncInterval:             wm.opts.MinSyncInterval,
     626           2 :                 fsyncLatency:                wm.opts.FsyncLatency,
     627           2 :                 queueSemChan:                wm.opts.QueueSemChan,
     628           2 :                 stopper:                     wm.stopper,
     629           2 :                 failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
     630           2 :                 writerClosed:                wm.writerClosed,
     631           2 :                 writerCreatedForTest:        wm.opts.logWriterCreatedForTesting,
     632           2 :         }
     633           2 :         var err error
     634           2 :         var ww *failoverWriter
     635           2 :         writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
     636           2 :                 ww, err = newFailoverWriter(fwOpts, dir)
     637           2 :                 if err != nil {
     638           0 :                         return nil
     639           0 :                 }
     640           2 :                 return ww
     641             :         }
     642           2 :         wm.monitor.newWriter(writerCreateFunc)
     643           2 :         if ww != nil {
     644           2 :                 wm.mu.Lock()
     645           2 :                 defer wm.mu.Unlock()
     646           2 :                 wm.mu.ww = ww
     647           2 :         }
     648           2 :         return ww, err
     649             : }
     650             : 
     651             : // ElevateWriteStallThresholdForFailover implements Manager.
     652           2 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
     653           2 :         return wm.monitor.elevateWriteStallThresholdForFailover()
     654           2 : }
     655             : 
     656           2 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
     657           2 :         wm.monitor.noWriter()
     658           2 :         wm.mu.Lock()
     659           2 :         defer wm.mu.Unlock()
     660           2 :         wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
     661           2 :         wm.mu.ww = nil
     662           2 : }
     663             : 
     664             : // Stats implements Manager.
     665           2 : func (wm *failoverManager) Stats() Stats {
     666           2 :         obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
     667           2 :         failoverStats := wm.monitor.stats()
     668           2 :         failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
     669           2 :         wm.mu.Lock()
     670           2 :         defer wm.mu.Unlock()
     671           2 :         var liveFileCount int
     672           2 :         var liveFileSize uint64
     673           2 :         updateStats := func(segments []segmentWithSizeEtc) {
     674           2 :                 for _, s := range segments {
     675           2 :                         liveFileCount++
     676           2 :                         liveFileSize += s.approxFileSize
     677           2 :                 }
     678             :         }
     679           2 :         for _, llse := range wm.mu.closedWALs {
     680           2 :                 updateStats(llse.segments)
     681           2 :         }
     682           2 :         if wm.mu.ww != nil {
     683           1 :                 updateStats(wm.mu.ww.getLog().segments)
     684           1 :         }
     685           2 :         for i := range wm.initialObsolete {
     686           1 :                 if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
     687           1 :                         obsoleteLogsCount++
     688           1 :                 }
     689           1 :                 obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
     690             :         }
     691           2 :         return Stats{
     692           2 :                 ObsoleteFileCount: obsoleteLogsCount,
     693           2 :                 ObsoleteFileSize:  obsoleteLogSize,
     694           2 :                 LiveFileCount:     liveFileCount,
     695           2 :                 LiveFileSize:      liveFileSize,
     696           2 :                 Failover:          failoverStats,
     697           2 :         }
     698             : }
     699             : 
     700             : // Close implements Manager.
     701           2 : func (wm *failoverManager) Close() error {
     702           2 :         wm.stopper.stop()
     703           2 :         // Since all goroutines are stopped, can close the dirs.
     704           2 :         var err error
     705           2 :         for _, f := range wm.dirHandles {
     706           2 :                 err = firstError(err, f.Close())
     707           2 :         }
     708           2 :         return err
     709             : }
     710             : 
     711             : // RecyclerForTesting implements Manager.
     712           0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
     713           0 :         return nil
     714           0 : }
     715             : 
     716             : // logCreator implements the logCreator func type.
     717             : func (wm *failoverManager) logCreator(
     718             :         dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
     719           2 : ) (logFile vfs.File, initialFileSize uint64, err error) {
     720           2 :         logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
     721           2 :         isPrimary := dir == wm.opts.Primary
     722           2 :         // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
     723           2 :         considerRecycle := li == 0 && isPrimary
     724           2 :         createInfo := CreateInfo{
     725           2 :                 JobID:       jobID,
     726           2 :                 Path:        logFilename,
     727           2 :                 IsSecondary: !isPrimary,
     728           2 :                 Num:         wn,
     729           2 :                 Err:         nil,
     730           2 :         }
     731           2 :         defer func() {
     732           2 :                 createInfo.Err = err
     733           2 :                 if wm.opts.EventListener != nil {
     734           2 :                         wm.opts.EventListener.LogCreated(createInfo)
     735           2 :                 }
     736             :         }()
     737           2 :         if considerRecycle {
     738           2 :                 // Try to use a recycled log file. Recycling log files is an important
     739           2 :                 // performance optimization as it is faster to sync a file that has
     740           2 :                 // already been written, than one which is being written for the first
     741           2 :                 // time. This is due to the need to sync file metadata when a file is
     742           2 :                 // being written for the first time. Note this is true even if file
     743           2 :                 // preallocation is performed (e.g. fallocate).
     744           2 :                 var recycleLog base.FileInfo
     745           2 :                 var recycleOK bool
     746           2 :                 func() {
     747           2 :                         wm.recyclerPeekPopMu.Lock()
     748           2 :                         defer wm.recyclerPeekPopMu.Unlock()
     749           2 :                         recycleLog, recycleOK = wm.recycler.Peek()
     750           2 :                         if recycleOK {
     751           1 :                                 if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
     752           0 :                                         panic(err)
     753             :                                 }
     754             :                         }
     755             :                 }()
     756           2 :                 if recycleOK {
     757           1 :                         createInfo.RecycledFileNum = recycleLog.FileNum
     758           1 :                         recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
     759           1 :                         r.writeStart()
     760           1 :                         logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
     761           1 :                         r.writeEnd(err)
     762           1 :                         // TODO(sumeer): should we fatal since primary dir? At some point it is
     763           1 :                         // better to fatal instead of continuing to failover.
     764           1 :                         // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
     765           1 :                         if err != nil {
     766           0 :                                 // TODO(sumeer): we have popped from the logRecycler, which is
     767           0 :                                 // arguably correct, since we don't want to keep trying to reuse a log
     768           0 :                                 // that causes some error. But the original or new file may exist, and
     769           0 :                                 // no one will clean it up unless the process restarts.
     770           0 :                                 return nil, 0, err
     771           0 :                         }
     772             :                         // Figure out the recycled WAL size. This Stat is necessary because
     773             :                         // ReuseForWrite's contract allows for removing the old file and
     774             :                         // creating a new one. We don't know whether the WAL was actually
     775             :                         // recycled.
     776             :                         //
     777             :                         // TODO(jackson): Adding a boolean to the ReuseForWrite return value
     778             :                         // indicating whether or not the file was actually reused would allow us
     779             :                         // to skip the stat and use recycleLog.FileSize.
     780           1 :                         var finfo os.FileInfo
     781           1 :                         finfo, err = logFile.Stat()
     782           1 :                         if err != nil {
     783           0 :                                 logFile.Close()
     784           0 :                                 return nil, 0, err
     785           0 :                         }
     786           1 :                         initialFileSize = uint64(finfo.Size())
     787           1 :                         return logFile, initialFileSize, nil
     788             :                 }
     789             :         }
     790             :         // Did not recycle.
     791             :         //
     792             :         // Create file.
     793           2 :         r.writeStart()
     794           2 :         logFile, err = dir.FS.Create(logFilename, "pebble-wal")
     795           2 :         r.writeEnd(err)
     796           2 :         return logFile, 0, err
     797             : }
     798             : 
     799             : type stopper struct {
     800             :         quiescer chan struct{} // Closed when quiescing
     801             :         wg       sync.WaitGroup
     802             : }
     803             : 
     804           2 : func newStopper() *stopper {
     805           2 :         return &stopper{
     806           2 :                 quiescer: make(chan struct{}),
     807           2 :         }
     808           2 : }
     809             : 
     810           2 : func (s *stopper) runAsync(f func()) {
     811           2 :         s.wg.Add(1)
     812           2 :         go func() {
     813           2 :                 f()
     814           2 :                 s.wg.Done()
     815           2 :         }()
     816             : }
     817             : 
     818             : // shouldQuiesce returns a channel which will be closed when stop() has been
     819             : // invoked and outstanding goroutines should begin to quiesce.
     820           2 : func (s *stopper) shouldQuiesce() <-chan struct{} {
     821           2 :         return s.quiescer
     822           2 : }
     823             : 
     824           2 : func (s *stopper) stop() {
     825           2 :         close(s.quiescer)
     826           2 :         s.wg.Wait()
     827           2 : }
     828             : 
     829             : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
     830             : // removal of support for Timer, and added support in the manual
     831             : // implementation for reset.
     832             : 
     833             : // timeSource is used to interact with the clock and tickers. Abstracts
     834             : // time.Now and time.NewTicker for testing.
     835             : type timeSource interface {
     836             :         now() time.Time
     837             :         newTicker(duration time.Duration) tickerI
     838             : }
     839             : 
     840             : // tickerI is an interface wrapping time.Ticker.
     841             : type tickerI interface {
     842             :         reset(duration time.Duration)
     843             :         stop()
     844             :         ch() <-chan time.Time
     845             : }
     846             : 
     847             : // defaultTime is a timeSource using the time package.
     848             : type defaultTime struct{}
     849             : 
     850             : var _ timeSource = defaultTime{}
     851             : 
     852           2 : func (defaultTime) now() time.Time {
     853           2 :         return time.Now()
     854           2 : }
     855             : 
     856           2 : func (defaultTime) newTicker(duration time.Duration) tickerI {
     857           2 :         return (*defaultTicker)(time.NewTicker(duration))
     858           2 : }
     859             : 
     860             : // defaultTicker uses time.Ticker.
     861             : type defaultTicker time.Ticker
     862             : 
     863             : var _ tickerI = &defaultTicker{}
     864             : 
     865           2 : func (t *defaultTicker) reset(duration time.Duration) {
     866           2 :         (*time.Ticker)(t).Reset(duration)
     867           2 : }
     868             : 
     869           2 : func (t *defaultTicker) stop() {
     870           2 :         (*time.Ticker)(t).Stop()
     871           2 : }
     872             : 
     873           2 : func (t *defaultTicker) ch() <-chan time.Time {
     874           2 :         return (*time.Ticker)(t).C
     875           2 : }
     876             : 
     877             : // Make lint happy.
     878             : var _ = (*failoverMonitor).noWriter
     879             : var _ = (*failoverManager).writerClosed
     880             : var _ = (&stopper{}).shouldQuiesce

Generated by: LCOV version 1.14