Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "math/rand/v2"
11 : "os"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/vfs"
18 : )
19 :
20 : // dirProber probes the primary dir until it is confirmed to be healthy. If
21 : // there are not enough recent samples, the dir is deemed unhealthy. The
22 : // prober is only used to decide when to fail back to the primary.
23 : type dirProber struct {
24 : fs vfs.FS
25 : // The full path of the file to use for the probe. The probe is destructive
26 : // in that it deletes and (re)creates the file.
27 : filename string
28 : // The probing interval, when enabled.
29 : interval time.Duration
30 : timeSource
31 : // buf holds the random bytes that are written during the probe.
32 : buf [100 << 10]byte
33 : // enabled is signaled to enable and disable probing. The initial state of
34 : // the prober is disabled.
35 : enabled chan bool
36 : mu struct {
37 : sync.Mutex
38 : // Circular buffer of history samples.
39 : history [probeHistoryLength]time.Duration
40 : // The history is in [firstProbeIndex, nextProbeIndex).
41 : firstProbeIndex int
42 : nextProbeIndex int
43 : }
44 : iterationForTesting chan<- struct{}
45 : }
46 :
47 : const probeHistoryLength = 128
48 :
49 : // failedProbeDuration is a large sentinel (24h) recorded when a probe fails or there are too few samples.
50 : const failedProbeDuration = 24 * 60 * 60 * time.Second
51 :
52 : // init takes a stopper in order to connect the dirProber's long-running
53 : // goroutines with the stopper's wait group, but the dirProber has its own
54 : // stop() method that should be invoked to trigger the shutdown.
55 : func (p *dirProber) init(
56 : fs vfs.FS,
57 : filename string,
58 : interval time.Duration,
59 : stopper *stopper,
60 : ts timeSource,
61 : notifyIterationForTesting chan<- struct{},
62 2 : ) {
63 2 : *p = dirProber{
64 2 : fs: fs,
65 2 : filename: filename,
66 2 : interval: interval,
67 2 : timeSource: ts,
68 2 : enabled: make(chan bool),
69 2 : iterationForTesting: notifyIterationForTesting,
70 2 : }
71 2 : // Random bytes for writing, to defeat any FS compression optimization.
72 2 : for i := range p.buf {
73 2 : p.buf[i] = byte(rand.Uint32())
74 2 : }
75 : // dirProber has an explicit stop() method instead of listening on
76 : // stopper.shouldQuiesce. This structure helps negotiate the shutdown
77 : // between failoverMonitor and dirProber. If the dirProber independently
78 : // listened to shouldQuiesce, it might exit before the failover monitor. The
79 : // [enable|disable]Probing methods require sending on a channel expecting
80 : // that the dirProber goroutine will receive on the same channel. If the
81 : // dirProber noticed the quiescence and exited first, the failoverMonitor
82 : // could deadlock waiting for a goroutine that no longer exists.
83 : //
84 : // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
85 : // - the failoverMonitor listens to stopper.shouldQuiesce.
86 : // - when the failoverMonitor is exiting, it will no longer attempt to
87 : // interact with the dirProber. Only then does it invoke dirProber.stop.
88 2 : stopper.runAsync(p.probeLoop)
89 : }
90 :
91 2 : func (p *dirProber) probeLoop() {
92 2 : ticker := p.timeSource.newTicker(p.interval)
93 2 : ticker.stop()
94 2 : tickerCh := ticker.ch()
95 2 : shouldContinue := true
96 2 : var enabled bool
97 2 : for shouldContinue {
98 2 : select {
99 2 : case <-tickerCh:
100 2 : if !enabled {
101 2 : // Could have a tick waiting before we disabled it. Ignore.
102 2 : continue
103 : }
104 2 : probeDur := func() time.Duration {
105 2 : // Delete, create, write, sync.
106 2 : start := p.timeSource.now()
107 2 : _ = p.fs.Remove(p.filename)
108 2 : f, err := p.fs.Create(p.filename, "pebble-wal")
109 2 : if err != nil {
110 1 : return failedProbeDuration
111 1 : }
112 2 : defer f.Close()
113 2 : n, err := f.Write(p.buf[:])
114 2 : if err != nil {
115 0 : return failedProbeDuration
116 0 : }
117 2 : if n != len(p.buf) {
118 0 : panic("invariant violation")
119 : }
120 2 : err = f.Sync()
121 2 : if err != nil {
122 0 : return failedProbeDuration
123 0 : }
124 2 : return p.timeSource.now().Sub(start)
125 : }()
126 2 : p.mu.Lock()
127 2 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
128 2 : p.mu.history[nextIndex] = probeDur
129 2 : p.mu.nextProbeIndex++
130 2 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
131 2 : if numSamples > probeHistoryLength {
132 1 : // Length has exceeded capacity, i.e., overwritten the first probe.
133 1 : p.mu.firstProbeIndex++
134 1 : }
135 2 : p.mu.Unlock()
136 :
137 2 : case enabled, shouldContinue = <-p.enabled:
138 2 : if !enabled || !shouldContinue {
139 2 : ticker.stop()
140 2 : p.mu.Lock()
141 2 : p.mu.firstProbeIndex = 0
142 2 : p.mu.nextProbeIndex = 0
143 2 : p.mu.Unlock()
144 2 : } else {
145 2 : ticker.reset(p.interval)
146 2 : }
147 : }
148 2 : if p.iterationForTesting != nil {
149 1 : p.iterationForTesting <- struct{}{}
150 1 : }
151 : }
152 : }
153 :
154 2 : func (p *dirProber) enableProbing() {
155 2 : p.enabled <- true
156 2 : }
157 :
158 2 : func (p *dirProber) disableProbing() {
159 2 : p.enabled <- false
160 2 : }
161 :
162 2 : func (p *dirProber) stop() {
163 2 : close(p.enabled)
164 2 : }
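// A condensed sketch of the prober lifecycle as driven by failoverMonitor
// (see monitorLoop below); the function is illustrative only and not part of
// the production code. Probing runs only while writing to the secondary, and
// stop is invoked only once the monitor will no longer interact with the
// prober.
func proberLifecycleSketch(p *dirProber) {
	p.enableProbing()  // failed over to the secondary: start watching the primary
	p.disableProbing() // failed back to the primary: probing no longer needed
	p.stop()           // monitor is shutting down and will not touch p again
}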
165 :
166 2 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
167 2 : p.mu.Lock()
168 2 : defer p.mu.Unlock()
169 2 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
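// Note: samplesNeeded below is the ceiling of interval divided by the probing
// interval, i.e., the number of most recent probe samples required to cover
// the requested interval.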
170 2 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
171 2 : if samplesNeeded == 0 {
172 0 : panic("interval is too short")
173 2 : } else if samplesNeeded > probeHistoryLength {
174 0 : panic("interval is too long")
175 : }
176 2 : if samplesNeeded > numSamples {
177 2 : // Not enough samples, so assume not yet healthy.
178 2 : return failedProbeDuration, failedProbeDuration
179 2 : }
180 2 : offset := numSamples - samplesNeeded
181 2 : var sum, max time.Duration
182 2 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
183 2 : sampleDur := p.mu.history[i%probeHistoryLength]
184 2 : sum += sampleDur
185 2 : if max < sampleDur {
186 2 : max = sampleDur
187 2 : }
188 : }
189 2 : mean := sum / time.Duration(samplesNeeded)
190 2 : return mean, max
191 : }
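// The following is a minimal illustrative sketch (not part of the production
// flow) of how getMeanMax is consumed for failback: the caller supplies the
// interval over which the primary must have looked healthy and compares both
// the mean and the max probe duration against a latency threshold, much as
// monitorLoop does below. The function name and the 25s/100ms values are
// hypothetical stand-ins for HealthyInterval and HealthyProbeLatencyThreshold.
func isPrimaryHealthySketch(p *dirProber) bool {
	mean, max := p.getMeanMax(25 * time.Second)
	const healthyProbeLatencyThreshold = 100 * time.Millisecond
	return mean < healthyProbeLatencyThreshold && max < healthyProbeLatencyThreshold
}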
192 :
193 : type dirIndex int
194 :
195 : const (
196 : primaryDirIndex dirIndex = iota
197 : secondaryDirIndex
198 : numDirIndices
199 : )
200 :
201 : type dirAndFileHandle struct {
202 : Dir
203 : vfs.File
204 : }
205 :
206 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
207 : type switchableWriter interface {
208 : switchToNewDir(dirAndFileHandle) error
209 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
210 : }
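// stubSwitchableWriter is a hypothetical stub implementation of
// switchableWriter, of the kind that could be used to exercise failoverMonitor
// in isolation; the name is made up and it is not part of the production code.
// It records the most recently switched-to dir and reports a configurable
// latency/error pair.
type stubSwitchableWriter struct {
	mu      sync.Mutex
	dir     dirAndFileHandle
	latency time.Duration
	err     error
}

func (w *stubSwitchableWriter) switchToNewDir(d dirAndFileHandle) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.dir = d
	return nil
}

func (w *stubSwitchableWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.latency, w.err
}

var _ switchableWriter = (*stubSwitchableWriter)(nil)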
211 :
212 : type failoverMonitorOptions struct {
213 : // The primary and secondary dir.
214 : dirs [numDirIndices]dirAndFileHandle
215 :
216 : FailoverOptions
217 : stopper *stopper
218 : }
219 :
220 : // failoverMonitor monitors the latency and error observed by the
221 : // switchableWriter, and does failover by switching the dir. It also monitors
222 : // the primary dir for failback.
223 : type failoverMonitor struct {
224 : opts failoverMonitorOptions
225 : prober dirProber
226 : mu struct {
227 : sync.Mutex
228 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
229 : // are protected by the mutex for concurrent reads.
230 :
231 : // dirIndex is the directory to be used by the writer (if non-nil), or by
232 : // the writer once it becomes non-nil.
233 : dirIndex
234 : // The time at which the monitor last failed back to the primary. This is
235 : // only relevant when dirIndex == primaryDirIndex.
236 : lastFailBackTime time.Time
237 : // The current failoverWriter, exposed via a narrower interface.
238 : writer switchableWriter
239 :
240 : // Stats.
241 : dirSwitchCount int64
242 : lastAccumulateIntoDurations time.Time
243 : primaryWriteDuration time.Duration
244 : secondaryWriteDuration time.Duration
245 : }
246 : }
247 :
248 2 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
249 2 : m := &failoverMonitor{
250 2 : opts: opts,
251 2 : }
252 2 : m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
253 2 : m.prober.init(opts.dirs[primaryDirIndex].FS,
254 2 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
255 2 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
256 2 : opts.stopper.runAsync(func() {
257 2 : m.monitorLoop(opts.stopper.shouldQuiesce())
258 2 : })
259 2 : return m
260 : }
261 :
262 : // noWriter is called when the previous writer is closed.
263 2 : func (m *failoverMonitor) noWriter() {
264 2 : now := m.opts.timeSource.now()
265 2 : m.mu.Lock()
266 2 : defer m.mu.Unlock()
267 2 : m.mu.writer = nil
268 2 : m.accumulateDurationLocked(now)
269 2 : }
270 :
271 : // writerCreateFunc is allowed to return nil if there is an error. Handling
272 : // that error is not the responsibility of failoverMonitor, so the error
273 : // should not be an IO error (which is what failoverMonitor is interested in).
274 2 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
275 2 : m.mu.Lock()
276 2 : defer m.mu.Unlock()
277 2 : if m.mu.writer != nil {
278 0 : panic("previous writer not closed")
279 : }
280 2 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
281 : }
282 :
283 2 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
284 2 : m.mu.Lock()
285 2 : defer m.mu.Unlock()
286 2 : if m.mu.dirIndex == secondaryDirIndex {
287 2 : return true
288 2 : }
289 2 : intervalSinceFailback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
290 2 : return intervalSinceFailback < m.opts.ElevatedWriteStallThresholdLag
291 : }
292 :
293 2 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
294 2 : dur := now.Sub(m.mu.lastAccumulateIntoDurations)
295 2 : m.mu.lastAccumulateIntoDurations = now
296 2 : if m.mu.dirIndex == primaryDirIndex {
297 2 : m.mu.primaryWriteDuration += dur
298 2 : return
299 2 : }
300 2 : m.mu.secondaryWriteDuration += dur
301 : }
302 :
303 2 : func (m *failoverMonitor) stats() FailoverStats {
304 2 : now := m.opts.timeSource.now()
305 2 : m.mu.Lock()
306 2 : defer m.mu.Unlock()
307 2 : m.accumulateDurationLocked(now)
308 2 : return FailoverStats{
309 2 : DirSwitchCount: m.mu.dirSwitchCount,
310 2 : PrimaryWriteDuration: m.mu.primaryWriteDuration,
311 2 : SecondaryWriteDuration: m.mu.secondaryWriteDuration,
312 2 : }
313 2 : }
314 :
315 : // lastWriterInfo is state maintained in the monitorLoop for the latest
316 : // switchable writer. It is mainly used to dampen the switching.
317 : type lastWriterInfo struct {
318 : writer switchableWriter
319 : numSwitches int
320 : ongoingLatencyAtSwitch time.Duration
321 : errorCounts [numDirIndices]int
322 : }
323 :
324 2 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
325 2 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
326 2 : if m.opts.monitorIterationForTesting != nil {
327 1 : m.opts.monitorIterationForTesting <- struct{}{}
328 1 : }
329 2 : tickerCh := ticker.ch()
330 2 : dirIndex := primaryDirIndex
331 2 : var lastWriter lastWriterInfo
332 2 : for {
333 2 : select {
334 2 : case <-shouldQuiesce:
335 2 : ticker.stop()
336 2 : m.prober.stop()
337 2 : return
338 2 : case <-tickerCh:
339 2 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
340 2 : m.mu.Lock()
341 2 : defer m.mu.Unlock()
342 2 : if m.mu.writer != lastWriter.writer {
343 2 : lastWriter = lastWriterInfo{writer: m.mu.writer}
344 2 : }
345 2 : if lastWriter.writer == nil {
346 2 : return 0, nil
347 2 : }
348 2 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
349 : }()
350 2 : switchDir := false
351 2 : // Arbitrary value.
352 2 : const highSecondaryErrorCountThreshold = 2
353 2 : // We don't consider a switch if we are currently using the primary dir and
354 2 : // the secondary dir has accumulated enough errors. It is more likely that
355 2 : // someone has misconfigured the secondary, e.g., wrong permissions or not
356 2 : // enough disk space. The error history is only remembered in the context
357 2 : // of lastWriter since an operator can fix the underlying misconfiguration.
358 2 : unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
359 2 :
360 2 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
361 2 : dirIndex == primaryDirIndex) && failoverEnabled {
362 2 : // Switching heuristics. Subject to change based on real world experience.
363 2 : if writerErr != nil {
364 1 : // An error causes an immediate switch, since a LogWriter with an
365 1 : // error is useless.
366 1 : lastWriter.errorCounts[dirIndex]++
367 1 : switchDir = true
368 2 : } else if writerOngoingLatency > unhealthyThreshold {
369 2 : // Arbitrary value.
370 2 : const switchImmediatelyCountThreshold = 2
371 2 : // High latency. Switch immediately if the number of switches that
372 2 : // have been done is below the threshold, since that gives us an
373 2 : // observation of both dirs. Once above that threshold, decay the
374 2 : // switch rate by increasing the observed latency needed for a
375 2 : // switch.
376 2 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
377 2 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
378 2 : switchDir = true
379 2 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
380 2 : }
381 : // Else high latency, but not high enough yet to motivate switch.
382 2 : } else if dirIndex == secondaryDirIndex {
383 2 : // The writer looks healthy. We can still switch if the writer is using the
384 2 : // secondary dir and the primary is healthy again.
385 2 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
386 2 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
387 2 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
388 2 : switchDir = true
389 2 : }
390 : }
391 : }
392 2 : if switchDir {
393 2 : lastWriter.numSwitches++
394 2 : if dirIndex == secondaryDirIndex {
395 2 : // Switching back to primary, so don't need to probe to see if
396 2 : // primary is healthy.
397 2 : m.prober.disableProbing()
398 2 : dirIndex = primaryDirIndex
399 2 : } else {
400 2 : m.prober.enableProbing()
401 2 : dirIndex = secondaryDirIndex
402 2 : }
403 2 : dir := m.opts.dirs[dirIndex]
404 2 : now := m.opts.timeSource.now()
405 2 : m.mu.Lock()
406 2 : m.accumulateDurationLocked(now)
407 2 : m.mu.dirIndex = dirIndex
408 2 : m.mu.dirSwitchCount++
409 2 : if dirIndex == primaryDirIndex {
410 2 : m.mu.lastFailBackTime = now
411 2 : }
412 2 : if m.mu.writer != nil {
413 2 : m.mu.writer.switchToNewDir(dir)
414 2 : }
415 2 : m.mu.Unlock()
416 : }
417 : }
418 2 : if m.opts.monitorStateForTesting != nil {
419 1 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
420 1 : }
421 2 : if m.opts.monitorIterationForTesting != nil {
422 1 : m.opts.monitorIterationForTesting <- struct{}{}
423 1 : }
424 : }
425 : }
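// shouldSwitchForLatencySketch is a distilled, illustrative restatement of the
// latency-based dampening applied in monitorLoop above (the function itself is
// hypothetical): the first couple of switches happen as soon as the unhealthy
// threshold is crossed, after which the ongoing latency must exceed twice the
// latency observed at the previous switch.
func shouldSwitchForLatencySketch(
	ongoing, unhealthyThreshold, latencyAtLastSwitch time.Duration, numSwitches int,
) bool {
	const switchImmediatelyCountThreshold = 2
	if ongoing <= unhealthyThreshold {
		// Not slow enough to consider a switch.
		return false
	}
	return numSwitches < switchImmediatelyCountThreshold ||
		ongoing > 2*latencyAtLastSwitch
}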
426 :
427 : type logicalLogWithSizesEtc struct {
428 : num NumWAL
429 : segments []segmentWithSizeEtc
430 : }
431 :
432 : type segmentWithSizeEtc struct {
433 : segment
434 : approxFileSize uint64
435 : synchronouslyClosed bool
436 : }
437 :
438 : type failoverManager struct {
439 : opts Options
440 : // initialObsolete holds the set of DeletableLogs that formed the logs
441 : // passed into Init. The initialObsolete logs are all obsolete. Once
442 : // returned via Manager.Obsolete, initialObsolete is cleared. The
443 : // initialObsolete logs are stored separately from mu.queue because they may
444 : // include logs that were NOT created by the standalone manager, and
445 : // multiple physical log files may form one logical WAL.
446 : initialObsolete []DeletableLog
447 :
448 : // TODO(jackson/sumeer): read-path etc.
449 :
450 : dirHandles [numDirIndices]vfs.File
451 : stopper *stopper
452 : monitor *failoverMonitor
453 : mu struct {
454 : sync.Mutex
455 : closedWALs []logicalLogWithSizesEtc
456 : ww *failoverWriter
457 : }
458 : recycler LogRecycler
459 : // Due to async creation of files in failoverWriter, multiple goroutines can
460 : // concurrently try to get a file from the recycler. This mutex protects the
461 : // logRecycler.{Peek,Pop} pair.
462 : recyclerPeekPopMu sync.Mutex
463 : }
464 :
465 : var _ Manager = &failoverManager{}
466 :
467 : // TODO(sumeer):
468 : // - log deletion: if record.LogWriter did not close yet, the cleaner may
469 : // get an error when deleting or renaming (only under windows?).
470 :
471 : // init implements Manager.
472 2 : func (wm *failoverManager) init(o Options, initial Logs) error {
473 2 : if o.timeSource == nil {
474 2 : o.timeSource = defaultTime{}
475 2 : }
476 2 : o.FailoverOptions.EnsureDefaults()
477 2 :
478 2 : // Synchronously ensure that we're able to write to the secondary before we
479 2 : // proceed. An operator doesn't want to encounter an issue writing to the
480 2 : // secondary the first time there's a need to failover. We write a bit of
481 2 : // metadata to a file in the secondary's directory.
482 2 : f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
483 2 : if err != nil {
484 1 : return errors.Newf("failed to write to WAL secondary dir: %v", err)
485 1 : }
486 2 : if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
487 2 : o.Primary.Dirname,
488 2 : time.Now(),
489 2 : )); err != nil {
490 0 : return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
491 0 : }
492 2 : if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
493 0 : return err
494 0 : }
495 :
496 2 : stopper := newStopper()
497 2 : var dirs [numDirIndices]dirAndFileHandle
498 2 : for i, dir := range []Dir{o.Primary, o.Secondary} {
499 2 : dirs[i].Dir = dir
500 2 : f, err := dir.FS.OpenDir(dir.Dirname)
501 2 : if err != nil {
502 0 : return err
503 0 : }
504 2 : dirs[i].File = f
505 : }
506 2 : fmOpts := failoverMonitorOptions{
507 2 : dirs: dirs,
508 2 : FailoverOptions: o.FailoverOptions,
509 2 : stopper: stopper,
510 2 : }
511 2 : monitor := newFailoverMonitor(fmOpts)
512 2 : *wm = failoverManager{
513 2 : opts: o,
514 2 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
515 2 : stopper: stopper,
516 2 : monitor: monitor,
517 2 : }
518 2 : wm.recycler.Init(o.MaxNumRecyclableLogs)
519 2 : for _, ll := range initial {
520 2 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
521 2 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
522 2 : }
523 2 : var err error
524 2 : wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
525 2 : if err != nil {
526 0 : return err
527 0 : }
528 : }
529 2 : return nil
530 : }
531 :
532 : // List implements Manager.
533 2 : func (wm *failoverManager) List() (Logs, error) {
534 2 : wm.mu.Lock()
535 2 : defer wm.mu.Unlock()
536 2 : n := len(wm.mu.closedWALs)
537 2 : if wm.mu.ww != nil {
538 2 : n++
539 2 : }
540 2 : wals := make(Logs, n)
541 2 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
542 2 : segments := make([]segment, len(llse.segments))
543 2 : for j := range llse.segments {
544 2 : segments[j] = llse.segments[j].segment
545 2 : }
546 2 : wals[index] = LogicalLog{
547 2 : Num: llse.num,
548 2 : segments: segments,
549 2 : }
550 : }
551 2 : for i, llse := range wm.mu.closedWALs {
552 2 : setLogicalLog(i, llse)
553 2 : }
554 2 : if wm.mu.ww != nil {
555 2 : setLogicalLog(n-1, wm.mu.ww.getLog())
556 2 : }
557 2 : return wals, nil
558 : }
559 :
560 : // Obsolete implements Manager.
561 : func (wm *failoverManager) Obsolete(
562 : minUnflushedNum NumWAL, noRecycle bool,
563 2 : ) (toDelete []DeletableLog, err error) {
564 2 : wm.mu.Lock()
565 2 : defer wm.mu.Unlock()
566 2 :
567 2 : // If this is the first call to Obsolete after Open, we may have deletable
568 2 : // logs outside the queue.
569 2 : toDelete, wm.initialObsolete = wm.initialObsolete, nil
570 2 :
571 2 : i := 0
572 2 : for ; i < len(wm.mu.closedWALs); i++ {
573 2 : ll := wm.mu.closedWALs[i]
574 2 : if ll.num >= minUnflushedNum {
575 2 : break
576 : }
577 : // Recycle only if the log is a single, synchronously closed segment in the
578 : // primary dir at logNameIndex=0, i.e., there was no failover. It may not be
579 : // safe to recycle a file that is still being written to, and recycling when
580 : // there was a failover may fill up the recycler with smaller log files. The
581 : // restriction to logNameIndex=0 is because logRecycler.Peek only exposes the
582 : // DiskFileNum, and we need to use that to construct the path -- we could
583 : // remove this restriction by changing the logRecycler interface, but we
584 : // don't bother.
585 2 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
586 2 : ll.segments[0].logNameIndex == 0 &&
587 2 : ll.segments[0].dir == wm.opts.Primary
588 2 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
589 2 : FileNum: base.DiskFileNum(ll.num),
590 2 : FileSize: ll.segments[0].approxFileSize,
591 2 : }) {
592 2 : for _, s := range ll.segments {
593 2 : toDelete = append(toDelete, DeletableLog{
594 2 : FS: s.dir.FS,
595 2 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
596 2 : NumWAL: ll.num,
597 2 : ApproxFileSize: s.approxFileSize,
598 2 : })
599 2 : }
600 : }
601 : }
602 2 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
603 2 : return toDelete, nil
604 : }
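// canRecycleSketch is a standalone, illustrative restatement of the recycling
// eligibility check that Obsolete above inlines; the function is hypothetical
// and the production code does not define it.
func canRecycleSketch(ll logicalLogWithSizesEtc, noRecycle bool, primary Dir) bool {
	return !noRecycle &&
		len(ll.segments) == 1 &&
		ll.segments[0].synchronouslyClosed &&
		ll.segments[0].logNameIndex == 0 &&
		ll.segments[0].dir == primary
}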
605 :
606 : // Create implements Manager.
607 2 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
608 2 : func() {
609 2 : wm.mu.Lock()
610 2 : defer wm.mu.Unlock()
611 2 : if wm.mu.ww != nil {
612 0 : panic("previous wal.Writer not closed")
613 : }
614 : }()
615 2 : fwOpts := failoverWriterOpts{
616 2 : wn: wn,
617 2 : logger: wm.opts.Logger,
618 2 : timeSource: wm.opts.timeSource,
619 2 : jobID: jobID,
620 2 : logCreator: wm.logCreator,
621 2 : noSyncOnClose: wm.opts.NoSyncOnClose,
622 2 : bytesPerSync: wm.opts.BytesPerSync,
623 2 : preallocateSize: wm.opts.PreallocateSize,
624 2 : minSyncInterval: wm.opts.MinSyncInterval,
625 2 : fsyncLatency: wm.opts.FsyncLatency,
626 2 : queueSemChan: wm.opts.QueueSemChan,
627 2 : stopper: wm.stopper,
628 2 : failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
629 2 : writerClosed: wm.writerClosed,
630 2 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
631 2 : }
632 2 : var err error
633 2 : var ww *failoverWriter
634 2 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
635 2 : ww, err = newFailoverWriter(fwOpts, dir)
636 2 : if err != nil {
637 0 : return nil
638 0 : }
639 2 : return ww
640 : }
641 2 : wm.monitor.newWriter(writerCreateFunc)
642 2 : if ww != nil {
643 2 : wm.mu.Lock()
644 2 : defer wm.mu.Unlock()
645 2 : wm.mu.ww = ww
646 2 : }
647 2 : return ww, err
648 : }
649 :
650 : // ElevateWriteStallThresholdForFailover implements Manager.
651 2 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
652 2 : return wm.monitor.elevateWriteStallThresholdForFailover()
653 2 : }
654 :
655 2 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
656 2 : wm.monitor.noWriter()
657 2 : wm.mu.Lock()
658 2 : defer wm.mu.Unlock()
659 2 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
660 2 : wm.mu.ww = nil
661 2 : }
662 :
663 : // Stats implements Manager.
664 2 : func (wm *failoverManager) Stats() Stats {
665 2 : obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
666 2 : failoverStats := wm.monitor.stats()
667 2 : failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
668 2 : wm.mu.Lock()
669 2 : defer wm.mu.Unlock()
670 2 : var liveFileCount int
671 2 : var liveFileSize uint64
672 2 : updateStats := func(segments []segmentWithSizeEtc) {
673 2 : for _, s := range segments {
674 2 : liveFileCount++
675 2 : liveFileSize += s.approxFileSize
676 2 : }
677 : }
678 2 : for _, llse := range wm.mu.closedWALs {
679 2 : updateStats(llse.segments)
680 2 : }
681 2 : if wm.mu.ww != nil {
682 1 : updateStats(wm.mu.ww.getLog().segments)
683 1 : }
684 2 : for i := range wm.initialObsolete {
685 1 : if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
686 1 : obsoleteLogsCount++
687 1 : }
688 1 : obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
689 : }
690 2 : return Stats{
691 2 : ObsoleteFileCount: obsoleteLogsCount,
692 2 : ObsoleteFileSize: obsoleteLogSize,
693 2 : LiveFileCount: liveFileCount,
694 2 : LiveFileSize: liveFileSize,
695 2 : Failover: failoverStats,
696 2 : }
697 : }
698 :
699 : // Close implements Manager.
700 2 : func (wm *failoverManager) Close() error {
701 2 : wm.stopper.stop()
702 2 : // Since all goroutines are stopped, we can close the dirs.
703 2 : var err error
704 2 : for _, f := range wm.dirHandles {
705 2 : err = firstError(err, f.Close())
706 2 : }
707 2 : return err
708 : }
709 :
710 : // RecyclerForTesting implements Manager.
711 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
712 0 : return nil
713 0 : }
714 :
715 : // logCreator implements the logCreator func type.
716 : func (wm *failoverManager) logCreator(
717 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
718 2 : ) (logFile vfs.File, initialFileSize uint64, err error) {
719 2 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
720 2 : isPrimary := dir == wm.opts.Primary
721 2 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
722 2 : considerRecycle := li == 0 && isPrimary
723 2 : createInfo := CreateInfo{
724 2 : JobID: jobID,
725 2 : Path: logFilename,
726 2 : IsSecondary: !isPrimary,
727 2 : Num: wn,
728 2 : Err: nil,
729 2 : }
730 2 : defer func() {
731 2 : createInfo.Err = err
732 2 : if wm.opts.EventListener != nil {
733 2 : wm.opts.EventListener.LogCreated(createInfo)
734 2 : }
735 : }()
736 2 : if considerRecycle {
737 2 : // Try to use a recycled log file. Recycling log files is an important
738 2 : // performance optimization as it is faster to sync a file that has
739 2 : // already been written, than one which is being written for the first
740 2 : // time. This is due to the need to sync file metadata when a file is
741 2 : // being written for the first time. Note this is true even if file
742 2 : // preallocation is performed (e.g. fallocate).
743 2 : var recycleLog base.FileInfo
744 2 : var recycleOK bool
745 2 : func() {
746 2 : wm.recyclerPeekPopMu.Lock()
747 2 : defer wm.recyclerPeekPopMu.Unlock()
748 2 : recycleLog, recycleOK = wm.recycler.Peek()
749 2 : if recycleOK {
750 1 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
751 0 : panic(err)
752 : }
753 : }
754 : }()
755 2 : if recycleOK {
756 1 : createInfo.RecycledFileNum = recycleLog.FileNum
757 1 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
758 1 : r.writeStart()
759 1 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
760 1 : r.writeEnd(err)
761 1 : // TODO(sumeer): should we fatal since primary dir? At some point it is
762 1 : // better to fatal instead of continuing to failover.
763 1 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
764 1 : if err != nil {
765 0 : // TODO(sumeer): we have popped from the logRecycler, which is
766 0 : // arguably correct, since we don't want to keep trying to reuse a log
767 0 : // that causes some error. But the original or new file may exist, and
768 0 : // no one will clean it up unless the process restarts.
769 0 : return nil, 0, err
770 0 : }
771 : // Figure out the recycled WAL size. This Stat is necessary because
772 : // ReuseForWrite's contract allows for removing the old file and
773 : // creating a new one. We don't know whether the WAL was actually
774 : // recycled.
775 : //
776 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
777 : // indicating whether or not the file was actually reused would allow us
778 : // to skip the stat and use recycleLog.FileSize.
779 1 : var finfo os.FileInfo
780 1 : finfo, err = logFile.Stat()
781 1 : if err != nil {
782 0 : logFile.Close()
783 0 : return nil, 0, err
784 0 : }
785 1 : initialFileSize = uint64(finfo.Size())
786 1 : return logFile, initialFileSize, nil
787 : }
788 : }
789 : // Did not recycle.
790 : //
791 : // Create file.
792 2 : r.writeStart()
793 2 : logFile, err = dir.FS.Create(logFilename, "pebble-wal")
794 2 : r.writeEnd(err)
795 2 : return logFile, 0, err
796 : }
797 :
798 : type stopper struct {
799 : quiescer chan struct{} // Closed when quiescing
800 : wg sync.WaitGroup
801 : }
802 :
803 2 : func newStopper() *stopper {
804 2 : return &stopper{
805 2 : quiescer: make(chan struct{}),
806 2 : }
807 2 : }
808 :
809 2 : func (s *stopper) runAsync(f func()) {
810 2 : s.wg.Add(1)
811 2 : go func() {
812 2 : f()
813 2 : s.wg.Done()
814 2 : }()
815 : }
816 :
817 : // shouldQuiesce returns a channel which will be closed when stop() has been
818 : // invoked and outstanding goroutines should begin to quiesce.
819 2 : func (s *stopper) shouldQuiesce() <-chan struct{} {
820 2 : return s.quiescer
821 2 : }
822 :
823 2 : func (s *stopper) stop() {
824 2 : close(s.quiescer)
825 2 : s.wg.Wait()
826 2 : }
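// exampleStopperUsage is a minimal, illustrative usage sketch of stopper (the
// function is hypothetical): runAsync registers a goroutine with the wait
// group, the goroutine watches shouldQuiesce, and stop blocks until every
// registered goroutine has returned.
func exampleStopperUsage() {
	s := newStopper()
	s.runAsync(func() {
		<-s.shouldQuiesce()
		// Clean up and return.
	})
	s.stop()
}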
827 :
828 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
829 : // support for Timer removed and support for reset added to the manual
830 : // implementation.
831 :
832 : // timeSource is used to interact with the clock and tickers. Abstracts
833 : // time.Now and time.NewTicker for testing.
834 : type timeSource interface {
835 : now() time.Time
836 : newTicker(duration time.Duration) tickerI
837 : }
838 :
839 : // tickerI is an interface wrapping time.Ticker.
840 : type tickerI interface {
841 : reset(duration time.Duration)
842 : stop()
843 : ch() <-chan time.Time
844 : }
845 :
846 : // defaultTime is a timeSource using the time package.
847 : type defaultTime struct{}
848 :
849 : var _ timeSource = defaultTime{}
850 :
851 2 : func (defaultTime) now() time.Time {
852 2 : return time.Now()
853 2 : }
854 :
855 2 : func (defaultTime) newTicker(duration time.Duration) tickerI {
856 2 : return (*defaultTicker)(time.NewTicker(duration))
857 2 : }
858 :
859 : // defaultTicker uses time.Ticker.
860 : type defaultTicker time.Ticker
861 :
862 : var _ tickerI = &defaultTicker{}
863 :
864 2 : func (t *defaultTicker) reset(duration time.Duration) {
865 2 : (*time.Ticker)(t).Reset(duration)
866 2 : }
867 :
868 2 : func (t *defaultTicker) stop() {
869 2 : (*time.Ticker)(t).Stop()
870 2 : }
871 :
872 2 : func (t *defaultTicker) ch() <-chan time.Time {
873 2 : return (*time.Ticker)(t).C
874 2 : }
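// manualTickerSketch is a minimal sketch of a manually driven tickerI of the
// sort alluded to above for tests; the real test helper in this package may
// differ and this type name is made up. Ticks are delivered by the test
// writing to c.
type manualTickerSketch struct {
	c chan time.Time
}

func (t *manualTickerSketch) reset(time.Duration) {}
func (t *manualTickerSketch) stop()               {}
func (t *manualTickerSketch) ch() <-chan time.Time { return t.c }

var _ tickerI = (*manualTickerSketch)(nil)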
875 :
876 : // Make lint happy.
877 : var _ = (*failoverMonitor).noWriter
878 : var _ = (*failoverManager).writerClosed
879 : var _ = (&stopper{}).shouldQuiesce