Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "golang.org/x/exp/rand"
18 : )
19 :
20 : // dirProber probes the primary dir until it is confirmed to be healthy. If
21 : // the prober doesn't have enough samples, the primary is deemed unhealthy.
22 : // The prober is only used for failing back to the primary.
23 : type dirProber struct {
24 : fs vfs.FS
25 : // The full path of the file to use for the probe. The probe is destructive
26 : // in that it deletes and (re)creates the file.
27 : filename string
28 : // The probing interval, when enabled.
29 : interval time.Duration
30 : timeSource
31 : // buf holds the random bytes that are written during the probe.
32 : buf [100 << 10]byte
33 : // enabled is signaled to enable and disable probing. The initial state of
34 : // the prober is disabled.
35 : enabled chan bool
36 : mu struct {
37 : sync.Mutex
38 : // Circular buffer of history samples.
39 : history [probeHistoryLength]time.Duration
40 : // The history is in [firstProbeIndex, nextProbeIndex).
41 : firstProbeIndex int
42 : nextProbeIndex int
43 : }
44 : iterationForTesting chan<- struct{}
45 : }
46 :
47 : const probeHistoryLength = 128
48 :
49 : // A large sentinel value used for failed probes and missing samples.
50 : const failedProbeDuration = 24 * 60 * 60 * time.Second
51 :
52 : // init takes a stopper in order to connect the dirProber's long-running
53 : // goroutines with the stopper's wait group, but the dirProber has its own
54 : // stop() method that should be invoked to trigger the shutdown.
55 : func (p *dirProber) init(
56 : fs vfs.FS,
57 : filename string,
58 : interval time.Duration,
59 : stopper *stopper,
60 : ts timeSource,
61 : notifyIterationForTesting chan<- struct{},
62 2 : ) {
63 2 : *p = dirProber{
64 2 : fs: fs,
65 2 : filename: filename,
66 2 : interval: interval,
67 2 : timeSource: ts,
68 2 : enabled: make(chan bool),
69 2 : iterationForTesting: notifyIterationForTesting,
70 2 : }
71 2 : // Random bytes for writing, to defeat any FS compression optimization.
72 2 : _, err := rand.Read(p.buf[:])
73 2 : if err != nil {
74 0 : panic(err)
75 : }
76 : // dirProber has an explicit stop() method instead of listening on
77 : // stopper.shouldQuiesce. This structure helps negotiate the shutdown
78 : // between failoverMonitor and dirProber. If the dirProber independently
79 : // listened to shouldQuiesce, it might exit before the failover monitor. The
80 : // [enable|disable]Probing methods require sending on a channel expecting
81 : // that the dirProber goroutine will receive on the same channel. If the
82 : // dirProber noticed the quiescence and exited first, the failoverMonitor
83 : // could deadlock waiting for a goroutine that no longer exists.
84 : //
85 : // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
86 : // - the failoverMonitor listens to stopper.shouldQuiesce.
87 : // - when the failoverMonitor is exiting, it will no longer attempt to
88 : // interact with the dirProber. Only then does it invoke dirProber.stop.
89 2 : stopper.runAsync(p.probeLoop)
90 : }
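
To make the shutdown protocol above concrete, here is a small hypothetical sketch (not part of this file; ownerShutdownOrder is an invented name). It shows the ordering the failoverMonitor must follow: finish all sends on p.enabled before closing it.

    // ownerShutdownOrder is a hypothetical illustration of the coordination
    // described above. enableProbing/disableProbing block on an unbuffered
    // send that only probeLoop receives, so the owner may only call stop
    // once it will never send again.
    func ownerShutdownOrder(p *dirProber) {
        p.disableProbing() // safe: probeLoop is still running and will receive
        p.stop()           // closes p.enabled; probeLoop sees shouldContinue==false and returns
    }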
91 :
92 2 : func (p *dirProber) probeLoop() {
93 2 : ticker := p.timeSource.newTicker(p.interval)
94 2 : ticker.stop()
95 2 : tickerCh := ticker.ch()
96 2 : shouldContinue := true
97 2 : var enabled bool
98 2 : for shouldContinue {
99 2 : select {
100 2 : case <-tickerCh:
101 2 : if !enabled {
102 2 : // A tick could have been pending from before probing was disabled. Ignore it.
103 2 : continue
104 : }
105 2 : probeDur := func() time.Duration {
106 2 : // Delete, create, write, sync.
107 2 : start := p.timeSource.now()
108 2 : _ = p.fs.Remove(p.filename)
109 2 : f, err := p.fs.Create(p.filename, "pebble-wal")
110 2 : if err != nil {
111 1 : return failedProbeDuration
112 1 : }
113 2 : defer f.Close()
114 2 : n, err := f.Write(p.buf[:])
115 2 : if err != nil {
116 0 : return failedProbeDuration
117 0 : }
118 2 : if n != len(p.buf) {
119 0 : panic("invariant violation")
120 : }
121 2 : err = f.Sync()
122 2 : if err != nil {
123 0 : return failedProbeDuration
124 0 : }
125 2 : return p.timeSource.now().Sub(start)
126 : }()
127 2 : p.mu.Lock()
128 2 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
129 2 : p.mu.history[nextIndex] = probeDur
130 2 : p.mu.nextProbeIndex++
131 2 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
132 2 : if numSamples > probeHistoryLength {
133 2 : // Length has exceeded capacity, i.e., overwritten the first probe.
134 2 : p.mu.firstProbeIndex++
135 2 : }
136 2 : p.mu.Unlock()
137 :
138 2 : case enabled, shouldContinue = <-p.enabled:
139 2 : if !enabled || !shouldContinue {
140 2 : ticker.stop()
141 2 : p.mu.Lock()
142 2 : p.mu.firstProbeIndex = 0
143 2 : p.mu.nextProbeIndex = 0
144 2 : p.mu.Unlock()
145 2 : } else {
146 2 : ticker.reset(p.interval)
147 2 : }
148 : }
149 2 : if p.iterationForTesting != nil {
150 1 : p.iterationForTesting <- struct{}{}
151 1 : }
152 : }
153 : }
154 :
155 2 : func (p *dirProber) enableProbing() {
156 2 : p.enabled <- true
157 2 : }
158 :
159 2 : func (p *dirProber) disableProbing() {
160 2 : p.enabled <- false
161 2 : }
162 :
163 2 : func (p *dirProber) stop() {
164 2 : close(p.enabled)
165 2 : }
166 :
167 2 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
168 2 : p.mu.Lock()
169 2 : defer p.mu.Unlock()
170 2 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
171 2 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
172 2 : if samplesNeeded == 0 {
173 0 : panic("interval is too short")
174 2 : } else if samplesNeeded > probeHistoryLength {
175 0 : panic("interval is too long")
176 : }
177 2 : if samplesNeeded > numSamples {
178 2 : // Not enough samples, so assume not yet healthy.
179 2 : return failedProbeDuration, failedProbeDuration
180 2 : }
181 2 : offset := numSamples - samplesNeeded
182 2 : var sum, max time.Duration
183 2 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
184 2 : sampleDur := p.mu.history[i%probeHistoryLength]
185 2 : sum += sampleDur
186 2 : if max < sampleDur {
187 2 : max = sampleDur
188 2 : }
189 : }
190 2 : mean := sum / time.Duration(samplesNeeded)
191 2 : return mean, max
192 : }
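
As a worked example of the window arithmetic in getMeanMax, the hypothetical snippet below plugs in a 1s probe interval and a 10s requested interval: ceil(10s/1s) = 10 samples are needed, and until 10 probes have been recorded both the mean and the max are reported as failedProbeDuration.

    // exampleSamplesNeeded is a hypothetical helper illustrating the ceiling
    // division in getMeanMax.
    func exampleSamplesNeeded() int {
        probeEvery := 1 * time.Second       // p.interval
        healthyInterval := 10 * time.Second // caller-provided interval
        // Same expression as in getMeanMax: ceil(healthyInterval / probeEvery) = 10.
        return int((healthyInterval + probeEvery - 1) / probeEvery)
    }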
193 :
194 : type dirIndex int
195 :
196 : const (
197 : primaryDirIndex dirIndex = iota
198 : secondaryDirIndex
199 : numDirIndices
200 : )
201 :
202 : type dirAndFileHandle struct {
203 : Dir
204 : vfs.File
205 : }
206 :
207 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
208 : type switchableWriter interface {
209 : switchToNewDir(dirAndFileHandle) error
210 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
211 : }
212 :
213 : type failoverMonitorOptions struct {
214 : // The primary and secondary dir.
215 : dirs [numDirIndices]dirAndFileHandle
216 :
217 : FailoverOptions
218 : stopper *stopper
219 : }
220 :
221 : // failoverMonitor monitors the latency and error observed by the
222 : // switchableWriter, and does failover by switching the dir. It also monitors
223 : // the primary dir for failback.
224 : type failoverMonitor struct {
225 : opts failoverMonitorOptions
226 : prober dirProber
227 : mu struct {
228 : sync.Mutex
229 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
230 : // are protected by the mutex for concurrent reads.
231 :
232 : // dirIndex is the directory used by the writer (if non-nil), or the one
233 : // that will be used when the writer becomes non-nil.
234 : dirIndex
235 : // The time at which the monitor last failed back to the primary. This is
236 : // only relevant when dirIndex == primaryDirIndex.
237 : lastFailBackTime time.Time
238 : // The current failoverWriter, exposed via a narrower interface.
239 : writer switchableWriter
240 :
241 : // Stats.
242 : dirSwitchCount int64
243 : lastAccumulateIntoDurations time.Time
244 : primaryWriteDuration time.Duration
245 : secondaryWriteDuration time.Duration
246 : }
247 : }
248 :
249 2 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
250 2 : m := &failoverMonitor{
251 2 : opts: opts,
252 2 : }
253 2 : m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
254 2 : m.prober.init(opts.dirs[primaryDirIndex].FS,
255 2 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
256 2 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
257 2 : opts.stopper.runAsync(func() {
258 2 : m.monitorLoop(opts.stopper.shouldQuiesce())
259 2 : })
260 2 : return m
261 : }
262 :
263 : // noWriter is called when the previous writer is closed.
264 2 : func (m *failoverMonitor) noWriter() {
265 2 : now := m.opts.timeSource.now()
266 2 : m.mu.Lock()
267 2 : defer m.mu.Unlock()
268 2 : m.mu.writer = nil
269 2 : m.accumulateDurationLocked(now)
270 2 : }
271 :
272 : // writerCreateFunc is allowed to return nil if there is an error. It is not
273 : // the responsibility of failoverMonitor to handle that error, so the error
274 : // should not be due to an IO error (which failoverMonitor is interested in).
275 2 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
276 2 : m.mu.Lock()
277 2 : defer m.mu.Unlock()
278 2 : if m.mu.writer != nil {
279 0 : panic("previous writer not closed")
280 : }
281 2 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
282 : }
283 :
284 2 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
285 2 : m.mu.Lock()
286 2 : defer m.mu.Unlock()
287 2 : if m.mu.dirIndex == secondaryDirIndex {
288 2 : return true
289 2 : }
290 2 : intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
291 2 : return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
292 : }
293 :
294 2 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
295 2 : dur := now.Sub(m.mu.lastAccumulateIntoDurations)
296 2 : m.mu.lastAccumulateIntoDurations = now
297 2 : if m.mu.dirIndex == primaryDirIndex {
298 2 : m.mu.primaryWriteDuration += dur
299 2 : return
300 2 : }
301 2 : m.mu.secondaryWriteDuration += dur
302 : }
303 :
304 2 : func (m *failoverMonitor) stats() FailoverStats {
305 2 : now := m.opts.timeSource.now()
306 2 : m.mu.Lock()
307 2 : defer m.mu.Unlock()
308 2 : m.accumulateDurationLocked(now)
309 2 : return FailoverStats{
310 2 : DirSwitchCount: m.mu.dirSwitchCount,
311 2 : PrimaryWriteDuration: m.mu.primaryWriteDuration,
312 2 : SecondaryWriteDuration: m.mu.secondaryWriteDuration,
313 2 : }
314 2 : }
315 :
316 : // lastWriterInfo is state maintained in the monitorLoop for the latest
317 : // switchable writer. It is mainly used to dampen the switching.
318 : type lastWriterInfo struct {
319 : writer switchableWriter
320 : numSwitches int
321 : ongoingLatencyAtSwitch time.Duration
322 : errorCounts [numDirIndices]int
323 : }
324 :
325 2 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
326 2 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
327 2 : if m.opts.monitorIterationForTesting != nil {
328 1 : m.opts.monitorIterationForTesting <- struct{}{}
329 1 : }
330 2 : tickerCh := ticker.ch()
331 2 : dirIndex := primaryDirIndex
332 2 : var lastWriter lastWriterInfo
333 2 : for {
334 2 : select {
335 2 : case <-shouldQuiesce:
336 2 : ticker.stop()
337 2 : m.prober.stop()
338 2 : return
339 2 : case <-tickerCh:
340 2 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
341 2 : m.mu.Lock()
342 2 : defer m.mu.Unlock()
343 2 : if m.mu.writer != lastWriter.writer {
344 2 : lastWriter = lastWriterInfo{writer: m.mu.writer}
345 2 : }
346 2 : if lastWriter.writer == nil {
347 2 : return 0, nil
348 2 : }
349 2 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
350 : }()
351 2 : switchDir := false
352 2 : // Arbitrary value.
353 2 : const highSecondaryErrorCountThreshold = 2
354 2 : // We don't consider a switch if we are currently using the primary dir and
355 2 : // the secondary dir has accumulated enough errors. In that case it is more
356 2 : // likely that someone has misconfigured the secondary, e.g., wrong
357 2 : // permissions or not enough disk space. We only remember the error history
358 2 : // in the context of the lastWriter since an operator can fix the underlying misconfiguration.
359 2 : unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
360 2 :
361 2 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
362 2 : dirIndex == primaryDirIndex) && failoverEnabled {
363 2 : // Switching heuristics. Subject to change based on real world experience.
364 2 : if writerErr != nil {
365 1 : // An error causes an immediate switch, since a LogWriter with an
366 1 : // error is useless.
367 1 : lastWriter.errorCounts[dirIndex]++
368 1 : switchDir = true
369 2 : } else if writerOngoingLatency > unhealthyThreshold {
370 2 : // Arbitrary value.
371 2 : const switchImmediatelyCountThreshold = 2
372 2 : // High latency. Switch immediately if the number of switches that
373 2 : // have been done is below the threshold, since that gives us an
374 2 : // observation of both dirs. Once above that threshold, decay the
375 2 : // switch rate by increasing the observed latency needed for a
376 2 : // switch.
377 2 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
378 2 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
379 2 : switchDir = true
380 2 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
381 2 : }
382 : // Else high latency, but not high enough yet to motivate switch.
383 2 : } else if dirIndex == secondaryDirIndex {
384 2 : // The writer looks healthy. We can still switch if the writer is using the
385 2 : // secondary dir and the primary is healthy again.
386 2 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
387 2 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
388 2 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
389 2 : switchDir = true
390 2 : }
391 : }
392 : }
393 2 : if switchDir {
394 2 : lastWriter.numSwitches++
395 2 : if dirIndex == secondaryDirIndex {
396 2 : // Switching back to primary, so don't need to probe to see if
397 2 : // primary is healthy.
398 2 : m.prober.disableProbing()
399 2 : dirIndex = primaryDirIndex
400 2 : } else {
401 2 : m.prober.enableProbing()
402 2 : dirIndex = secondaryDirIndex
403 2 : }
404 2 : dir := m.opts.dirs[dirIndex]
405 2 : now := m.opts.timeSource.now()
406 2 : m.mu.Lock()
407 2 : m.accumulateDurationLocked(now)
408 2 : m.mu.dirIndex = dirIndex
409 2 : m.mu.dirSwitchCount++
410 2 : if dirIndex == primaryDirIndex {
411 2 : m.mu.lastFailBackTime = now
412 2 : }
413 2 : if m.mu.writer != nil {
414 2 : m.mu.writer.switchToNewDir(dir)
415 2 : }
416 2 : m.mu.Unlock()
417 : }
418 : }
419 2 : if m.opts.monitorStateForTesting != nil {
420 1 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
421 1 : }
422 2 : if m.opts.monitorIterationForTesting != nil {
423 1 : m.opts.monitorIterationForTesting <- struct{}{}
424 1 : }
425 : }
426 : }
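
For reference, the switching heuristics above can be restated as a pure decision function. The sketch below is hypothetical (shouldSwitchDir does not exist in this package) and omits the secondary-error-count guard and the failoverEnabled check that monitorLoop applies first; the thresholds mirror the constants used in monitorLoop.

    // shouldSwitchDir is a hypothetical, side-effect-free restatement of the
    // switching heuristics in monitorLoop.
    func shouldSwitchDir(
        writerErr error,
        ongoingLatency, unhealthyThreshold, latencyAtLastSwitch time.Duration,
        numSwitches int,
        onSecondary bool,
        primaryMean, primaryMax, healthyThreshold time.Duration,
    ) bool {
        const switchImmediatelyCountThreshold = 2
        switch {
        case writerErr != nil:
            // A LogWriter with an error is useless: switch immediately.
            return true
        case ongoingLatency > unhealthyThreshold:
            // High latency: switch freely for the first couple of switches,
            // then require the ongoing latency to have doubled since the
            // last switch, to dampen flapping.
            return numSwitches < switchImmediatelyCountThreshold ||
                ongoingLatency > 2*latencyAtLastSwitch
        case onSecondary:
            // Healthy writer on the secondary: fail back once the primary's
            // probe history looks healthy over the whole interval.
            return primaryMean < healthyThreshold && primaryMax < healthyThreshold
        default:
            return false
        }
    }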
427 :
428 : type logicalLogWithSizesEtc struct {
429 : num NumWAL
430 : segments []segmentWithSizeEtc
431 : }
432 :
433 : type segmentWithSizeEtc struct {
434 : segment
435 : approxFileSize uint64
436 : synchronouslyClosed bool
437 : }
438 :
439 : type failoverManager struct {
440 : opts Options
441 : // initialObsolete holds the set of DeletableLogs that formed the logs
442 : // passed into Init. The initialObsolete logs are all obsolete. Once
443 : // returned via Manager.Obsolete, initialObsolete is cleared. The
444 : // initialObsolete logs are stored separately from mu.queue because they may
445 : // include logs that were NOT created by the standalone manager, and
446 : // multiple physical log files may form one logical WAL.
447 : initialObsolete []DeletableLog
448 :
449 : // TODO(jackson/sumeer): read-path etc.
450 :
451 : dirHandles [numDirIndices]vfs.File
452 : stopper *stopper
453 : monitor *failoverMonitor
454 : mu struct {
455 : sync.Mutex
456 : closedWALs []logicalLogWithSizesEtc
457 : ww *failoverWriter
458 : }
459 : recycler LogRecycler
460 : // Due to async creation of files in failoverWriter, multiple goroutines can
461 : // concurrently try to get a file from the recycler. This mutex protects the
462 : // logRecycler.{Peek,Pop} pair.
463 : recyclerPeekPopMu sync.Mutex
464 : }
465 :
466 : var _ Manager = &failoverManager{}
467 :
468 : // TODO(sumeer):
469 : // - log deletion: if the record.LogWriter has not closed yet, the cleaner may
470 : // get an error when deleting or renaming (only on Windows?).
471 :
472 : // init implements Manager.
473 2 : func (wm *failoverManager) init(o Options, initial Logs) error {
474 2 : if o.timeSource == nil {
475 2 : o.timeSource = defaultTime{}
476 2 : }
477 2 : o.FailoverOptions.EnsureDefaults()
478 2 :
479 2 : // Synchronously ensure that we're able to write to the secondary before we
480 2 : // proceed. An operator doesn't want to encounter an issue writing to the
481 2 : // secondary the first time there's a need to fail over. We write a bit of
482 2 : // metadata to a file in the secondary's directory.
483 2 : f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
484 2 : if err != nil {
485 1 : return errors.Newf("failed to write to WAL secondary dir: %v", err)
486 1 : }
487 2 : if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
488 2 : o.Primary.Dirname,
489 2 : time.Now(),
490 2 : )); err != nil {
491 0 : return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
492 0 : }
493 2 : if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
494 0 : return err
495 0 : }
496 :
497 2 : stopper := newStopper()
498 2 : var dirs [numDirIndices]dirAndFileHandle
499 2 : for i, dir := range []Dir{o.Primary, o.Secondary} {
500 2 : dirs[i].Dir = dir
501 2 : f, err := dir.FS.OpenDir(dir.Dirname)
502 2 : if err != nil {
503 0 : return err
504 0 : }
505 2 : dirs[i].File = f
506 : }
507 2 : fmOpts := failoverMonitorOptions{
508 2 : dirs: dirs,
509 2 : FailoverOptions: o.FailoverOptions,
510 2 : stopper: stopper,
511 2 : }
512 2 : monitor := newFailoverMonitor(fmOpts)
513 2 : *wm = failoverManager{
514 2 : opts: o,
515 2 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
516 2 : stopper: stopper,
517 2 : monitor: monitor,
518 2 : }
519 2 : wm.recycler.Init(o.MaxNumRecyclableLogs)
520 2 : for _, ll := range initial {
521 2 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
522 2 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
523 2 : }
524 2 : var err error
525 2 : wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
526 2 : if err != nil {
527 0 : return err
528 0 : }
529 : }
530 2 : return nil
531 : }
532 :
533 : // List implements Manager.
534 2 : func (wm *failoverManager) List() (Logs, error) {
535 2 : wm.mu.Lock()
536 2 : defer wm.mu.Unlock()
537 2 : n := len(wm.mu.closedWALs)
538 2 : if wm.mu.ww != nil {
539 2 : n++
540 2 : }
541 2 : wals := make(Logs, n)
542 2 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
543 2 : segments := make([]segment, len(llse.segments))
544 2 : for j := range llse.segments {
545 2 : segments[j] = llse.segments[j].segment
546 2 : }
547 2 : wals[index] = LogicalLog{
548 2 : Num: llse.num,
549 2 : segments: segments,
550 2 : }
551 : }
552 2 : for i, llse := range wm.mu.closedWALs {
553 2 : setLogicalLog(i, llse)
554 2 : }
555 2 : if wm.mu.ww != nil {
556 2 : setLogicalLog(n-1, wm.mu.ww.getLog())
557 2 : }
558 2 : return wals, nil
559 : }
560 :
561 : // Obsolete implements Manager.
562 : func (wm *failoverManager) Obsolete(
563 : minUnflushedNum NumWAL, noRecycle bool,
564 2 : ) (toDelete []DeletableLog, err error) {
565 2 : wm.mu.Lock()
566 2 : defer wm.mu.Unlock()
567 2 :
568 2 : // If this is the first call to Obsolete after Open, we may have deletable
569 2 : // logs outside the queue.
570 2 : toDelete, wm.initialObsolete = wm.initialObsolete, nil
571 2 :
572 2 : i := 0
573 2 : for ; i < len(wm.mu.closedWALs); i++ {
574 2 : ll := wm.mu.closedWALs[i]
575 2 : if ll.num >= minUnflushedNum {
576 2 : break
577 : }
578 : // Recycle only the primary log at logNameIndex=0, when there was no failover
579 : // and the log was synchronously closed. It may not be safe to recycle a file
580 : // that is still being written to. And recycling when there was a failover may
581 : // fill up the recycler with smaller log files. The restriction regarding
582 : // logNameIndex=0 is because logRecycler.Peek only exposes the
583 : // DiskFileNum, and we need to use that to construct the path -- we could
584 : // remove this restriction by changing the logRecycler interface, but we
585 : // don't bother.
586 2 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
587 2 : ll.segments[0].logNameIndex == 0 &&
588 2 : ll.segments[0].dir == wm.opts.Primary
589 2 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
590 2 : FileNum: base.DiskFileNum(ll.num),
591 2 : FileSize: ll.segments[0].approxFileSize,
592 2 : }) {
593 2 : for _, s := range ll.segments {
594 2 : toDelete = append(toDelete, DeletableLog{
595 2 : FS: s.dir.FS,
596 2 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
597 2 : NumWAL: ll.num,
598 2 : ApproxFileSize: s.approxFileSize,
599 2 : })
600 2 : }
601 : }
602 : }
603 2 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
604 2 : return toDelete, nil
605 : }
606 :
607 : // Create implements Manager.
608 2 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
609 2 : func() {
610 2 : wm.mu.Lock()
611 2 : defer wm.mu.Unlock()
612 2 : if wm.mu.ww != nil {
613 0 : panic("previous wal.Writer not closed")
614 : }
615 : }()
616 2 : fwOpts := failoverWriterOpts{
617 2 : wn: wn,
618 2 : logger: wm.opts.Logger,
619 2 : timeSource: wm.opts.timeSource,
620 2 : jobID: jobID,
621 2 : logCreator: wm.logCreator,
622 2 : noSyncOnClose: wm.opts.NoSyncOnClose,
623 2 : bytesPerSync: wm.opts.BytesPerSync,
624 2 : preallocateSize: wm.opts.PreallocateSize,
625 2 : minSyncInterval: wm.opts.MinSyncInterval,
626 2 : fsyncLatency: wm.opts.FsyncLatency,
627 2 : queueSemChan: wm.opts.QueueSemChan,
628 2 : stopper: wm.stopper,
629 2 : failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
630 2 : writerClosed: wm.writerClosed,
631 2 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
632 2 : }
633 2 : var err error
634 2 : var ww *failoverWriter
635 2 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
636 2 : ww, err = newFailoverWriter(fwOpts, dir)
637 2 : if err != nil {
638 0 : return nil
639 0 : }
640 2 : return ww
641 : }
642 2 : wm.monitor.newWriter(writerCreateFunc)
643 2 : if ww != nil {
644 2 : wm.mu.Lock()
645 2 : defer wm.mu.Unlock()
646 2 : wm.mu.ww = ww
647 2 : }
648 2 : return ww, err
649 : }
650 :
651 : // ElevateWriteStallThresholdForFailover implements Manager.
652 2 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
653 2 : return wm.monitor.elevateWriteStallThresholdForFailover()
654 2 : }
655 :
656 2 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
657 2 : wm.monitor.noWriter()
658 2 : wm.mu.Lock()
659 2 : defer wm.mu.Unlock()
660 2 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
661 2 : wm.mu.ww = nil
662 2 : }
663 :
664 : // Stats implements Manager.
665 2 : func (wm *failoverManager) Stats() Stats {
666 2 : obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
667 2 : failoverStats := wm.monitor.stats()
668 2 : failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
669 2 : wm.mu.Lock()
670 2 : defer wm.mu.Unlock()
671 2 : var liveFileCount int
672 2 : var liveFileSize uint64
673 2 : updateStats := func(segments []segmentWithSizeEtc) {
674 2 : for _, s := range segments {
675 2 : liveFileCount++
676 2 : liveFileSize += s.approxFileSize
677 2 : }
678 : }
679 2 : for _, llse := range wm.mu.closedWALs {
680 2 : updateStats(llse.segments)
681 2 : }
682 2 : if wm.mu.ww != nil {
683 1 : updateStats(wm.mu.ww.getLog().segments)
684 1 : }
685 2 : for i := range wm.initialObsolete {
686 1 : if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
687 1 : obsoleteLogsCount++
688 1 : }
689 1 : obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
690 : }
691 2 : return Stats{
692 2 : ObsoleteFileCount: obsoleteLogsCount,
693 2 : ObsoleteFileSize: obsoleteLogSize,
694 2 : LiveFileCount: liveFileCount,
695 2 : LiveFileSize: liveFileSize,
696 2 : Failover: failoverStats,
697 2 : }
698 : }
699 :
700 : // Close implements Manager.
701 2 : func (wm *failoverManager) Close() error {
702 2 : wm.stopper.stop()
703 2 : // Since all goroutines are stopped, can close the dirs.
704 2 : var err error
705 2 : for _, f := range wm.dirHandles {
706 2 : err = firstError(err, f.Close())
707 2 : }
708 2 : return err
709 : }
710 :
711 : // RecyclerForTesting implements Manager.
712 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
713 0 : return nil
714 0 : }
715 :
716 : // logCreator implements the logCreator func type.
717 : func (wm *failoverManager) logCreator(
718 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
719 2 : ) (logFile vfs.File, initialFileSize uint64, err error) {
720 2 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
721 2 : isPrimary := dir == wm.opts.Primary
722 2 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
723 2 : considerRecycle := li == 0 && isPrimary
724 2 : createInfo := CreateInfo{
725 2 : JobID: jobID,
726 2 : Path: logFilename,
727 2 : IsSecondary: !isPrimary,
728 2 : Num: wn,
729 2 : Err: nil,
730 2 : }
731 2 : defer func() {
732 2 : createInfo.Err = err
733 2 : if wm.opts.EventListener != nil {
734 2 : wm.opts.EventListener.LogCreated(createInfo)
735 2 : }
736 : }()
737 2 : if considerRecycle {
738 2 : // Try to use a recycled log file. Recycling log files is an important
739 2 : // performance optimization as it is faster to sync a file that has
740 2 : // already been written, than one which is being written for the first
741 2 : // time. This is due to the need to sync file metadata when a file is
742 2 : // being written for the first time. Note this is true even if file
743 2 : // preallocation is performed (e.g. fallocate).
744 2 : var recycleLog base.FileInfo
745 2 : var recycleOK bool
746 2 : func() {
747 2 : wm.recyclerPeekPopMu.Lock()
748 2 : defer wm.recyclerPeekPopMu.Unlock()
749 2 : recycleLog, recycleOK = wm.recycler.Peek()
750 2 : if recycleOK {
751 1 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
752 0 : panic(err)
753 : }
754 : }
755 : }()
756 2 : if recycleOK {
757 1 : createInfo.RecycledFileNum = recycleLog.FileNum
758 1 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
759 1 : r.writeStart()
760 1 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
761 1 : r.writeEnd(err)
762 1 : // TODO(sumeer): should we fatal since primary dir? At some point it is
763 1 : // better to fatal instead of continuing to failover.
764 1 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
765 1 : if err != nil {
766 0 : // TODO(sumeer): we have popped from the logRecycler, which is
767 0 : // arguably correct, since we don't want to keep trying to reuse a log
768 0 : // that causes some error. But the original or new file may exist, and
769 0 : // no one will clean it up unless the process restarts.
770 0 : return nil, 0, err
771 0 : }
772 : // Figure out the recycled WAL size. This Stat is necessary because
773 : // ReuseForWrite's contract allows for removing the old file and
774 : // creating a new one. We don't know whether the WAL was actually
775 : // recycled.
776 : //
777 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
778 : // indicating whether or not the file was actually reused would allow us
779 : // to skip the stat and use recycleLog.FileSize.
780 1 : var finfo os.FileInfo
781 1 : finfo, err = logFile.Stat()
782 1 : if err != nil {
783 0 : logFile.Close()
784 0 : return nil, 0, err
785 0 : }
786 1 : initialFileSize = uint64(finfo.Size())
787 1 : return logFile, initialFileSize, nil
788 : }
789 : }
790 : // Did not recycle.
791 : //
792 : // Create file.
793 2 : r.writeStart()
794 2 : logFile, err = dir.FS.Create(logFilename, "pebble-wal")
795 2 : r.writeEnd(err)
796 2 : return logFile, 0, err
797 : }
798 :
799 : type stopper struct {
800 : quiescer chan struct{} // Closed when quiescing
801 : wg sync.WaitGroup
802 : }
803 :
804 2 : func newStopper() *stopper {
805 2 : return &stopper{
806 2 : quiescer: make(chan struct{}),
807 2 : }
808 2 : }
809 :
810 2 : func (s *stopper) runAsync(f func()) {
811 2 : s.wg.Add(1)
812 2 : go func() {
813 2 : f()
814 2 : s.wg.Done()
815 2 : }()
816 : }
817 :
818 : // shouldQuiesce returns a channel which will be closed when stop() has been
819 : // invoked and outstanding goroutines should begin to quiesce.
820 2 : func (s *stopper) shouldQuiesce() <-chan struct{} {
821 2 : return s.quiescer
822 2 : }
823 :
824 2 : func (s *stopper) stop() {
825 2 : close(s.quiescer)
826 2 : s.wg.Wait()
827 2 : }
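
A hypothetical usage sketch of the stopper type above (exampleStopperUsage is an invented name): workers started via runAsync watch shouldQuiesce, and stop blocks until all of them have returned.

    // exampleStopperUsage is a hypothetical sketch of the intended lifecycle:
    // start workers with runAsync, have them watch shouldQuiesce, and call
    // stop to quiesce and wait.
    func exampleStopperUsage() {
        s := newStopper()
        s.runAsync(func() {
            ticker := time.NewTicker(10 * time.Millisecond)
            defer ticker.Stop()
            for {
                select {
                case <-s.shouldQuiesce():
                    return // quiesce requested
                case <-ticker.C:
                    // Periodic work would go here.
                }
            }
        })
        s.stop() // closes the quiescer channel and waits on the WaitGroup
    }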
828 :
829 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
830 : // support for Timer removed, and support for reset added to the manual
831 : // implementation.
832 :
833 : // timeSource is used to interact with the clock and tickers. Abstracts
834 : // time.Now and time.NewTicker for testing.
835 : type timeSource interface {
836 : now() time.Time
837 : newTicker(duration time.Duration) tickerI
838 : }
839 :
840 : // tickerI is an interface wrapping time.Ticker.
841 : type tickerI interface {
842 : reset(duration time.Duration)
843 : stop()
844 : ch() <-chan time.Time
845 : }
846 :
847 : // defaultTime is a timeSource using the time package.
848 : type defaultTime struct{}
849 :
850 : var _ timeSource = defaultTime{}
851 :
852 2 : func (defaultTime) now() time.Time {
853 2 : return time.Now()
854 2 : }
855 :
856 2 : func (defaultTime) newTicker(duration time.Duration) tickerI {
857 2 : return (*defaultTicker)(time.NewTicker(duration))
858 2 : }
859 :
860 : // defaultTicker uses time.Ticker.
861 : type defaultTicker time.Ticker
862 :
863 : var _ tickerI = &defaultTicker{}
864 :
865 2 : func (t *defaultTicker) reset(duration time.Duration) {
866 2 : (*time.Ticker)(t).Reset(duration)
867 2 : }
868 :
869 2 : func (t *defaultTicker) stop() {
870 2 : (*time.Ticker)(t).Stop()
871 2 : }
872 :
873 2 : func (t *defaultTicker) ch() <-chan time.Time {
874 2 : return (*time.Ticker)(t).C
875 2 : }
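
The timeSource/tickerI indirection exists so tests can drive probeLoop and monitorLoop deterministically. The sketch below is a hypothetical manual implementation (the package's actual test implementation may differ); a test calls advance to move the clock and deliver a tick without real sleeps.

    // manualTime is a hypothetical deterministic timeSource for tests: the
    // test advances the clock and fires ticks explicitly instead of sleeping.
    type manualTime struct {
        mu     sync.Mutex
        cur    time.Time
        ticker *manualTicker
    }

    func (m *manualTime) now() time.Time {
        m.mu.Lock()
        defer m.mu.Unlock()
        return m.cur
    }

    func (m *manualTime) newTicker(duration time.Duration) tickerI {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.ticker = &manualTicker{c: make(chan time.Time, 1)}
        return m.ticker
    }

    // advance moves the clock forward and delivers one tick. It assumes
    // newTicker has already been called.
    func (m *manualTime) advance(d time.Duration) {
        m.mu.Lock()
        m.cur = m.cur.Add(d)
        c := m.ticker.c
        now := m.cur
        m.mu.Unlock()
        c <- now
    }

    type manualTicker struct {
        c chan time.Time
    }

    func (t *manualTicker) reset(duration time.Duration) {}
    func (t *manualTicker) stop()                        {}
    func (t *manualTicker) ch() <-chan time.Time         { return t.c }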
876 :
877 : // Make lint happy.
878 : var _ = (*failoverMonitor).noWriter
879 : var _ = (*failoverManager).writerClosed
880 : var _ = (&stopper{}).shouldQuiesce