Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "os"
9 : "sync"
10 : "time"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/vfs"
14 : "golang.org/x/exp/rand"
15 : )
16 :
17 : // dirProber probes the primary dir until it is confirmed to be healthy. If
18 : // it doesn't yet have enough samples, the dir is deemed unhealthy. The prober
19 : // is only used for failback to the primary.
20 : type dirProber struct {
21 : fs vfs.FS
22 : // The full path of the file to use for the probe. The probe is destructive
23 : // in that it deletes and (re)creates the file.
24 : filename string
25 : // The probing interval, when enabled.
26 : interval time.Duration
27 : timeSource
28 : // buf holds the random bytes that are written during the probe.
29 : buf [100 << 10]byte
30 : // enabled is signaled to enable and disable probing. The initial state of
31 : // the prober is disabled.
32 : enabled chan bool
33 : mu struct {
34 : sync.Mutex
35 : // Circular buffer of history samples.
36 : history [probeHistoryLength]time.Duration
37 : // The history is in [firstProbeIndex, nextProbeIndex).
38 : firstProbeIndex int
39 : nextProbeIndex int
40 : }
41 : iterationForTesting chan<- struct{}
42 : }
43 :
44 : const probeHistoryLength = 128
45 :
46 : // A value much larger than any plausible probe latency; recorded for failed probes and returned when there are insufficient samples.
47 : const failedProbeDuration = 24 * 60 * 60 * time.Second
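// The probe history forms a circular buffer: logical sample i (where
// firstProbeIndex <= i < nextProbeIndex) lives at history[i%probeHistoryLength],
// and both indices only grow. A sketch of reading the most recent k samples
// (this mirrors what getMeanMax does below, and assumes p.mu is held):
//
//	for i := p.mu.nextProbeIndex - k; i < p.mu.nextProbeIndex; i++ {
//	    sample := p.mu.history[i%probeHistoryLength]
//	    // ... aggregate sample ...
//	}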
48 :
49 : // There is no close/stop method on the dirProber -- use stopper.
50 : func (p *dirProber) init(
51 : fs vfs.FS,
52 : filename string,
53 : interval time.Duration,
54 : stopper *stopper,
55 : ts timeSource,
56 : notifyIterationForTesting chan<- struct{},
57 0 : ) {
58 0 : *p = dirProber{
59 0 : fs: fs,
60 0 : filename: filename,
61 0 : interval: interval,
62 0 : timeSource: ts,
63 0 : enabled: make(chan bool),
64 0 : iterationForTesting: notifyIterationForTesting,
65 0 : }
66 0 : // Random bytes for writing, to defeat any FS compression optimization.
67 0 : _, err := rand.Read(p.buf[:])
68 0 : if err != nil {
69 0 : panic(err)
70 : }
71 0 : stopper.runAsync(func() {
72 0 : p.probeLoop(stopper.shouldQuiesce())
73 0 : })
74 : }
75 :
76 0 : func (p *dirProber) probeLoop(shouldQuiesce <-chan struct{}) {
77 0 : ticker := p.timeSource.newTicker(p.interval)
78 0 : ticker.stop()
79 0 : tickerCh := ticker.ch()
80 0 : done := false
81 0 : var enabled bool
82 0 : for !done {
83 0 : select {
84 0 : case <-tickerCh:
85 0 : if !enabled {
86 0 : // Could have a tick waiting before we disabled it. Ignore.
87 0 : continue
88 : }
89 0 : probeDur := func() time.Duration {
90 0 : // Delete, create, write, sync.
91 0 : start := p.timeSource.now()
92 0 : _ = p.fs.Remove(p.filename)
93 0 : f, err := p.fs.Create(p.filename)
94 0 : if err != nil {
95 0 : return failedProbeDuration
96 0 : }
97 0 : defer f.Close()
98 0 : n, err := f.Write(p.buf[:])
99 0 : if err != nil {
100 0 : return failedProbeDuration
101 0 : }
102 0 : if n != len(p.buf) {
103 0 : panic("invariant violation")
104 : }
105 0 : err = f.Sync()
106 0 : if err != nil {
107 0 : return failedProbeDuration
108 0 : }
109 0 : return p.timeSource.now().Sub(start)
110 : }()
111 0 : p.mu.Lock()
112 0 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
113 0 : p.mu.history[nextIndex] = probeDur
114 0 : p.mu.nextProbeIndex++
115 0 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
116 0 : if numSamples > probeHistoryLength {
117 0 : // Length has exceeded capacity, i.e., overwritten the first probe.
118 0 : p.mu.firstProbeIndex++
119 0 : }
120 0 : p.mu.Unlock()
121 :
122 0 : case <-shouldQuiesce:
123 0 : done = true
124 :
125 0 : case enabled = <-p.enabled:
126 0 : if enabled {
127 0 : ticker.reset(p.interval)
128 0 : } else {
129 0 : ticker.stop()
130 0 : p.mu.Lock()
131 0 : p.mu.firstProbeIndex = 0
132 0 : p.mu.nextProbeIndex = 0
133 0 : p.mu.Unlock()
134 0 : }
135 : }
136 0 : if p.iterationForTesting != nil {
137 0 : p.iterationForTesting <- struct{}{}
138 0 : }
139 : }
140 : }
141 :
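// enableProbing and disableProbing synchronize with probeLoop via the
// unbuffered enabled channel: the send blocks until probeLoop receives the new
// state. On disable, probeLoop additionally stops the ticker and clears the
// sample history.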
142 0 : func (p *dirProber) enableProbing() {
143 0 : p.enabled <- true
144 0 : }
145 :
146 0 : func (p *dirProber) disableProbing() {
147 0 : p.enabled <- false
148 0 : }
149 :
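// getMeanMax returns the mean and max probe latency over the most recent
// samples that cover the given interval. If there are not yet enough samples
// to cover the interval, it returns failedProbeDuration for both, i.e., the
// primary is assumed to be unhealthy.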
150 0 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
151 0 : p.mu.Lock()
152 0 : defer p.mu.Unlock()
153 0 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
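	// samplesNeeded is ceil(interval/p.interval): the number of most recent
	// probes needed to span the requested interval.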
154 0 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
155 0 : if samplesNeeded == 0 {
156 0 : panic("interval is too short")
157 0 : } else if samplesNeeded > probeHistoryLength {
158 0 : panic("interval is too long")
159 : }
160 0 : if samplesNeeded > numSamples {
161 0 : // Not enough samples, so assume not yet healthy.
162 0 : return failedProbeDuration, failedProbeDuration
163 0 : }
164 0 : offset := numSamples - samplesNeeded
165 0 : var sum, max time.Duration
166 0 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
167 0 : sampleDur := p.mu.history[i%probeHistoryLength]
168 0 : sum += sampleDur
169 0 : if max < sampleDur {
170 0 : max = sampleDur
171 0 : }
172 : }
173 0 : mean := sum / time.Duration(samplesNeeded)
174 0 : return mean, max
175 : }
176 :
177 : type dirIndex int
178 :
179 : const (
180 : primaryDirIndex dirIndex = iota
181 : secondaryDirIndex
182 : numDirIndices
183 : )
184 :
185 : type dirAndFileHandle struct {
186 : Dir
187 : vfs.File
188 : }
189 :
190 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
191 : type switchableWriter interface {
192 : switchToNewDir(dirAndFileHandle) error
193 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
194 : }
195 :
196 : type failoverMonitorOptions struct {
197 : // The primary and secondary dirs, indexed by dirIndex.
198 : dirs [numDirIndices]dirAndFileHandle
199 :
200 : FailoverOptions
201 : stopper *stopper
202 : }
203 :
204 : // failoverMonitor monitors the latency and errors observed by the
205 : // switchableWriter, and fails over by switching the dir. It also monitors
206 : // the primary dir to decide when to fail back.
207 : type failoverMonitor struct {
208 : opts failoverMonitorOptions
209 : prober dirProber
210 : mu struct {
211 : sync.Mutex
212 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
213 : // are protected by the mutex for concurrent reads.
214 :
215 : // dirIndex is the directory to be used by the writer (if non-nil), or by
216 : // the writer once it becomes non-nil.
217 : dirIndex
218 : // The time at which the monitor last failed back to the primary. This is
219 : // only relevant when dirIndex == primaryDirIndex.
220 : lastFailBackTime time.Time
221 : // The current failoverWriter, exposed via a narrower interface.
222 : writer switchableWriter
223 : }
224 : }
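// Typical lifecycle, as driven by failoverManager (an illustrative sketch, not
// additional API):
//
//	m := newFailoverMonitor(opts)   // starts monitorLoop and the dir prober
//	m.newWriter(createFunc)         // createFunc is given the current dir
//	// monitorLoop may call writer.switchToNewDir on failover/failback, and
//	// failoverManager consults m.elevateWriteStallThresholdForFailover().
//	m.noWriter()                    // once the writer has been closed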
225 :
226 0 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
227 0 : m := &failoverMonitor{
228 0 : opts: opts,
229 0 : }
230 0 : m.prober.init(opts.dirs[primaryDirIndex].FS,
231 0 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
232 0 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
233 0 : opts.stopper.runAsync(func() {
234 0 : m.monitorLoop(opts.stopper.shouldQuiesce())
235 0 : })
236 0 : return m
237 : }
238 :
239 : // noWriter is called when the previous writer is closed.
240 0 : func (m *failoverMonitor) noWriter() {
241 0 : m.mu.Lock()
242 0 : defer m.mu.Unlock()
243 0 : m.mu.writer = nil
244 0 : }
245 :
246 : // writerCreateFunc is allowed to return nil if there is an error. Handling
247 : // that error is not the responsibility of failoverMonitor, so it should not
248 : // be an IO error (which failoverMonitor is interested in).
249 0 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
250 0 : m.mu.Lock()
251 0 : defer m.mu.Unlock()
252 0 : if m.mu.writer != nil {
253 0 : panic("previous writer not closed")
254 : }
255 0 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
256 : }
257 :
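// elevateWriteStallThresholdForFailover returns true if the write stall
// threshold should be elevated: either the writer is currently using the
// secondary dir, or it failed back to the primary within the last
// ElevatedWriteStallThresholdLag.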
258 0 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
259 0 : m.mu.Lock()
260 0 : defer m.mu.Unlock()
261 0 : if m.mu.dirIndex == secondaryDirIndex {
262 0 : return true
263 0 : }
264 0 : intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
265 0 : return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
266 : }
267 :
268 : // lastWriterInfo is state maintained in the monitorLoop for the latest
269 : // switchable writer. It is mainly used to dampen the switching.
270 : type lastWriterInfo struct {
271 : writer switchableWriter
272 : numSwitches int
273 : ongoingLatencyAtSwitch time.Duration
274 : errorCounts [numDirIndices]int
275 : }
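// For example, with the default 200ms UnhealthyOperationLatencyThreshold (see
// ensureDefaults) and the heuristics in monitorLoop: the first two high-latency
// observations for a given writer each cause an immediate switch; after that, a
// further switch requires the ongoing latency to exceed twice the latency
// recorded at the previous switch, so the switch rate decays.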
276 :
277 0 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
278 0 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
279 0 : if m.opts.monitorIterationForTesting != nil {
280 0 : m.opts.monitorIterationForTesting <- struct{}{}
281 0 : }
282 0 : tickerCh := ticker.ch()
283 0 : dirIndex := primaryDirIndex
284 0 : var lastWriter lastWriterInfo
285 0 : for {
286 0 : select {
287 0 : case <-shouldQuiesce:
288 0 : ticker.stop()
289 0 : return
290 0 : case <-tickerCh:
291 0 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
292 0 : m.mu.Lock()
293 0 : defer m.mu.Unlock()
294 0 : if m.mu.writer != lastWriter.writer {
295 0 : lastWriter = lastWriterInfo{writer: m.mu.writer}
296 0 : }
297 0 : if lastWriter.writer == nil {
298 0 : return 0, nil
299 0 : }
300 0 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
301 : }()
302 0 : switchDir := false
303 0 : // Arbitrary value.
304 0 : const highSecondaryErrorCountThreshold = 2
305 0 : // We don't consider a switch if we are currently using the primary dir and
306 0 : // the secondary dir has accumulated enough errors. In that case it is more
307 0 : // likely that someone has misconfigured the secondary, e.g. wrong permissions
308 0 : // or not enough disk space. We only remember the error history in the context
309 0 : // of the lastWriter since an operator can fix the underlying misconfiguration.
310 0 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
311 0 : dirIndex == primaryDirIndex) {
312 0 : // Switching heuristics. Subject to change based on real world experience.
313 0 : if writerErr != nil {
314 0 : // An error causes an immediate switch, since a LogWriter with an
315 0 : // error is useless.
316 0 : lastWriter.errorCounts[dirIndex]++
317 0 : switchDir = true
318 0 : } else if writerOngoingLatency > m.opts.UnhealthyOperationLatencyThreshold() {
319 0 : // Arbitrary value.
320 0 : const switchImmediatelyCountThreshold = 2
321 0 : // High latency. Switch immediately if the number of switches that
322 0 : // have been done is below the threshold, since that gives us an
323 0 : // observation of both dirs. Once above that threshold, decay the
324 0 : // switch rate by increasing the observed latency needed for a
325 0 : // switch.
326 0 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
327 0 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
328 0 : switchDir = true
329 0 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
330 0 : }
331 : // Else high latency, but not high enough yet to motivate switch.
332 0 : } else if dirIndex == secondaryDirIndex {
333 0 : // The writer looks healthy. We can still switch if the writer is using the
334 0 : // secondary dir and the primary is healthy again.
335 0 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
336 0 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
337 0 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
338 0 : switchDir = true
339 0 : }
340 : }
341 : }
342 0 : if switchDir {
343 0 : lastWriter.numSwitches++
344 0 : if dirIndex == secondaryDirIndex {
345 0 : // Switching back to primary, so don't need to probe to see if
346 0 : // primary is healthy.
347 0 : m.prober.disableProbing()
348 0 : dirIndex = primaryDirIndex
349 0 : } else {
350 0 : m.prober.enableProbing()
351 0 : dirIndex = secondaryDirIndex
352 0 : }
353 0 : dir := m.opts.dirs[dirIndex]
354 0 : m.mu.Lock()
355 0 : m.mu.dirIndex = dirIndex
356 0 : if dirIndex == primaryDirIndex {
357 0 : m.mu.lastFailBackTime = m.opts.timeSource.now()
358 0 : }
359 0 : if m.mu.writer != nil {
360 0 : m.mu.writer.switchToNewDir(dir)
361 0 : }
362 0 : m.mu.Unlock()
363 : }
364 : }
365 0 : if m.opts.monitorStateForTesting != nil {
366 0 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
367 0 : }
368 0 : if m.opts.monitorIterationForTesting != nil {
369 0 : m.opts.monitorIterationForTesting <- struct{}{}
370 0 : }
371 : }
372 : }
373 :
374 0 : func (o *FailoverOptions) ensureDefaults() {
375 0 : if o.PrimaryDirProbeInterval == 0 {
376 0 : o.PrimaryDirProbeInterval = time.Second
377 0 : }
378 0 : if o.HealthyProbeLatencyThreshold == 0 {
379 0 : o.HealthyProbeLatencyThreshold = 100 * time.Millisecond
380 0 : }
381 0 : if o.HealthyInterval == 0 {
382 0 : o.HealthyInterval = 2 * time.Minute
383 0 : }
384 0 : if o.UnhealthySamplingInterval == 0 {
385 0 : o.UnhealthySamplingInterval = 100 * time.Millisecond
386 0 : }
387 0 : if o.UnhealthyOperationLatencyThreshold == nil {
388 0 : o.UnhealthyOperationLatencyThreshold = func() time.Duration {
389 0 : return 200 * time.Millisecond
390 0 : }
391 : }
392 : }
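// With these defaults the monitor samples the current writer's health every
// 100ms and fails over when an ongoing operation has been outstanding for more
// than 200ms (or has errored); while failed over it probes the primary every
// second, and fails back once both the mean and max probe latency over the
// last 2 minutes are below 100ms.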
393 :
394 : type logicalLogWithSizesEtc struct {
395 : num NumWAL
396 : segments []segmentWithSizeEtc
397 : }
398 :
399 : type segmentWithSizeEtc struct {
400 : segment
401 : approxFileSize uint64
402 : synchronouslyClosed bool
403 : }
404 :
405 : type failoverManager struct {
406 : opts Options
407 : // TODO(jackson/sumeer): read-path etc.
408 :
409 : dirHandles [numDirIndices]vfs.File
410 : stopper *stopper
411 : monitor *failoverMonitor
412 : mu struct {
413 : sync.Mutex
414 : closedWALs []logicalLogWithSizesEtc
415 : ww *failoverWriter
416 : }
417 : recycler LogRecycler
418 : // Due to async creation of files in failoverWriter, multiple goroutines can
419 : // concurrently try to get a file from the recycler. This mutex protects the
420 : // logRecycler.{Peek,Pop} pair.
421 : recyclerPeekPopMu sync.Mutex
422 : }
423 :
424 : var _ Manager = &failoverManager{}
425 :
426 : // TODO(sumeer):
427 : // - log deletion: if record.LogWriter did not close yet, the cleaner may
428 : // get an error when deleting or renaming (only under windows?).
429 :
430 : // Init implements Manager.
431 0 : func (wm *failoverManager) Init(o Options, initial Logs) error {
432 0 : if o.timeSource == nil {
433 0 : o.timeSource = defaultTime{}
434 0 : }
435 0 : o.FailoverOptions.ensureDefaults()
436 0 : stopper := newStopper()
437 0 : var dirs [numDirIndices]dirAndFileHandle
438 0 : for i, dir := range []Dir{o.Primary, o.Secondary} {
439 0 : dirs[i].Dir = dir
440 0 : f, err := dir.FS.OpenDir(dir.Dirname)
441 0 : if err != nil {
442 0 : return err
443 0 : }
444 0 : dirs[i].File = f
445 : }
446 0 : fmOpts := failoverMonitorOptions{
447 0 : dirs: dirs,
448 0 : FailoverOptions: o.FailoverOptions,
449 0 : stopper: stopper,
450 0 : }
451 0 : monitor := newFailoverMonitor(fmOpts)
452 0 : *wm = failoverManager{
453 0 : opts: o,
454 0 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
455 0 : stopper: stopper,
456 0 : monitor: monitor,
457 0 : }
458 0 : wm.recycler.Init(o.MaxNumRecyclableLogs)
459 0 : for _, ll := range initial {
460 0 : llse := logicalLogWithSizesEtc{
461 0 : num: ll.Num,
462 0 : }
463 0 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
464 0 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
465 0 : }
466 0 : for i, s := range ll.segments {
467 0 : fs, path := ll.SegmentLocation(i)
468 0 : stat, err := fs.Stat(path)
469 0 : if err != nil {
470 0 : return err
471 0 : }
472 0 : llse.segments = append(llse.segments, segmentWithSizeEtc{
473 0 : segment: s,
474 0 : approxFileSize: uint64(stat.Size()),
475 0 : synchronouslyClosed: true,
476 0 : })
477 : }
478 0 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
479 : }
480 0 : return nil
481 : }
482 :
483 : // List implements Manager.
484 0 : func (wm *failoverManager) List() (Logs, error) {
485 0 : wm.mu.Lock()
486 0 : defer wm.mu.Unlock()
487 0 : n := len(wm.mu.closedWALs)
488 0 : if wm.mu.ww != nil {
489 0 : n++
490 0 : }
491 0 : wals := make(Logs, n)
492 0 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
493 0 : segments := make([]segment, len(llse.segments))
494 0 : for j := range llse.segments {
495 0 : segments[j] = llse.segments[j].segment
496 0 : }
497 0 : wals[index] = LogicalLog{
498 0 : Num: llse.num,
499 0 : segments: segments,
500 0 : }
501 : }
502 0 : for i, llse := range wm.mu.closedWALs {
503 0 : setLogicalLog(i, llse)
504 0 : }
505 0 : if wm.mu.ww != nil {
506 0 : setLogicalLog(n-1, wm.mu.ww.getLog())
507 0 : }
508 0 : return wals, nil
509 : }
510 :
511 : // Obsolete implements Manager.
512 : func (wm *failoverManager) Obsolete(
513 : minUnflushedNum NumWAL, noRecycle bool,
514 0 : ) (toDelete []DeletableLog, err error) {
515 0 : wm.mu.Lock()
516 0 : defer wm.mu.Unlock()
517 0 : i := 0
518 0 : for ; i < len(wm.mu.closedWALs); i++ {
519 0 : ll := wm.mu.closedWALs[i]
520 0 : if ll.num >= minUnflushedNum {
521 0 : break
522 : }
523 : // Recycle only logs consisting of a single segment in the primary dir at
524 : // logNameIndex=0 (i.e., no failover occurred) that were synchronously closed.
525 : // It may not be safe to recycle a file that is still being written to. And
526 : // recycling when there was a failover may fill up the recycler with smaller
527 : // log files. The restriction to logNameIndex=0 is because logRecycler.Peek
528 : // only exposes the DiskFileNum, and we need that to construct the path -- we
529 : // could remove this restriction by changing the logRecycler interface, but we
530 : // don't bother.
531 0 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
532 0 : ll.segments[0].logNameIndex == 0 &&
533 0 : ll.segments[0].dir == wm.opts.Primary
534 0 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
535 0 : FileNum: base.DiskFileNum(ll.num),
536 0 : FileSize: ll.segments[0].approxFileSize,
537 0 : }) {
538 0 : for _, s := range ll.segments {
539 0 : toDelete = append(toDelete, DeletableLog{
540 0 : FS: s.dir.FS,
541 0 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
542 0 : NumWAL: ll.num,
543 0 : ApproxFileSize: s.approxFileSize,
544 0 : })
545 0 : }
546 : }
547 : }
548 0 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
549 0 : return toDelete, nil
550 : }
551 :
552 : // Create implements Manager.
553 0 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
554 0 : func() {
555 0 : wm.mu.Lock()
556 0 : defer wm.mu.Unlock()
557 0 : if wm.mu.ww != nil {
558 0 : panic("previous wal.Writer not closed")
559 : }
560 : }()
561 0 : fwOpts := failoverWriterOpts{
562 0 : wn: wn,
563 0 : logger: wm.opts.Logger,
564 0 : timeSource: wm.opts.timeSource,
565 0 : jobID: jobID,
566 0 : logCreator: wm.logCreator,
567 0 : noSyncOnClose: wm.opts.NoSyncOnClose,
568 0 : bytesPerSync: wm.opts.BytesPerSync,
569 0 : preallocateSize: wm.opts.PreallocateSize,
570 0 : minSyncInterval: wm.opts.MinSyncInterval,
571 0 : fsyncLatency: wm.opts.FsyncLatency,
572 0 : queueSemChan: wm.opts.QueueSemChan,
573 0 : stopper: wm.stopper,
574 0 : writerClosed: wm.writerClosed,
575 0 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
576 0 : }
577 0 : var err error
578 0 : var ww *failoverWriter
579 0 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
580 0 : ww, err = newFailoverWriter(fwOpts, dir)
581 0 : if err != nil {
582 0 : return nil
583 0 : }
584 0 : return ww
585 : }
586 0 : wm.monitor.newWriter(writerCreateFunc)
587 0 : if ww != nil {
588 0 : wm.mu.Lock()
589 0 : defer wm.mu.Unlock()
590 0 : wm.mu.ww = ww
591 0 : }
592 0 : return ww, err
593 : }
594 :
595 : // ElevateWriteStallThresholdForFailover implements Manager.
596 0 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
597 0 : return wm.monitor.elevateWriteStallThresholdForFailover()
598 0 : }
599 :
600 0 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
601 0 : wm.monitor.noWriter()
602 0 : wm.mu.Lock()
603 0 : defer wm.mu.Unlock()
604 0 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
605 0 : wm.mu.ww = nil
606 0 : }
607 :
608 : // Stats implements Manager.
609 0 : func (wm *failoverManager) Stats() Stats {
610 0 : recycledLogsCount, recycledLogSize := wm.recycler.Stats()
611 0 : wm.mu.Lock()
612 0 : defer wm.mu.Unlock()
613 0 : var liveFileCount int
614 0 : var liveFileSize uint64
615 0 : updateStats := func(segments []segmentWithSizeEtc) {
616 0 : for _, s := range segments {
617 0 : liveFileCount++
618 0 : liveFileSize += s.approxFileSize
619 0 : }
620 : }
621 0 : for _, llse := range wm.mu.closedWALs {
622 0 : updateStats(llse.segments)
623 0 : }
624 0 : if wm.mu.ww != nil {
625 0 : updateStats(wm.mu.ww.getLog().segments)
626 0 : }
627 0 : return Stats{
628 0 : ObsoleteFileCount: recycledLogsCount,
629 0 : ObsoleteFileSize: recycledLogSize,
630 0 : LiveFileCount: liveFileCount,
631 0 : LiveFileSize: liveFileSize,
632 0 : }
633 : }
634 :
635 : // Close implements Manager.
636 0 : func (wm *failoverManager) Close() error {
637 0 : wm.stopper.stop()
638 0 : // Since all goroutines are stopped, we can close the dir handles.
639 0 : var err error
640 0 : for _, f := range wm.dirHandles {
641 0 : err = firstError(err, f.Close())
642 0 : }
643 0 : return err
644 : }
645 :
646 : // RecyclerForTesting implements Manager.
647 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
648 0 : return nil
649 0 : }
650 :
651 : // logCreator implements the logCreator func type.
652 : func (wm *failoverManager) logCreator(
653 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
654 0 : ) (logFile vfs.File, initialFileSize uint64, err error) {
655 0 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
656 0 : isPrimary := dir == wm.opts.Primary
657 0 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
658 0 : considerRecycle := li == 0 && isPrimary
659 0 : createInfo := CreateInfo{
660 0 : JobID: jobID,
661 0 : Path: logFilename,
662 0 : IsSecondary: !isPrimary,
663 0 : Num: wn,
664 0 : Err: nil,
665 0 : }
666 0 : defer func() {
667 0 : createInfo.Err = err
668 0 : if wm.opts.EventListener != nil {
669 0 : wm.opts.EventListener.LogCreated(createInfo)
670 0 : }
671 : }()
672 0 : if considerRecycle {
673 0 : // Try to use a recycled log file. Recycling log files is an important
674 0 : // performance optimization as it is faster to sync a file that has
675 0 : // already been written, than one which is being written for the first
676 0 : // time. This is due to the need to sync file metadata when a file is
677 0 : // being written for the first time. Note this is true even if file
678 0 : // preallocation is performed (e.g. fallocate).
679 0 : var recycleLog base.FileInfo
680 0 : var recycleOK bool
681 0 : func() {
682 0 : wm.recyclerPeekPopMu.Lock()
683 0 : defer wm.recyclerPeekPopMu.Unlock()
684 0 : recycleLog, recycleOK = wm.recycler.Peek()
685 0 : if recycleOK {
686 0 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
687 0 : panic(err)
688 : }
689 : }
690 : }()
691 0 : if recycleOK {
692 0 : createInfo.RecycledFileNum = recycleLog.FileNum
693 0 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
694 0 : r.writeStart()
695 0 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename)
696 0 : r.writeEnd(err)
697 0 : // TODO(sumeer): should we fatal since primary dir? At some point it is
698 0 : // better to fatal instead of continuing to failover.
699 0 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
700 0 : if err != nil {
701 0 : // TODO(sumeer): we have popped from the logRecycler, which is
702 0 : // arguably correct, since we don't want to keep trying to reuse a log
703 0 : // that causes some error. But the original or new file may exist, and
704 0 : // no one will clean it up unless the process restarts.
705 0 : return nil, 0, err
706 0 : }
707 : // Figure out the recycled WAL size. This Stat is necessary because
708 : // ReuseForWrite's contract allows for removing the old file and
709 : // creating a new one. We don't know whether the WAL was actually
710 : // recycled.
711 : //
712 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
713 : // indicating whether or not the file was actually reused would allow us
714 : // to skip the stat and use recycleLog.FileSize.
715 0 : var finfo os.FileInfo
716 0 : finfo, err = logFile.Stat()
717 0 : if err != nil {
718 0 : logFile.Close()
719 0 : return nil, 0, err
720 0 : }
721 0 : initialFileSize = uint64(finfo.Size())
722 0 : return logFile, initialFileSize, nil
723 : }
724 : }
725 : // Did not recycle.
726 : //
727 : // Create file.
728 0 : r.writeStart()
729 0 : logFile, err = dir.FS.Create(logFilename)
730 0 : r.writeEnd(err)
731 0 : return logFile, 0, err
732 : }
733 :
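// stopper ties background goroutines to a single shutdown signal. An
// illustrative usage sketch (this mirrors how the prober and monitor loops are
// run above):
//
//	s := newStopper()
//	s.runAsync(func() {
//	    <-s.shouldQuiesce() // returns once stop() has been called
//	    // clean up and return
//	})
//	s.stop() // closes the quiesce channel and waits for all goroutines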
734 : type stopper struct {
735 : quiescer chan struct{} // Closed when quiescing
736 : wg sync.WaitGroup
737 : }
738 :
739 0 : func newStopper() *stopper {
740 0 : return &stopper{
741 0 : quiescer: make(chan struct{}),
742 0 : }
743 0 : }
744 :
745 0 : func (s *stopper) runAsync(f func()) {
746 0 : s.wg.Add(1)
747 0 : go func() {
748 0 : f()
749 0 : s.wg.Done()
750 0 : }()
751 : }
752 :
753 : // shouldQuiesce returns a channel which will be closed when stop() has been
754 : // invoked and outstanding goroutines should begin to quiesce.
755 0 : func (s *stopper) shouldQuiesce() <-chan struct{} {
756 0 : return s.quiescer
757 0 : }
758 :
759 0 : func (s *stopper) stop() {
760 0 : close(s.quiescer)
761 0 : s.wg.Wait()
762 0 : }
763 :
764 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
765 : // support for Timer removed, and support for reset added to the manual
766 : // implementation.
767 :
768 : // timeSource is used to interact with the clock and tickers. Abstracts
769 : // time.Now and time.NewTicker for testing.
770 : type timeSource interface {
771 : now() time.Time
772 : newTicker(duration time.Duration) tickerI
773 : }
774 :
775 : // tickerI is an interface wrapping time.Ticker.
776 : type tickerI interface {
777 : reset(duration time.Duration)
778 : stop()
779 : ch() <-chan time.Time
780 : }
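// A manual timeSource/tickerI implementation can be injected in tests via the
// options' timeSource field. A minimal sketch of such a ticker (hypothetical;
// the actual test implementation lives elsewhere):
//
//	type manualTicker struct{ c chan time.Time }
//
//	func (t *manualTicker) reset(duration time.Duration) {}
//	func (t *manualTicker) stop()                        {}
//	func (t *manualTicker) ch() <-chan time.Time         { return t.c }
//
// where the test advances time by sending on c.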
781 :
782 : // defaultTime is a timeSource using the time package.
783 : type defaultTime struct{}
784 :
785 : var _ timeSource = defaultTime{}
786 :
787 0 : func (defaultTime) now() time.Time {
788 0 : return time.Now()
789 0 : }
790 :
791 0 : func (defaultTime) newTicker(duration time.Duration) tickerI {
792 0 : return (*defaultTicker)(time.NewTicker(duration))
793 0 : }
794 :
795 : // defaultTicker uses time.Ticker.
796 : type defaultTicker time.Ticker
797 :
798 : var _ tickerI = &defaultTicker{}
799 :
800 0 : func (t *defaultTicker) reset(duration time.Duration) {
801 0 : (*time.Ticker)(t).Reset(duration)
802 0 : }
803 :
804 0 : func (t *defaultTicker) stop() {
805 0 : (*time.Ticker)(t).Stop()
806 0 : }
807 :
808 0 : func (t *defaultTicker) ch() <-chan time.Time {
809 0 : return (*time.Ticker)(t).C
810 0 : }
811 :
812 : // Make lint happy.
813 : var _ = (*failoverMonitor).noWriter
814 : var _ = (*failoverManager).writerClosed
815 : var _ = (&stopper{}).shouldQuiesce