// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package wal

import (
        "os"
        "sync"
        "time"

        "github.com/cockroachdb/pebble/internal/base"
        "github.com/cockroachdb/pebble/vfs"
        "golang.org/x/exp/rand"
)

// dirProber probes the primary dir until it is confirmed to be healthy. If
// it doesn't have enough samples, the dir is deemed unhealthy. It is only
// used for failback to the primary.
type dirProber struct {
        fs vfs.FS
        // The full path of the file to use for the probe. The probe is destructive
        // in that it deletes and (re)creates the file.
        filename string
        // The probing interval, when enabled.
        interval time.Duration
        timeSource
        // buf holds the random bytes that are written during the probe.
        buf [100 << 10]byte
        // enabled is signaled to enable and disable probing. The initial state of
        // the prober is disabled.
        enabled chan bool
        mu      struct {
                sync.Mutex
                // Circular buffer of history samples.
                history [probeHistoryLength]time.Duration
                // The history is in [firstProbeIndex, nextProbeIndex).
                firstProbeIndex int
                nextProbeIndex  int
        }
        iterationForTesting chan<- struct{}
}
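
// An illustrative example of the ring buffer index math above (values are
// hypothetical): with firstProbeIndex=130 and nextProbeIndex=258 there are
// 128 valid samples, stored at history[130%128], ..., history[257%128],
// i.e., positions 2..127 followed by 0..1.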

const probeHistoryLength = 128

// failedProbeDuration is a sentinel duration for a failed (or missing)
// probe. It is deliberately large: 24h.
const failedProbeDuration = 24 * 60 * 60 * time.Second

// There is no close/stop method on the dirProber -- use stopper.
func (p *dirProber) init(
        fs vfs.FS,
        filename string,
        interval time.Duration,
        stopper *stopper,
        ts timeSource,
        notifyIterationForTesting chan<- struct{},
) {
        *p = dirProber{
                fs:                  fs,
                filename:            filename,
                interval:            interval,
                timeSource:          ts,
                enabled:             make(chan bool),
                iterationForTesting: notifyIterationForTesting,
        }
        // Random bytes for writing, to defeat any FS compression optimization.
        _, err := rand.Read(p.buf[:])
        if err != nil {
                panic(err)
        }
        stopper.runAsync(func() {
                p.probeLoop(stopper.shouldQuiesce())
        })
}
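
// A minimal usage sketch (hypothetical path and values; in this package the
// prober is created and driven by failoverMonitor):
//
//    var p dirProber
//    p.init(fs, "/primary-wal-dir/probe-file", time.Second, stopper, defaultTime{}, nil)
//    p.enableProbing()
//    // ... later, when deciding whether to fail back ...
//    mean, max := p.getMeanMax(2 * time.Minute)
//    _, _ = mean, max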

func (p *dirProber) probeLoop(shouldQuiesce <-chan struct{}) {
        ticker := p.timeSource.newTicker(p.interval)
        ticker.stop()
        tickerCh := ticker.ch()
        done := false
        var enabled bool
        for !done {
                select {
                case <-tickerCh:
                        if !enabled {
                                // Could have a tick waiting before we disabled it. Ignore.
                                continue
                        }
                        probeDur := func() time.Duration {
                                // Delete, create, write, sync.
                                start := p.timeSource.now()
                                _ = p.fs.Remove(p.filename)
                                f, err := p.fs.Create(p.filename)
                                if err != nil {
                                        return failedProbeDuration
                                }
                                defer f.Close()
                                n, err := f.Write(p.buf[:])
                                if err != nil {
                                        return failedProbeDuration
                                }
                                if n != len(p.buf) {
                                        panic("invariant violation")
                                }
                                err = f.Sync()
                                if err != nil {
                                        return failedProbeDuration
                                }
                                return p.timeSource.now().Sub(start)
                        }()
                        p.mu.Lock()
                        nextIndex := p.mu.nextProbeIndex % probeHistoryLength
                        p.mu.history[nextIndex] = probeDur
                        p.mu.nextProbeIndex++
                        numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
                        if numSamples > probeHistoryLength {
                                // Length has exceeded capacity, i.e., overwritten the first probe.
                                p.mu.firstProbeIndex++
                        }
                        p.mu.Unlock()

                case <-shouldQuiesce:
                        done = true

                case enabled = <-p.enabled:
                        if enabled {
                                ticker.reset(p.interval)
                        } else {
                                ticker.stop()
                                p.mu.Lock()
                                p.mu.firstProbeIndex = 0
                                p.mu.nextProbeIndex = 0
                                p.mu.Unlock()
                        }
                }
                if p.iterationForTesting != nil {
                        p.iterationForTesting <- struct{}{}
                }
        }
}

func (p *dirProber) enableProbing() {
        p.enabled <- true
}

func (p *dirProber) disableProbing() {
        p.enabled <- false
}

// getMeanMax returns the mean and max probe duration over the most recent
// interval of history. If there are not enough samples to cover the
// interval, both results are failedProbeDuration.
func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
        p.mu.Lock()
        defer p.mu.Unlock()
        numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
        samplesNeeded := int((interval + p.interval - 1) / p.interval)
        if samplesNeeded == 0 {
                panic("interval is too short")
        } else if samplesNeeded > probeHistoryLength {
                panic("interval is too long")
        }
        if samplesNeeded > numSamples {
                // Not enough samples, so assume not yet healthy.
                return failedProbeDuration, failedProbeDuration
        }
        offset := numSamples - samplesNeeded
        var sum, max time.Duration
        for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
                sampleDur := p.mu.history[i%probeHistoryLength]
                sum += sampleDur
                if max < sampleDur {
                        max = sampleDur
                }
        }
        mean := sum / time.Duration(samplesNeeded)
        return mean, max
}
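
// For example (illustrative numbers): with interval=2m and a 1s probe
// interval, samplesNeeded = ceil(120s/1s) = 120. Until 120 samples have
// accumulated since probing was enabled, getMeanMax reports
// failedProbeDuration, i.e., the primary is not yet deemed healthy.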

type dirIndex int

const (
        primaryDirIndex dirIndex = iota
        secondaryDirIndex
        numDirIndices
)

type dirAndFileHandle struct {
        Dir
        vfs.File
}

// switchableWriter is a subset of failoverWriter needed by failoverMonitor.
type switchableWriter interface {
        switchToNewDir(dirAndFileHandle) error
        ongoingLatencyOrErrorForCurDir() (time.Duration, error)
}

type failoverMonitorOptions struct {
        // The primary and secondary dir.
        dirs [numDirIndices]dirAndFileHandle

        FailoverOptions
        stopper *stopper
}

// failoverMonitor monitors the latency and error observed by the
// switchableWriter, and does failover by switching the dir. It also monitors
// the primary dir for failback.
type failoverMonitor struct {
        opts   failoverMonitorOptions
        prober dirProber
        mu     struct {
                sync.Mutex
                // dirIndex and lastFailBackTime are only modified by monitorLoop. They
                // are protected by the mutex for concurrent reads.

                // dirIndex is the directory to be used by the writer (if non-nil), or
                // when the writer next becomes non-nil.
                dirIndex
                // The time at which the monitor last failed back to the primary. This is
                // only relevant when dirIndex == primaryDirIndex.
                lastFailBackTime time.Time
                // The current failoverWriter, exposed via a narrower interface.
                writer switchableWriter
        }
}

func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
        m := &failoverMonitor{
                opts: opts,
        }
        m.prober.init(opts.dirs[primaryDirIndex].FS,
                opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
                opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
        opts.stopper.runAsync(func() {
                m.monitorLoop(opts.stopper.shouldQuiesce())
        })
        return m
}

// noWriter is called when the previous writer is closed.
func (m *failoverMonitor) noWriter() {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.mu.writer = nil
}

// writerCreateFunc is allowed to return nil if there is an error. Handling
// that error is not the responsibility of failoverMonitor, so the error
// should not be an IO error (which failoverMonitor is interested in).
func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.mu.writer != nil {
                panic("previous writer not closed")
        }
        m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
}

func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.mu.dirIndex == secondaryDirIndex {
                return true
        }
        intervalSinceFailback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
        return intervalSinceFailback < m.opts.ElevatedWriteStallThresholdLag
}
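
// For example (illustrative value): with ElevatedWriteStallThresholdLag=60s,
// the write stall threshold stays elevated for the entire time the secondary
// is in use, and for a further 60s after failing back to the primary.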

// lastWriterInfo is state maintained in the monitorLoop for the latest
// switchable writer. It is mainly used to dampen the switching.
type lastWriterInfo struct {
        writer                 switchableWriter
        numSwitches            int
        ongoingLatencyAtSwitch time.Duration
        errorCounts            [numDirIndices]int
}

func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
        ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
        if m.opts.monitorIterationForTesting != nil {
                m.opts.monitorIterationForTesting <- struct{}{}
        }
        tickerCh := ticker.ch()
        dirIndex := primaryDirIndex
        var lastWriter lastWriterInfo
        for {
                select {
                case <-shouldQuiesce:
                        ticker.stop()
                        return
                case <-tickerCh:
                        writerOngoingLatency, writerErr := func() (time.Duration, error) {
                                m.mu.Lock()
                                defer m.mu.Unlock()
                                if m.mu.writer != lastWriter.writer {
                                        lastWriter = lastWriterInfo{writer: m.mu.writer}
                                }
                                if lastWriter.writer == nil {
                                        return 0, nil
                                }
                                return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
                        }()
                        switchDir := false
                        // Arbitrary value.
                        const highSecondaryErrorCountThreshold = 2
                        // We don't consider a switch if we are currently using the primary dir
                        // and the secondary dir has a high enough error count. It is more
                        // likely that someone has misconfigured the secondary, e.g., wrong
                        // permissions or not enough disk space. We only remember the error
                        // history in the context of the lastWriter, since an operator can fix
                        // the underlying misconfiguration.
                        if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
                                dirIndex == primaryDirIndex) {
                                // Switching heuristics. Subject to change based on real-world experience.
                                if writerErr != nil {
                                        // An error causes an immediate switch, since a LogWriter with an
                                        // error is useless.
                                        lastWriter.errorCounts[dirIndex]++
                                        switchDir = true
                                } else if writerOngoingLatency > m.opts.UnhealthyOperationLatencyThreshold() {
                                        // Arbitrary value.
                                        const switchImmediatelyCountThreshold = 2
                                        // High latency. Switch immediately if the number of switches that
                                        // have been done is below the threshold, since that gives us an
                                        // observation of both dirs. Once above that threshold, decay the
                                        // switch rate by increasing the observed latency needed for a
                                        // switch.
                                        if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
                                                writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
                                                switchDir = true
                                                lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
                                        }
                                        // Else high latency, but not high enough yet to motivate a switch.
                                } else if dirIndex == secondaryDirIndex {
                                        // The writer looks healthy. We can still switch if the writer is
                                        // using the secondary dir and the primary is healthy again.
                                        primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
                                        if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
                                                primaryMax < m.opts.HealthyProbeLatencyThreshold {
                                                switchDir = true
                                        }
                                }
                        }
                        if switchDir {
                                lastWriter.numSwitches++
                                if dirIndex == secondaryDirIndex {
                                        // Switching back to the primary, so we don't need to probe to see
                                        // if the primary is healthy.
                                        m.prober.disableProbing()
                                        dirIndex = primaryDirIndex
                                } else {
                                        m.prober.enableProbing()
                                        dirIndex = secondaryDirIndex
                                }
                                dir := m.opts.dirs[dirIndex]
                                m.mu.Lock()
                                m.mu.dirIndex = dirIndex
                                if dirIndex == primaryDirIndex {
                                        m.mu.lastFailBackTime = m.opts.timeSource.now()
                                }
                                if m.mu.writer != nil {
                                        m.mu.writer.switchToNewDir(dir)
                                }
                                m.mu.Unlock()
                        }
                }
                if m.opts.monitorStateForTesting != nil {
                        m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
                }
                if m.opts.monitorIterationForTesting != nil {
                        m.opts.monitorIterationForTesting <- struct{}{}
                }
        }
}
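
// A worked example of the latency-based switch damping above (illustrative
// numbers): with UnhealthyOperationLatencyThreshold() = 200ms, while a
// writer has fewer than two switches, ongoing latency above 200ms triggers
// an immediate switch. After that, a switch additionally requires the
// latency to exceed twice the latency observed at the previous switch, so
// subsequent switches need >400ms, >800ms, and so on.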

// ensureDefaults fills in default values for any unset options.
func (o *FailoverOptions) ensureDefaults() {
        if o.PrimaryDirProbeInterval == 0 {
                o.PrimaryDirProbeInterval = time.Second
        }
        if o.HealthyProbeLatencyThreshold == 0 {
                o.HealthyProbeLatencyThreshold = 100 * time.Millisecond
        }
        if o.HealthyInterval == 0 {
                o.HealthyInterval = 2 * time.Minute
        }
        if o.UnhealthySamplingInterval == 0 {
                o.UnhealthySamplingInterval = 100 * time.Millisecond
        }
        if o.UnhealthyOperationLatencyThreshold == nil {
                o.UnhealthyOperationLatencyThreshold = func() time.Duration {
                        return 200 * time.Millisecond
                }
        }
}
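
// Taken together, the defaults above mean: the primary is probed every 1s;
// failing back requires the probes over the last 2m to have mean and max
// latency under 100ms; and the writer's ongoing latency is sampled every
// 100ms against a 200ms unhealthy threshold.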

type logicalLogWithSizesEtc struct {
        num      NumWAL
        segments []segmentWithSizeEtc
}

type segmentWithSizeEtc struct {
        segment
        approxFileSize      uint64
        synchronouslyClosed bool
}

type failoverManager struct {
        opts Options
        // TODO(jackson/sumeer): read-path etc.

        dirHandles [numDirIndices]vfs.File
        stopper    *stopper
        monitor    *failoverMonitor
        mu         struct {
                sync.Mutex
                closedWALs []logicalLogWithSizesEtc
                ww         *failoverWriter
        }
        recycler LogRecycler
        // Due to async creation of files in failoverWriter, multiple goroutines can
        // concurrently try to get a file from the recycler. This mutex protects the
        // logRecycler.{Peek,Pop} pair.
        recyclerPeekPopMu sync.Mutex
}

var _ Manager = &failoverManager{}

// TODO(sumeer):
// - log deletion: if record.LogWriter did not close yet, the cleaner may
//   get an error when deleting or renaming (only under Windows?).
// Init implements Manager.
func (wm *failoverManager) Init(o Options, initial Logs) error {
        if o.timeSource == nil {
                o.timeSource = defaultTime{}
        }
        o.FailoverOptions.ensureDefaults()
        stopper := newStopper()
        var dirs [numDirIndices]dirAndFileHandle
        for i, dir := range []Dir{o.Primary, o.Secondary} {
                dirs[i].Dir = dir
                f, err := dir.FS.OpenDir(dir.Dirname)
                if err != nil {
                        return err
                }
                dirs[i].File = f
        }
        fmOpts := failoverMonitorOptions{
                dirs:            dirs,
                FailoverOptions: o.FailoverOptions,
                stopper:         stopper,
        }
        monitor := newFailoverMonitor(fmOpts)
        *wm = failoverManager{
                opts:       o,
                dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
                stopper:    stopper,
                monitor:    monitor,
        }
        wm.recycler.Init(o.MaxNumRecyclableLogs)
        for _, ll := range initial {
                llse := logicalLogWithSizesEtc{
                        num: ll.Num,
                }
                if wm.recycler.MinRecycleLogNum() <= ll.Num {
                        wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
                }
                for i, s := range ll.segments {
                        fs, path := ll.SegmentLocation(i)
                        stat, err := fs.Stat(path)
                        if err != nil {
                                return err
                        }
                        llse.segments = append(llse.segments, segmentWithSizeEtc{
                                segment:             s,
                                approxFileSize:      uint64(stat.Size()),
                                synchronouslyClosed: true,
                        })
                }
                wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
        }
        return nil
}

// List implements Manager.
func (wm *failoverManager) List() (Logs, error) {
        wm.mu.Lock()
        defer wm.mu.Unlock()
        n := len(wm.mu.closedWALs)
        if wm.mu.ww != nil {
                n++
        }
        wals := make(Logs, n)
        setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
                segments := make([]segment, len(llse.segments))
                for j := range llse.segments {
                        segments[j] = llse.segments[j].segment
                }
                wals[index] = LogicalLog{
                        Num:      llse.num,
                        segments: segments,
                }
        }
        for i, llse := range wm.mu.closedWALs {
                setLogicalLog(i, llse)
        }
        if wm.mu.ww != nil {
                setLogicalLog(n-1, wm.mu.ww.getLog())
        }
        return wals, nil
}

// Obsolete implements Manager.
func (wm *failoverManager) Obsolete(
        minUnflushedNum NumWAL, noRecycle bool,
) (toDelete []DeletableLog, err error) {
        wm.mu.Lock()
        defer wm.mu.Unlock()
        i := 0
        for ; i < len(wm.mu.closedWALs); i++ {
                ll := wm.mu.closedWALs[i]
                if ll.num >= minUnflushedNum {
                        break
                }
                // Recycle only the primary at logNameIndex=0, if there was no failover
                // and the segment was synchronously closed. It may not be safe to
                // recycle a file that is still being written to. And recycling when
                // there was a failover may fill up the recycler with smaller log files.
                // The restriction regarding logNameIndex=0 is because logRecycler.Peek
                // only exposes the DiskFileNum, and we need to use that to construct
                // the path -- we could remove this restriction by changing the
                // logRecycler interface, but we don't bother.
                canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
                        ll.segments[0].logNameIndex == 0 &&
                        ll.segments[0].dir == wm.opts.Primary
                if !canRecycle || !wm.recycler.Add(base.FileInfo{
                        FileNum:  base.DiskFileNum(ll.num),
                        FileSize: ll.segments[0].approxFileSize,
                }) {
                        for _, s := range ll.segments {
                                toDelete = append(toDelete, DeletableLog{
                                        FS:             s.dir.FS,
                                        Path:           s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
                                        NumWAL:         ll.num,
                                        ApproxFileSize: s.approxFileSize,
                                })
                        }
                }
        }
        wm.mu.closedWALs = wm.mu.closedWALs[i:]
        return toDelete, nil
}
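
// For example (illustrative): a closed WAL consisting of a single segment in
// the primary dir with logNameIndex=0 that was synchronously closed is
// offered to the recycler; a WAL that failed over mid-write (say segments
// with logNameIndex 0 and 1) has all of its segment files returned in
// toDelete instead.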

// Create implements Manager.
func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
        func() {
                wm.mu.Lock()
                defer wm.mu.Unlock()
                if wm.mu.ww != nil {
                        panic("previous wal.Writer not closed")
                }
        }()
        fwOpts := failoverWriterOpts{
                wn:                   wn,
                logger:               wm.opts.Logger,
                timeSource:           wm.opts.timeSource,
                jobID:                jobID,
                logCreator:           wm.logCreator,
                noSyncOnClose:        wm.opts.NoSyncOnClose,
                bytesPerSync:         wm.opts.BytesPerSync,
                preallocateSize:      wm.opts.PreallocateSize,
                minSyncInterval:      wm.opts.MinSyncInterval,
                fsyncLatency:         wm.opts.FsyncLatency,
                queueSemChan:         wm.opts.QueueSemChan,
                stopper:              wm.stopper,
                writerClosed:         wm.writerClosed,
                writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
        }
        var err error
        var ww *failoverWriter
        writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
                ww, err = newFailoverWriter(fwOpts, dir)
                if err != nil {
                        return nil
                }
                return ww
        }
        wm.monitor.newWriter(writerCreateFunc)
        if ww != nil {
                wm.mu.Lock()
                defer wm.mu.Unlock()
                wm.mu.ww = ww
        }
        return ww, err
}

// ElevateWriteStallThresholdForFailover implements Manager.
func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
        return wm.monitor.elevateWriteStallThresholdForFailover()
}

func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
        wm.monitor.noWriter()
        wm.mu.Lock()
        defer wm.mu.Unlock()
        wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
        wm.mu.ww = nil
}

// Stats implements Manager.
func (wm *failoverManager) Stats() Stats {
        recycledLogsCount, recycledLogSize := wm.recycler.Stats()
        wm.mu.Lock()
        defer wm.mu.Unlock()
        var liveFileCount int
        var liveFileSize uint64
        updateStats := func(segments []segmentWithSizeEtc) {
                for _, s := range segments {
                        liveFileCount++
                        liveFileSize += s.approxFileSize
                }
        }
        for _, llse := range wm.mu.closedWALs {
                updateStats(llse.segments)
        }
        if wm.mu.ww != nil {
                updateStats(wm.mu.ww.getLog().segments)
        }
        return Stats{
                ObsoleteFileCount: recycledLogsCount,
                ObsoleteFileSize:  recycledLogSize,
                LiveFileCount:     liveFileCount,
                LiveFileSize:      liveFileSize,
        }
}

// Close implements Manager.
func (wm *failoverManager) Close() error {
        wm.stopper.stop()
        // Since all goroutines are stopped, we can close the dirs.
        var err error
        for _, f := range wm.dirHandles {
                err = firstError(err, f.Close())
        }
        return err
}

// RecyclerForTesting implements Manager.
func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
        return nil
}

// logCreator implements the logCreator func type.
func (wm *failoverManager) logCreator(
        dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
) (logFile vfs.File, initialFileSize uint64, err error) {
        logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
        isPrimary := dir == wm.opts.Primary
        // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
        considerRecycle := li == 0 && isPrimary
        createInfo := CreateInfo{
                JobID:       jobID,
                Path:        logFilename,
                IsSecondary: !isPrimary,
                Num:         wn,
                Err:         nil,
        }
        defer func() {
                createInfo.Err = err
                if wm.opts.EventListener != nil {
                        wm.opts.EventListener.LogCreated(createInfo)
                }
        }()
        if considerRecycle {
                // Try to use a recycled log file. Recycling log files is an important
                // performance optimization as it is faster to sync a file that has
                // already been written, than one which is being written for the first
                // time. This is due to the need to sync file metadata when a file is
                // being written for the first time. Note this is true even if file
                // preallocation is performed (e.g. fallocate).
                var recycleLog base.FileInfo
                var recycleOK bool
                func() {
                        wm.recyclerPeekPopMu.Lock()
                        defer wm.recyclerPeekPopMu.Unlock()
                        recycleLog, recycleOK = wm.recycler.Peek()
                        if recycleOK {
                                if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
                                        panic(err)
                                }
                        }
                }()
                if recycleOK {
                        createInfo.RecycledFileNum = recycleLog.FileNum
                        recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
                        r.writeStart()
                        logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename)
                        r.writeEnd(err)
                        // TODO(sumeer): should we fatal since primary dir? At some point it is
                        // better to fatal instead of continuing to failover.
                        // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
                        if err != nil {
                                // TODO(sumeer): we have popped from the logRecycler, which is
                                // arguably correct, since we don't want to keep trying to reuse a log
                                // that causes some error. But the original or new file may exist, and
                                // no one will clean it up unless the process restarts.
                                return nil, 0, err
                        }
                        // Figure out the recycled WAL size. This Stat is necessary because
                        // ReuseForWrite's contract allows for removing the old file and
                        // creating a new one. We don't know whether the WAL was actually
                        // recycled.
                        //
                        // TODO(jackson): Adding a boolean to the ReuseForWrite return value
                        // indicating whether or not the file was actually reused would allow us
                        // to skip the stat and use recycleLog.FileSize.
                        var finfo os.FileInfo
                        finfo, err = logFile.Stat()
                        if err != nil {
                                logFile.Close()
                                return nil, 0, err
                        }
                        initialFileSize = uint64(finfo.Size())
                        return logFile, initialFileSize, nil
                }
        }
        // Did not recycle.
        //
        // Create file.
        r.writeStart()
        logFile, err = dir.FS.Create(logFilename)
        r.writeEnd(err)
        return logFile, 0, err
}

type stopper struct {
        quiescer chan struct{} // Closed when quiescing.
        wg       sync.WaitGroup
}

func newStopper() *stopper {
        return &stopper{
                quiescer: make(chan struct{}),
        }
}

func (s *stopper) runAsync(f func()) {
        s.wg.Add(1)
        go func() {
                f()
                s.wg.Done()
        }()
}

// shouldQuiesce returns a channel which will be closed when stop() has been
// invoked and outstanding goroutines should begin to quiesce.
func (s *stopper) shouldQuiesce() <-chan struct{} {
        return s.quiescer
}

func (s *stopper) stop() {
        close(s.quiescer)
        s.wg.Wait()
}
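
// A minimal usage sketch for stopper (illustrative):
//
//    s := newStopper()
//    s.runAsync(func() {
//            <-s.shouldQuiesce()
//            // Release resources and return.
//    })
//    s.stop() // closes quiescer and waits for all runAsync goroutines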

// timeSource and tickerI are extracted from CockroachDB's timeutil, with
// support for Timer removed, and support for reset added to the manual
// implementation.

// timeSource is used to interact with the clock and tickers. Abstracts
// time.Now and time.NewTicker for testing.
type timeSource interface {
        now() time.Time
        newTicker(duration time.Duration) tickerI
}

// tickerI is an interface wrapping time.Ticker.
type tickerI interface {
        reset(duration time.Duration)
        stop()
        ch() <-chan time.Time
}
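
// A test can inject a deterministic clock by supplying its own timeSource;
// a hedged sketch of the ticker half (hypothetical type, not part of this
// file):
//
//    type manualTicker struct{ c chan time.Time }
//
//    func (t *manualTicker) reset(time.Duration) {}
//    func (t *manualTicker) stop()               {}
//    func (t *manualTicker) ch() <-chan time.Time { return t.c }
//
// The test then sends on c to force a tick at a precise point.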

// defaultTime is a timeSource using the time package.
type defaultTime struct{}

var _ timeSource = defaultTime{}

func (defaultTime) now() time.Time {
        return time.Now()
}

func (defaultTime) newTicker(duration time.Duration) tickerI {
        return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ tickerI = &defaultTicker{}

func (t *defaultTicker) reset(duration time.Duration) {
        (*time.Ticker)(t).Reset(duration)
}

func (t *defaultTicker) stop() {
        (*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
        return (*time.Ticker)(t).C
}

// Make lint happy.
var _ = (*failoverMonitor).noWriter
var _ = (*failoverManager).writerClosed
var _ = (&stopper{}).shouldQuiesce
