Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "math/rand/v2"
11 : "os"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/vfs"
18 : )
19 :
20 : // dirProber probes the primary dir until it is confirmed to be healthy. If
21 : // the prober doesn't yet have enough samples, the dir is deemed unhealthy.
22 : // It is only used to decide when to fail back to the primary.
23 : type dirProber struct {
24 : fs vfs.FS
25 : // The full path of the file to use for the probe. The probe is destructive
26 : // in that it deletes and (re)creates the file.
27 : filename string
28 : // The probing interval, when enabled.
29 : interval time.Duration
30 : timeSource
31 : // buf holds the random bytes that are written during the probe.
32 : buf [100 << 10]byte
33 : // enabled is signaled to enable and disable probing. The initial state of
34 : // the prober is disabled.
35 : enabled chan bool
36 : mu struct {
37 : sync.Mutex
38 : // Circular buffer of history samples.
39 : history [probeHistoryLength]time.Duration
40 : // The history is in [firstProbeIndex, nextProbeIndex).
41 : firstProbeIndex int
42 : nextProbeIndex int
43 : }
44 : iterationForTesting chan<- struct{}
45 : }
46 :
47 : const probeHistoryLength = 128
48 :
49 : // A large sentinel value, used when a probe fails or there are too few samples.
50 : const failedProbeDuration = 24 * 60 * 60 * time.Second
51 :
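// Illustrative sketch (a hypothetical helper mirroring the logic in probeLoop
// below): the history ring buffer holds samples in the half-open interval
// [firstProbeIndex, nextProbeIndex); appending past capacity overwrites, and
// thereby drops, the oldest sample.
func sketchAppendProbeSample(p *dirProber, probeDur time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.mu.history[p.mu.nextProbeIndex%probeHistoryLength] = probeDur
	p.mu.nextProbeIndex++
	if p.mu.nextProbeIndex-p.mu.firstProbeIndex > probeHistoryLength {
		// The write above overwrote the sample at firstProbeIndex.
		p.mu.firstProbeIndex++
	}
}
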
52 : // init takes a stopper in order to connect the dirProber's long-running
53 : // goroutines with the stopper's wait group, but the dirProber has its own
54 : // stop() method that should be invoked to trigger the shutdown.
55 : func (p *dirProber) init(
56 : fs vfs.FS,
57 : filename string,
58 : interval time.Duration,
59 : stopper *stopper,
60 : ts timeSource,
61 : notifyIterationForTesting chan<- struct{},
62 1 : ) {
63 1 : *p = dirProber{
64 1 : fs: fs,
65 1 : filename: filename,
66 1 : interval: interval,
67 1 : timeSource: ts,
68 1 : enabled: make(chan bool),
69 1 : iterationForTesting: notifyIterationForTesting,
70 1 : }
71 1 : // Random bytes for writing, to defeat any FS compression optimization.
72 1 : for i := range p.buf {
73 1 : p.buf[i] = byte(rand.Uint32())
74 1 : }
75 : // dirProber has an explicit stop() method instead of listening on
76 : // stopper.shouldQuiesce. This structure helps negotiate the shutdown
77 : // between failoverMonitor and dirProber. If the dirProber independently
78 : // listened to shouldQuiesce, it might exit before the failover monitor. The
79 : // [enable|disable]Probing methods require sending on a channel expecting
80 : // that the dirProber goroutine will receive on the same channel. If the
81 : // dirProber noticed the quiescence and exited first, the failoverMonitor
82 : // could deadlock waiting for a goroutine that no longer exists.
83 : //
84 : // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
85 : // - the failoverMonitor listens to stopper.shouldQuiesce.
86 : // - when the failoverMonitor is exiting, it will no longer attempt to
87 : // interact with the dirProber. Only then does it invoke dirProber.stop.
88 1 : stopper.runAsync(p.probeLoop)
89 : }
90 :
91 1 : func (p *dirProber) probeLoop() {
92 1 : ticker := p.timeSource.newTicker(p.interval)
93 1 : ticker.stop()
94 1 : tickerCh := ticker.ch()
95 1 : shouldContinue := true
96 1 : var enabled bool
97 1 : for shouldContinue {
98 1 : select {
99 1 : case <-tickerCh:
100 1 : if !enabled {
101 1 : // Could have a tick waiting before we disabled it. Ignore.
102 1 : continue
103 : }
104 1 : probeDur := func() time.Duration {
105 1 : // Delete, create, write, sync.
106 1 : start := p.timeSource.now()
107 1 : _ = p.fs.Remove(p.filename)
108 1 : f, err := p.fs.Create(p.filename, "pebble-wal")
109 1 : if err != nil {
110 1 : return failedProbeDuration
111 1 : }
112 1 : defer f.Close()
113 1 : n, err := f.Write(p.buf[:])
114 1 : if err != nil {
115 0 : return failedProbeDuration
116 0 : }
117 1 : if n != len(p.buf) {
118 0 : panic("invariant violation")
119 : }
120 1 : err = f.Sync()
121 1 : if err != nil {
122 0 : return failedProbeDuration
123 0 : }
124 1 : return p.timeSource.now().Sub(start)
125 : }()
126 1 : p.mu.Lock()
127 1 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
128 1 : p.mu.history[nextIndex] = probeDur
129 1 : p.mu.nextProbeIndex++
130 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
131 1 : if numSamples > probeHistoryLength {
132 0 : // Length has exceeded capacity, i.e., overwritten the first probe.
133 0 : p.mu.firstProbeIndex++
134 0 : }
135 1 : p.mu.Unlock()
136 :
137 1 : case enabled, shouldContinue = <-p.enabled:
138 1 : if !enabled || !shouldContinue {
139 1 : ticker.stop()
140 1 : p.mu.Lock()
141 1 : p.mu.firstProbeIndex = 0
142 1 : p.mu.nextProbeIndex = 0
143 1 : p.mu.Unlock()
144 1 : } else {
145 1 : ticker.reset(p.interval)
146 1 : }
147 : }
148 1 : if p.iterationForTesting != nil {
149 1 : p.iterationForTesting <- struct{}{}
150 1 : }
151 : }
152 : }
153 :
154 1 : func (p *dirProber) enableProbing() {
155 1 : p.enabled <- true
156 1 : }
157 :
158 1 : func (p *dirProber) disableProbing() {
159 1 : p.enabled <- false
160 1 : }
161 :
162 1 : func (p *dirProber) stop() {
163 1 : close(p.enabled)
164 1 : }
165 :
166 1 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
167 1 : p.mu.Lock()
168 1 : defer p.mu.Unlock()
169 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
170 1 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
171 1 : if samplesNeeded == 0 {
172 0 : panic("interval is too short")
173 1 : } else if samplesNeeded > probeHistoryLength {
174 0 : panic("interval is too long")
175 : }
176 1 : if samplesNeeded > numSamples {
177 1 : // Not enough samples, so assume not yet healthy.
178 1 : return failedProbeDuration, failedProbeDuration
179 1 : }
180 1 : offset := numSamples - samplesNeeded
181 1 : var sum, max time.Duration
182 1 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
183 1 : sampleDur := p.mu.history[i%probeHistoryLength]
184 1 : sum += sampleDur
185 1 : if max < sampleDur {
186 1 : max = sampleDur
187 1 : }
188 : }
189 1 : mean := sum / time.Duration(samplesNeeded)
190 1 : return mean, max
191 : }
192 :
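// Usage sketch (a hypothetical caller, matching how monitorLoop consumes the
// return values): the primary is considered healthy again only when both the
// mean and the max probe latency over the healthy interval are below the
// configured threshold.
func sketchPrimaryLooksHealthy(p *dirProber, healthyInterval, threshold time.Duration) bool {
	mean, max := p.getMeanMax(healthyInterval)
	return mean < threshold && max < threshold
}
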
193 : type dirIndex int
194 :
195 : const (
196 : primaryDirIndex dirIndex = iota
197 : secondaryDirIndex
198 : numDirIndices
199 : )
200 :
201 : type dirAndFileHandle struct {
202 : Dir
203 : vfs.File
204 : }
205 :
206 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
207 : type switchableWriter interface {
208 : switchToNewDir(dirAndFileHandle) error
209 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
210 : }
211 :
212 : type failoverMonitorOptions struct {
213 : // The primary and secondary dir.
214 : dirs [numDirIndices]dirAndFileHandle
215 :
216 : FailoverOptions
217 : stopper *stopper
218 : }
219 :
220 : // failoverMonitor monitors the latency and error observed by the
221 : // switchableWriter, and does failover by switching the dir. It also monitors
222 : // the primary dir for failback.
223 : type failoverMonitor struct {
224 : opts failoverMonitorOptions
225 : prober dirProber
226 : mu struct {
227 : sync.Mutex
228 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
229 : // are protected by the mutex for concurrent reads.
230 :
231 : // dirIndex is the directory to be used by the writer (if non-nil), or the
232 : // directory to use when the writer becomes non-nil.
233 : dirIndex
234 : // The time at which the monitor last failed back to the primary. This is
235 : // only relevant when dirIndex == primaryDirIndex.
236 : lastFailBackTime time.Time
237 : // The current failoverWriter, exposed via a narrower interface.
238 : writer switchableWriter
239 :
240 : // Stats.
241 : dirSwitchCount int64
242 : lastAccumulateIntoDurations time.Time
243 : primaryWriteDuration time.Duration
244 : secondaryWriteDuration time.Duration
245 : }
246 : }
247 :
248 1 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
249 1 : m := &failoverMonitor{
250 1 : opts: opts,
251 1 : }
252 1 : m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
253 1 : m.prober.init(opts.dirs[primaryDirIndex].FS,
254 1 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
255 1 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
256 1 : opts.stopper.runAsync(func() {
257 1 : m.monitorLoop(opts.stopper.shouldQuiesce())
258 1 : })
259 1 : return m
260 : }
261 :
262 : // noWriter is called when the previous writer is closed.
263 1 : func (m *failoverMonitor) noWriter() {
264 1 : now := m.opts.timeSource.now()
265 1 : m.mu.Lock()
266 1 : defer m.mu.Unlock()
267 1 : m.mu.writer = nil
268 1 : m.accumulateDurationLocked(now)
269 1 : }
270 :
271 : // writerCreateFunc is allowed to return nil if there is an error. Handling
272 : // that error is not the responsibility of failoverMonitor, so the error
273 : // should not be due to an IO error (which failoverMonitor is interested in).
274 1 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
275 1 : m.mu.Lock()
276 1 : defer m.mu.Unlock()
277 1 : if m.mu.writer != nil {
278 0 : panic("previous writer not closed")
279 : }
280 1 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
281 : }
282 :
283 1 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
284 1 : m.mu.Lock()
285 1 : defer m.mu.Unlock()
286 1 : if m.mu.dirIndex == secondaryDirIndex {
287 1 : return true
288 1 : }
289 1 : intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
290 1 : return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
291 : }
292 :
293 1 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
294 1 : dur := now.Sub(m.mu.lastAccumulateIntoDurations)
295 1 : m.mu.lastAccumulateIntoDurations = now
296 1 : if m.mu.dirIndex == primaryDirIndex {
297 1 : m.mu.primaryWriteDuration += dur
298 1 : return
299 1 : }
300 1 : m.mu.secondaryWriteDuration += dur
301 : }
302 :
303 1 : func (m *failoverMonitor) stats() FailoverStats {
304 1 : now := m.opts.timeSource.now()
305 1 : m.mu.Lock()
306 1 : defer m.mu.Unlock()
307 1 : m.accumulateDurationLocked(now)
308 1 : return FailoverStats{
309 1 : DirSwitchCount: m.mu.dirSwitchCount,
310 1 : PrimaryWriteDuration: m.mu.primaryWriteDuration,
311 1 : SecondaryWriteDuration: m.mu.secondaryWriteDuration,
312 1 : }
313 1 : }
314 :
315 : // lastWriterInfo is state maintained in the monitorLoop for the latest
316 : // switchable writer. It is mainly used to dampen the switching.
317 : type lastWriterInfo struct {
318 : writer switchableWriter
319 : numSwitches int
320 : ongoingLatencyAtSwitch time.Duration
321 : errorCounts [numDirIndices]int
322 : }
323 :
324 1 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
325 1 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
326 1 : if m.opts.monitorIterationForTesting != nil {
327 1 : m.opts.monitorIterationForTesting <- struct{}{}
328 1 : }
329 1 : tickerCh := ticker.ch()
330 1 : dirIndex := primaryDirIndex
331 1 : var lastWriter lastWriterInfo
332 1 : for {
333 1 : select {
334 1 : case <-shouldQuiesce:
335 1 : ticker.stop()
336 1 : m.prober.stop()
337 1 : return
338 1 : case <-tickerCh:
339 1 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
340 1 : m.mu.Lock()
341 1 : defer m.mu.Unlock()
342 1 : if m.mu.writer != lastWriter.writer {
343 1 : lastWriter = lastWriterInfo{writer: m.mu.writer}
344 1 : }
345 1 : if lastWriter.writer == nil {
346 1 : return 0, nil
347 1 : }
348 1 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
349 : }()
350 1 : switchDir := false
351 1 : // Arbitrary value.
352 1 : const highSecondaryErrorCountThreshold = 2
353 1 : // We don't consider a switch if currently using the primary dir and the
354 1 : // secondary dir has seen enough errors. It is more likely that someone
355 1 : // has misconfigured the secondary, e.g., wrong permissions or not enough
356 1 : // disk space. We only remember the error history in the context of the
357 1 : // lastWriter since an operator can fix the underlying misconfiguration.
358 1 : unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
359 1 :
360 1 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
361 1 : dirIndex == primaryDirIndex) && failoverEnabled {
362 1 : // Switching heuristics. Subject to change based on real world experience.
363 1 : if writerErr != nil {
364 1 : // An error causes an immediate switch, since a LogWriter with an
365 1 : // error is useless.
366 1 : lastWriter.errorCounts[dirIndex]++
367 1 : switchDir = true
368 1 : } else if writerOngoingLatency > unhealthyThreshold {
369 1 : // Arbitrary value.
370 1 : const switchImmediatelyCountThreshold = 2
371 1 : // High latency. Switch immediately if the number of switches that
372 1 : // have been done is below the threshold, since that gives us an
373 1 : // observation of both dirs. Once above that threshold, decay the
374 1 : // switch rate by increasing the observed latency needed for a
375 1 : // switch.
376 1 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
377 1 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
378 1 : switchDir = true
379 1 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
380 1 : }
381 : // Else high latency, but not high enough yet to motivate switch.
382 1 : } else if dirIndex == secondaryDirIndex {
383 1 : // The writer looks healthy. We can still switch if the writer is using the
384 1 : // secondary dir and the primary is healthy again.
385 1 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
386 1 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
387 1 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
388 1 : switchDir = true
389 1 : }
390 : }
391 : }
392 1 : if switchDir {
393 1 : lastWriter.numSwitches++
394 1 : if dirIndex == secondaryDirIndex {
395 1 : // Switching back to primary, so don't need to probe to see if
396 1 : // primary is healthy.
397 1 : m.prober.disableProbing()
398 1 : dirIndex = primaryDirIndex
399 1 : } else {
400 1 : m.prober.enableProbing()
401 1 : dirIndex = secondaryDirIndex
402 1 : }
403 1 : dir := m.opts.dirs[dirIndex]
404 1 : now := m.opts.timeSource.now()
405 1 : m.mu.Lock()
406 1 : m.accumulateDurationLocked(now)
407 1 : m.mu.dirIndex = dirIndex
408 1 : m.mu.dirSwitchCount++
409 1 : if dirIndex == primaryDirIndex {
410 1 : m.mu.lastFailBackTime = now
411 1 : }
412 1 : if m.mu.writer != nil {
413 1 : m.mu.writer.switchToNewDir(dir)
414 1 : }
415 1 : m.mu.Unlock()
416 : }
417 : }
418 1 : if m.opts.monitorStateForTesting != nil {
419 1 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
420 1 : }
421 1 : if m.opts.monitorIterationForTesting != nil {
422 1 : m.opts.monitorIterationForTesting <- struct{}{}
423 1 : }
424 : }
425 : }
426 :
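// Decision sketch (a simplified, hypothetical helper): setting aside the
// secondary-error dampening above, the switching heuristic in monitorLoop
// reduces to the three cases below. The constant 2 stands in for the
// arbitrary thresholds used in monitorLoop.
func sketchShouldSwitchDir(
	writerErr error,
	ongoingLatency, unhealthyThreshold, latencyAtLastSwitch time.Duration,
	numSwitches int,
	onSecondary, primaryLooksHealthy bool,
) bool {
	switch {
	case writerErr != nil:
		// A LogWriter with an error is useless; switch immediately.
		return true
	case ongoingLatency > unhealthyThreshold:
		// High latency: switch immediately for the first couple of switches,
		// then require the observed latency to double before switching again.
		return numSwitches < 2 || ongoingLatency > 2*latencyAtLastSwitch
	case onSecondary && primaryLooksHealthy:
		// Healthy writer on the secondary: fail back once the primary's
		// probes look healthy.
		return true
	default:
		return false
	}
}
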
427 : type logicalLogWithSizesEtc struct {
428 : num NumWAL
429 : segments []segmentWithSizeEtc
430 : }
431 :
432 : type segmentWithSizeEtc struct {
433 : segment
434 : approxFileSize uint64
435 : synchronouslyClosed bool
436 : }
437 :
438 : type failoverManager struct {
439 : opts Options
440 : // initialObsolete holds the set of DeletableLogs that formed the logs
441 : // passed into Init. The initialObsolete logs are all obsolete. Once
442 : // returned via Manager.Obsolete, initialObsolete is cleared. The
443 : // initialObsolete logs are stored separately from mu.queue because they may
444 : // include logs that were NOT created by the standalone manager, and
445 : // multiple physical log files may form one logical WAL.
446 : initialObsolete []DeletableLog
447 :
448 : // TODO(jackson/sumeer): read-path etc.
449 :
450 : dirHandles [numDirIndices]vfs.File
451 : stopper *stopper
452 : monitor *failoverMonitor
453 : mu struct {
454 : sync.Mutex
455 : closedWALs []logicalLogWithSizesEtc
456 : ww *failoverWriter
457 : }
458 : recycler LogRecycler
459 : // Due to async creation of files in failoverWriter, multiple goroutines can
460 : // concurrently try to get a file from the recycler. This mutex protects the
461 : // logRecycler.{Peek,Pop} pair.
462 : recyclerPeekPopMu sync.Mutex
463 : }
464 :
465 : var _ Manager = &failoverManager{}
466 :
467 : // TODO(sumeer):
468 : // - log deletion: if record.LogWriter did not close yet, the cleaner may
469 : // get an error when deleting or renaming (only under windows?).
470 :
471 : // init implements Manager.
472 1 : func (wm *failoverManager) init(o Options, initial Logs) error {
473 1 : if o.timeSource == nil {
474 1 : o.timeSource = defaultTime{}
475 1 : }
476 1 : o.FailoverOptions.EnsureDefaults()
477 1 :
478 1 : // Synchronously ensure that we're able to write to the secondary before we
479 1 : // proceed. An operator doesn't want to encounter an issue writing to the
480 1 : // secondary the first time there's a need to failover. We write a bit of
481 1 : // metadata to a file in the secondary's directory.
482 1 : f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
483 1 : if err != nil {
484 1 : return errors.Newf("failed to write to WAL secondary dir: %v", err)
485 1 : }
486 1 : if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
487 1 : o.Primary.Dirname,
488 1 : time.Now(),
489 1 : )); err != nil {
490 0 : return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
491 0 : }
492 1 : if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
493 0 : return err
494 0 : }
495 :
496 1 : stopper := newStopper()
497 1 : var dirs [numDirIndices]dirAndFileHandle
498 1 : for i, dir := range []Dir{o.Primary, o.Secondary} {
499 1 : dirs[i].Dir = dir
500 1 : f, err := dir.FS.OpenDir(dir.Dirname)
501 1 : if err != nil {
502 0 : return err
503 0 : }
504 1 : dirs[i].File = f
505 : }
506 1 : fmOpts := failoverMonitorOptions{
507 1 : dirs: dirs,
508 1 : FailoverOptions: o.FailoverOptions,
509 1 : stopper: stopper,
510 1 : }
511 1 : monitor := newFailoverMonitor(fmOpts)
512 1 : *wm = failoverManager{
513 1 : opts: o,
514 1 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
515 1 : stopper: stopper,
516 1 : monitor: monitor,
517 1 : }
518 1 : wm.recycler.Init(o.MaxNumRecyclableLogs)
519 1 : for _, ll := range initial {
520 1 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
521 1 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
522 1 : }
523 1 : var err error
524 1 : wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
525 1 : if err != nil {
526 0 : return err
527 0 : }
528 : }
529 1 : return nil
530 : }
531 :
532 : // List implements Manager.
533 1 : func (wm *failoverManager) List() (Logs, error) {
534 1 : wm.mu.Lock()
535 1 : defer wm.mu.Unlock()
536 1 : n := len(wm.mu.closedWALs)
537 1 : if wm.mu.ww != nil {
538 1 : n++
539 1 : }
540 1 : wals := make(Logs, n)
541 1 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
542 1 : segments := make([]segment, len(llse.segments))
543 1 : for j := range llse.segments {
544 1 : segments[j] = llse.segments[j].segment
545 1 : }
546 1 : wals[index] = LogicalLog{
547 1 : Num: llse.num,
548 1 : segments: segments,
549 1 : }
550 : }
551 1 : for i, llse := range wm.mu.closedWALs {
552 1 : setLogicalLog(i, llse)
553 1 : }
554 1 : if wm.mu.ww != nil {
555 1 : setLogicalLog(n-1, wm.mu.ww.getLog())
556 1 : }
557 1 : return wals, nil
558 : }
559 :
560 : // Obsolete implements Manager.
561 : func (wm *failoverManager) Obsolete(
562 : minUnflushedNum NumWAL, noRecycle bool,
563 1 : ) (toDelete []DeletableLog, err error) {
564 1 : wm.mu.Lock()
565 1 : defer wm.mu.Unlock()
566 1 :
567 1 : // If this is the first call to Obsolete after Open, we may have deletable
568 1 : // logs outside the queue.
569 1 : toDelete, wm.initialObsolete = wm.initialObsolete, nil
570 1 :
571 1 : i := 0
572 1 : for ; i < len(wm.mu.closedWALs); i++ {
573 1 : ll := wm.mu.closedWALs[i]
574 1 : if ll.num >= minUnflushedNum {
575 1 : break
576 : }
577 : // Recycle only the primary log at logNameIndex=0 when there was no failover
578 : // and it was synchronously closed. It may not be safe to recycle a file that is
579 : // still being written to. And recycling when there was a failover may
580 : // fill up the recycler with smaller log files. The restriction regarding
581 : // logNameIndex=0 is because logRecycler.Peek only exposes the
582 : // DiskFileNum, and we need to use that to construct the path -- we could
583 : // remove this restriction by changing the logRecycler interface, but we
584 : // don't bother.
585 1 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
586 1 : ll.segments[0].logNameIndex == 0 &&
587 1 : ll.segments[0].dir == wm.opts.Primary
588 1 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
589 1 : FileNum: base.DiskFileNum(ll.num),
590 1 : FileSize: ll.segments[0].approxFileSize,
591 1 : }) {
592 1 : for _, s := range ll.segments {
593 1 : toDelete = append(toDelete, DeletableLog{
594 1 : FS: s.dir.FS,
595 1 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
596 1 : NumWAL: ll.num,
597 1 : ApproxFileSize: s.approxFileSize,
598 1 : })
599 1 : }
600 : }
601 : }
602 1 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
603 1 : return toDelete, nil
604 : }
605 :
606 : // Create implements Manager.
607 1 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
608 1 : func() {
609 1 : wm.mu.Lock()
610 1 : defer wm.mu.Unlock()
611 1 : if wm.mu.ww != nil {
612 0 : panic("previous wal.Writer not closed")
613 : }
614 : }()
615 1 : fwOpts := failoverWriterOpts{
616 1 : wn: wn,
617 1 : logger: wm.opts.Logger,
618 1 : timeSource: wm.opts.timeSource,
619 1 : jobID: jobID,
620 1 : logCreator: wm.logCreator,
621 1 : noSyncOnClose: wm.opts.NoSyncOnClose,
622 1 : bytesPerSync: wm.opts.BytesPerSync,
623 1 : preallocateSize: wm.opts.PreallocateSize,
624 1 : minSyncInterval: wm.opts.MinSyncInterval,
625 1 : fsyncLatency: wm.opts.FsyncLatency,
626 1 : queueSemChan: wm.opts.QueueSemChan,
627 1 : stopper: wm.stopper,
628 1 : failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
629 1 : writerClosed: wm.writerClosed,
630 1 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
631 1 : }
632 1 : var err error
633 1 : var ww *failoverWriter
634 1 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
635 1 : ww, err = newFailoverWriter(fwOpts, dir)
636 1 : if err != nil {
637 0 : return nil
638 0 : }
639 1 : return ww
640 : }
641 1 : wm.monitor.newWriter(writerCreateFunc)
642 1 : if ww != nil {
643 1 : wm.mu.Lock()
644 1 : defer wm.mu.Unlock()
645 1 : wm.mu.ww = ww
646 1 : }
647 1 : return ww, err
648 : }
649 :
650 : // ElevateWriteStallThresholdForFailover implements Manager.
651 1 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
652 1 : return wm.monitor.elevateWriteStallThresholdForFailover()
653 1 : }
654 :
655 1 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
656 1 : wm.monitor.noWriter()
657 1 : wm.mu.Lock()
658 1 : defer wm.mu.Unlock()
659 1 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
660 1 : wm.mu.ww = nil
661 1 : }
662 :
663 : // Stats implements Manager.
664 1 : func (wm *failoverManager) Stats() Stats {
665 1 : obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
666 1 : failoverStats := wm.monitor.stats()
667 1 : failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
668 1 : wm.mu.Lock()
669 1 : defer wm.mu.Unlock()
670 1 : var liveFileCount int
671 1 : var liveFileSize uint64
672 1 : updateStats := func(segments []segmentWithSizeEtc) {
673 1 : for _, s := range segments {
674 1 : liveFileCount++
675 1 : liveFileSize += s.approxFileSize
676 1 : }
677 : }
678 1 : for _, llse := range wm.mu.closedWALs {
679 1 : updateStats(llse.segments)
680 1 : }
681 1 : if wm.mu.ww != nil {
682 1 : updateStats(wm.mu.ww.getLog().segments)
683 1 : }
684 1 : for i := range wm.initialObsolete {
685 1 : if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
686 1 : obsoleteLogsCount++
687 1 : }
688 1 : obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
689 : }
690 1 : return Stats{
691 1 : ObsoleteFileCount: obsoleteLogsCount,
692 1 : ObsoleteFileSize: obsoleteLogSize,
693 1 : LiveFileCount: liveFileCount,
694 1 : LiveFileSize: liveFileSize,
695 1 : Failover: failoverStats,
696 1 : }
697 : }
698 :
699 : // Close implements Manager.
700 1 : func (wm *failoverManager) Close() error {
701 1 : wm.stopper.stop()
702 1 : // Since all goroutines are stopped, we can close the dir handles.
703 1 : var err error
704 1 : for _, f := range wm.dirHandles {
705 1 : err = firstError(err, f.Close())
706 1 : }
707 1 : return err
708 : }
709 :
710 : // RecyclerForTesting implements Manager.
711 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
712 0 : return nil
713 0 : }
714 :
715 : // logCreator implements the logCreator func type.
716 : func (wm *failoverManager) logCreator(
717 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
718 1 : ) (logFile vfs.File, initialFileSize uint64, err error) {
719 1 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
720 1 : isPrimary := dir == wm.opts.Primary
721 1 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
722 1 : considerRecycle := li == 0 && isPrimary
723 1 : createInfo := CreateInfo{
724 1 : JobID: jobID,
725 1 : Path: logFilename,
726 1 : IsSecondary: !isPrimary,
727 1 : Num: wn,
728 1 : Err: nil,
729 1 : }
730 1 : defer func() {
731 1 : createInfo.Err = err
732 1 : if wm.opts.EventListener != nil {
733 1 : wm.opts.EventListener.LogCreated(createInfo)
734 1 : }
735 : }()
736 1 : if considerRecycle {
737 1 : // Try to use a recycled log file. Recycling log files is an important
738 1 : // performance optimization as it is faster to sync a file that has
739 1 : // already been written, than one which is being written for the first
740 1 : // time. This is due to the need to sync file metadata when a file is
741 1 : // being written for the first time. Note this is true even if file
742 1 : // preallocation is performed (e.g. fallocate).
743 1 : var recycleLog base.FileInfo
744 1 : var recycleOK bool
745 1 : func() {
746 1 : wm.recyclerPeekPopMu.Lock()
747 1 : defer wm.recyclerPeekPopMu.Unlock()
748 1 : recycleLog, recycleOK = wm.recycler.Peek()
749 1 : if recycleOK {
750 1 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
751 0 : panic(err)
752 : }
753 : }
754 : }()
755 1 : if recycleOK {
756 1 : createInfo.RecycledFileNum = recycleLog.FileNum
757 1 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
758 1 : r.writeStart()
759 1 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
760 1 : r.writeEnd(err)
761 1 : // TODO(sumeer): should we fatal since primary dir? At some point it is
762 1 : // better to fatal instead of continuing to failover.
763 1 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
764 1 : if err != nil {
765 0 : // TODO(sumeer): we have popped from the logRecycler, which is
766 0 : // arguably correct, since we don't want to keep trying to reuse a log
767 0 : // that causes some error. But the original or new file may exist, and
768 0 : // no one will clean it up unless the process restarts.
769 0 : return nil, 0, err
770 0 : }
771 : // Figure out the recycled WAL size. This Stat is necessary because
772 : // ReuseForWrite's contract allows for removing the old file and
773 : // creating a new one. We don't know whether the WAL was actually
774 : // recycled.
775 : //
776 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
777 : // indicating whether or not the file was actually reused would allow us
778 : // to skip the stat and use recycleLog.FileSize.
779 1 : var finfo os.FileInfo
780 1 : finfo, err = logFile.Stat()
781 1 : if err != nil {
782 0 : logFile.Close()
783 0 : return nil, 0, err
784 0 : }
785 1 : initialFileSize = uint64(finfo.Size())
786 1 : return logFile, initialFileSize, nil
787 : }
788 : }
789 : // Did not recycle.
790 : //
791 : // Create file.
792 1 : r.writeStart()
793 1 : logFile, err = dir.FS.Create(logFilename, "pebble-wal")
794 1 : r.writeEnd(err)
795 1 : return logFile, 0, err
796 : }
797 :
798 : type stopper struct {
799 : quiescer chan struct{} // Closed when quiescing
800 : wg sync.WaitGroup
801 : }
802 :
803 1 : func newStopper() *stopper {
804 1 : return &stopper{
805 1 : quiescer: make(chan struct{}),
806 1 : }
807 1 : }
808 :
809 1 : func (s *stopper) runAsync(f func()) {
810 1 : s.wg.Add(1)
811 1 : go func() {
812 1 : f()
813 1 : s.wg.Done()
814 1 : }()
815 : }
816 :
817 : // shouldQuiesce returns a channel which will be closed when stop() has been
818 : // invoked and outstanding goroutines should begin to quiesce.
819 1 : func (s *stopper) shouldQuiesce() <-chan struct{} {
820 1 : return s.quiescer
821 1 : }
822 :
823 1 : func (s *stopper) stop() {
824 1 : close(s.quiescer)
825 1 : s.wg.Wait()
826 1 : }
827 :
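// Usage sketch (hypothetical): runAsync registers each goroutine with the
// WaitGroup, and stop() closes quiescer and then blocks until every such
// goroutine has returned.
func sketchStopperUsage() {
	s := newStopper()
	s.runAsync(func() {
		<-s.shouldQuiesce() // return once stop() has been called
	})
	s.stop()
}
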
828 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
829 : // support for Timer removed, and with support for reset added to the manual
830 : // implementation.
831 :
832 : // timeSource is used to interact with the clock and tickers. Abstracts
833 : // time.Now and time.NewTicker for testing.
834 : type timeSource interface {
835 : now() time.Time
836 : newTicker(duration time.Duration) tickerI
837 : }
838 :
839 : // tickerI is an interface wrapping time.Ticker.
840 : type tickerI interface {
841 : reset(duration time.Duration)
842 : stop()
843 : ch() <-chan time.Time
844 : }
845 :
846 : // defaultTime is a timeSource using the time package.
847 : type defaultTime struct{}
848 :
849 : var _ timeSource = defaultTime{}
850 :
851 1 : func (defaultTime) now() time.Time {
852 1 : return time.Now()
853 1 : }
854 :
855 1 : func (defaultTime) newTicker(duration time.Duration) tickerI {
856 1 : return (*defaultTicker)(time.NewTicker(duration))
857 1 : }
858 :
859 : // defaultTicker uses time.Ticker.
860 : type defaultTicker time.Ticker
861 :
862 : var _ tickerI = &defaultTicker{}
863 :
864 1 : func (t *defaultTicker) reset(duration time.Duration) {
865 1 : (*time.Ticker)(t).Reset(duration)
866 1 : }
867 :
868 1 : func (t *defaultTicker) stop() {
869 1 : (*time.Ticker)(t).Stop()
870 1 : }
871 :
872 1 : func (t *defaultTicker) ch() <-chan time.Time {
873 1 : return (*time.Ticker)(t).C
874 1 : }
875 :
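// Testing sketch (hypothetical; not the package's actual test fixture): a
// hand-driven implementation of tickerI lets a test advance probeLoop and
// monitorLoop deterministically by sending ticks explicitly.
type sketchManualTicker struct {
	ticks chan time.Time
}

var _ tickerI = &sketchManualTicker{}

func (t *sketchManualTicker) reset(time.Duration)  {}
func (t *sketchManualTicker) stop()                {}
func (t *sketchManualTicker) ch() <-chan time.Time { return t.ticks }

// tick fires a single tick, as a real ticker would at each interval.
func (t *sketchManualTicker) tick(now time.Time) { t.ticks <- now }
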
876 : // Make lint happy.
877 : var _ = (*failoverMonitor).noWriter
878 : var _ = (*failoverManager).writerClosed
879 : var _ = (&stopper{}).shouldQuiesce