Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "golang.org/x/exp/rand"
18 : )
19 :
20 : // dirProber probes the primary dir until it is confirmed to be healthy. If
21 : // it doesn't have enough samples, it is deemed unhealthy. It is only used
22 : // for failback to the primary.
23 : type dirProber struct {
24 : fs vfs.FS
25 : // The full path of the file to use for the probe. The probe is destructive
26 : // in that it deletes and (re)creates the file.
27 : filename string
28 : // The probing interval, when enabled.
29 : interval time.Duration
30 : timeSource
31 : // buf holds the random bytes that are written during the probe.
32 : buf [100 << 10]byte
33 : // enabled is signaled to enable and disable probing. The initial state of
34 : // the prober is disabled.
35 : enabled chan bool
36 : mu struct {
37 : sync.Mutex
38 : // Circular buffer of history samples.
39 : history [probeHistoryLength]time.Duration
40 : // The history is in [firstProbeIndex, nextProbeIndex).
41 : firstProbeIndex int
42 : nextProbeIndex int
43 : }
44 : iterationForTesting chan<- struct{}
45 : }
46 :
47 : const probeHistoryLength = 128
48 :
49 : // failedProbeDuration is a large sentinel for failed probes and missing samples.
50 : const failedProbeDuration = 24 * 60 * 60 * time.Second
51 :
52 : // init takes a stopper in order to connect the dirProber's long-running
53 : // goroutines with the stopper's wait group, but the dirProber has its own
54 : // stop() method that should be invoked to trigger the shutdown.
55 : func (p *dirProber) init(
56 : fs vfs.FS,
57 : filename string,
58 : interval time.Duration,
59 : stopper *stopper,
60 : ts timeSource,
61 : notifyIterationForTesting chan<- struct{},
62 1 : ) {
63 1 : *p = dirProber{
64 1 : fs: fs,
65 1 : filename: filename,
66 1 : interval: interval,
67 1 : timeSource: ts,
68 1 : enabled: make(chan bool),
69 1 : iterationForTesting: notifyIterationForTesting,
70 1 : }
71 1 : // Random bytes for writing, to defeat any FS compression optimization.
72 1 : _, err := rand.Read(p.buf[:])
73 1 : if err != nil {
74 0 : panic(err)
75 : }
76 : // dirProber has an explicit stop() method instead of listening on
77 : // stopper.shouldQuiesce. This structure helps negotiate the shutdown
78 : // between failoverMonitor and dirProber. If the dirProber independently
79 : // listened to shouldQuiesce, it might exit before the failoverMonitor. The
80 : // [enable|disable]Probing methods send on a channel and expect the
81 : // dirProber goroutine to receive on that channel. If the dirProber noticed
82 : // the quiescence and exited first, the failoverMonitor could deadlock
83 : // waiting for a goroutine that no longer exists.
84 : //
85 : // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
86 : // - the failoverMonitor listens to stopper.shouldQuiesce.
87 : // - when the failoverMonitor is exiting, it will no longer attempt to
88 : // interact with the dirProber. Only then does it invoke dirProber.stop.
89 1 : stopper.runAsync(p.probeLoop)
90 : }
91 :
92 1 : func (p *dirProber) probeLoop() {
93 1 : ticker := p.timeSource.newTicker(p.interval)
94 1 : ticker.stop()
95 1 : tickerCh := ticker.ch()
96 1 : shouldContinue := true
97 1 : var enabled bool
98 1 : for shouldContinue {
99 1 : select {
100 1 : case <-tickerCh:
101 1 : if !enabled {
102 1 : // Could have a tick waiting before we disabled it. Ignore.
103 1 : continue
104 : }
105 1 : probeDur := func() time.Duration {
106 1 : // Delete, create, write, sync.
107 1 : start := p.timeSource.now()
108 1 : _ = p.fs.Remove(p.filename)
109 1 : f, err := p.fs.Create(p.filename, "pebble-wal")
110 1 : if err != nil {
111 0 : return failedProbeDuration
112 0 : }
113 1 : defer f.Close()
114 1 : n, err := f.Write(p.buf[:])
115 1 : if err != nil {
116 0 : return failedProbeDuration
117 0 : }
118 1 : if n != len(p.buf) {
119 0 : panic("invariant violation")
120 : }
121 1 : err = f.Sync()
122 1 : if err != nil {
123 0 : return failedProbeDuration
124 0 : }
125 1 : return p.timeSource.now().Sub(start)
126 : }()
127 1 : p.mu.Lock()
128 1 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
129 1 : p.mu.history[nextIndex] = probeDur
130 1 : p.mu.nextProbeIndex++
131 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
132 1 : if numSamples > probeHistoryLength {
133 1 : // Length has exceeded capacity, i.e., overwritten the first probe.
134 1 : p.mu.firstProbeIndex++
135 1 : }
136 1 : p.mu.Unlock()
137 :
138 1 : case enabled, shouldContinue = <-p.enabled:
139 1 : if !enabled || !shouldContinue {
140 1 : ticker.stop()
141 1 : p.mu.Lock()
142 1 : p.mu.firstProbeIndex = 0
143 1 : p.mu.nextProbeIndex = 0
144 1 : p.mu.Unlock()
145 1 : } else {
146 1 : ticker.reset(p.interval)
147 1 : }
148 : }
149 1 : if p.iterationForTesting != nil {
150 0 : p.iterationForTesting <- struct{}{}
151 0 : }
152 : }
153 : }
154 :
155 1 : func (p *dirProber) enableProbing() {
156 1 : p.enabled <- true
157 1 : }
158 :
159 1 : func (p *dirProber) disableProbing() {
160 1 : p.enabled <- false
161 1 : }
162 :
163 1 : func (p *dirProber) stop() {
164 1 : close(p.enabled)
165 1 : }
166 :
167 1 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
168 1 : p.mu.Lock()
169 1 : defer p.mu.Unlock()
170 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
171 1 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
172 1 : if samplesNeeded == 0 {
173 0 : panic("interval is too short")
174 1 : } else if samplesNeeded > probeHistoryLength {
175 0 : panic("interval is too long")
176 : }
177 1 : if samplesNeeded > numSamples {
178 1 : // Not enough samples, so assume not yet healthy.
179 1 : return failedProbeDuration, failedProbeDuration
180 1 : }
181 1 : offset := numSamples - samplesNeeded
182 1 : var sum, max time.Duration
183 1 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
184 1 : sampleDur := p.mu.history[i%probeHistoryLength]
185 1 : sum += sampleDur
186 1 : if max < sampleDur {
187 1 : max = sampleDur
188 1 : }
189 : }
190 1 : mean := sum / time.Duration(samplesNeeded)
191 1 : return mean, max
192 : }
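
// The window arithmetic above, restated as a standalone sketch: getMeanMax
// consults the ceiling of interval/probeInterval samples, and only the most
// recent samples contribute to the mean and max. meanMaxOverWindow is a
// hypothetical helper (not used elsewhere in this package); the slice stands
// in for the circular history buffer, ordered oldest to newest.
func meanMaxOverWindow(
	history []time.Duration, probeInterval, interval time.Duration,
) (mean, max time.Duration) {
	samplesNeeded := int((interval + probeInterval - 1) / probeInterval) // ceil
	if samplesNeeded == 0 {
		samplesNeeded = 1 // getMeanMax instead panics on a too-short interval.
	}
	if samplesNeeded > len(history) {
		// Not enough samples; report the failure sentinel, as getMeanMax does.
		return failedProbeDuration, failedProbeDuration
	}
	window := history[len(history)-samplesNeeded:]
	var sum time.Duration
	for _, d := range window {
		sum += d
		if d > max {
			max = d
		}
	}
	return sum / time.Duration(samplesNeeded), max
}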
193 :
194 : type dirIndex int
195 :
196 : const (
197 : primaryDirIndex dirIndex = iota
198 : secondaryDirIndex
199 : numDirIndices
200 : )
201 :
202 : type dirAndFileHandle struct {
203 : Dir
204 : vfs.File
205 : }
206 :
207 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
208 : type switchableWriter interface {
209 : switchToNewDir(dirAndFileHandle) error
210 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
211 : }
212 :
213 : type failoverMonitorOptions struct {
214 : // The primary and secondary dir.
215 : dirs [numDirIndices]dirAndFileHandle
216 :
217 : FailoverOptions
218 : stopper *stopper
219 : }
220 :
221 : // failoverMonitor monitors the latency and error observed by the
222 : // switchableWriter, and does failover by switching the dir. It also monitors
223 : // the primary dir for failback.
224 : type failoverMonitor struct {
225 : opts failoverMonitorOptions
226 : prober dirProber
227 : mu struct {
228 : sync.Mutex
229 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
230 : // are protected by the mutex for concurrent reads.
231 :
232 : // dirIndex is the directory to be used by the writer (if non-nil), or
233 : // when the writer becomes non-nil.
234 : dirIndex
235 : // The time at which the monitor last failed back to the primary. This is
236 : // only relevant when dirIndex == primaryDirIndex.
237 : lastFailBackTime time.Time
238 : // The current failoverWriter, exposed via a narrower interface.
239 : writer switchableWriter
240 :
241 : // Stats.
242 : dirSwitchCount int64
243 : lastAccumulateIntoDurations time.Time
244 : primaryWriteDuration time.Duration
245 : secondaryWriteDuration time.Duration
246 : }
247 : }
248 :
249 1 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
250 1 : m := &failoverMonitor{
251 1 : opts: opts,
252 1 : }
253 1 : m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
254 1 : m.prober.init(opts.dirs[primaryDirIndex].FS,
255 1 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
256 1 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
257 1 : opts.stopper.runAsync(func() {
258 1 : m.monitorLoop(opts.stopper.shouldQuiesce())
259 1 : })
260 1 : return m
261 : }
262 :
263 : // noWriter is called when the previous writer is closed.
264 1 : func (m *failoverMonitor) noWriter() {
265 1 : now := m.opts.timeSource.now()
266 1 : m.mu.Lock()
267 1 : defer m.mu.Unlock()
268 1 : m.mu.writer = nil
269 1 : m.accumulateDurationLocked(now)
270 1 : }
271 :
272 : // writerCreateFunc is allowed to return nil if there is an error. Handling
273 : // that error is not the responsibility of failoverMonitor, so the error
274 : // should not be an IO error (which failoverMonitor is interested in).
275 1 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
276 1 : m.mu.Lock()
277 1 : defer m.mu.Unlock()
278 1 : if m.mu.writer != nil {
279 0 : panic("previous writer not closed")
280 : }
281 1 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
282 : }
283 :
284 1 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
285 1 : m.mu.Lock()
286 1 : defer m.mu.Unlock()
287 1 : if m.mu.dirIndex == secondaryDirIndex {
288 1 : return true
289 1 : }
290 1 : intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
291 1 : return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
292 : }
293 :
294 1 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
295 1 : dur := now.Sub(m.mu.lastAccumulateIntoDurations)
296 1 : m.mu.lastAccumulateIntoDurations = now
297 1 : if m.mu.dirIndex == primaryDirIndex {
298 1 : m.mu.primaryWriteDuration += dur
299 1 : return
300 1 : }
301 1 : m.mu.secondaryWriteDuration += dur
302 : }
303 :
304 1 : func (m *failoverMonitor) stats() FailoverStats {
305 1 : now := m.opts.timeSource.now()
306 1 : m.mu.Lock()
307 1 : defer m.mu.Unlock()
308 1 : m.accumulateDurationLocked(now)
309 1 : return FailoverStats{
310 1 : DirSwitchCount: m.mu.dirSwitchCount,
311 1 : PrimaryWriteDuration: m.mu.primaryWriteDuration,
312 1 : SecondaryWriteDuration: m.mu.secondaryWriteDuration,
313 1 : }
314 1 : }
315 :
316 : // lastWriterInfo is state maintained in the monitorLoop for the latest
317 : // switchable writer. It is mainly used to dampen the switching.
318 : type lastWriterInfo struct {
319 : writer switchableWriter
320 : numSwitches int
321 : ongoingLatencyAtSwitch time.Duration
322 : errorCounts [numDirIndices]int
323 : }
324 :
325 1 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
326 1 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
327 1 : if m.opts.monitorIterationForTesting != nil {
328 0 : m.opts.monitorIterationForTesting <- struct{}{}
329 0 : }
330 1 : tickerCh := ticker.ch()
331 1 : dirIndex := primaryDirIndex
332 1 : var lastWriter lastWriterInfo
333 1 : for {
334 1 : select {
335 1 : case <-shouldQuiesce:
336 1 : ticker.stop()
337 1 : m.prober.stop()
338 1 : return
339 1 : case <-tickerCh:
340 1 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
341 1 : m.mu.Lock()
342 1 : defer m.mu.Unlock()
343 1 : if m.mu.writer != lastWriter.writer {
344 1 : lastWriter = lastWriterInfo{writer: m.mu.writer}
345 1 : }
346 1 : if lastWriter.writer == nil {
347 1 : return 0, nil
348 1 : }
349 1 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
350 : }()
351 1 : switchDir := false
352 1 : // Arbitrary value.
353 1 : const highSecondaryErrorCountThreshold = 2
354 1 : // We don't consider a switch if we are currently using the primary dir and
355 1 : // the secondary dir has a high enough error count. It is more likely that
356 1 : // someone has misconfigured the secondary, e.g., wrong permissions or not
357 1 : // enough disk space. We only remember the error history in the context of
358 1 : // the lastWriter since an operator can fix the underlying misconfiguration.
359 1 : unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
360 1 :
361 1 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
362 1 : dirIndex == primaryDirIndex) && failoverEnabled {
363 1 : // Switching heuristics. Subject to change based on real world experience.
364 1 : if writerErr != nil {
365 0 : // An error causes an immediate switch, since a LogWriter with an
366 0 : // error is useless.
367 0 : lastWriter.errorCounts[dirIndex]++
368 0 : switchDir = true
369 1 : } else if writerOngoingLatency > unhealthyThreshold {
370 1 : // Arbitrary value.
371 1 : const switchImmediatelyCountThreshold = 2
372 1 : // High latency. Switch immediately if the number of switches that
373 1 : // have been done is below the threshold, since that gives us an
374 1 : // observation of both dirs. Once above that threshold, decay the
375 1 : // switch rate by increasing the observed latency needed for a
376 1 : // switch.
377 1 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
378 1 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
379 1 : switchDir = true
380 1 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
381 1 : }
382 : // Else high latency, but not high enough yet to motivate a switch.
383 1 : } else if dirIndex == secondaryDirIndex {
384 1 : // The writer looks healthy. We can still switch if the writer is using the
385 1 : // secondary dir and the primary is healthy again.
386 1 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
387 1 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
388 1 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
389 1 : switchDir = true
390 1 : }
391 : }
392 : }
393 1 : if switchDir {
394 1 : lastWriter.numSwitches++
395 1 : if dirIndex == secondaryDirIndex {
396 1 : // Switching back to primary, so don't need to probe to see if
397 1 : // primary is healthy.
398 1 : m.prober.disableProbing()
399 1 : dirIndex = primaryDirIndex
400 1 : } else {
401 1 : m.prober.enableProbing()
402 1 : dirIndex = secondaryDirIndex
403 1 : }
404 1 : dir := m.opts.dirs[dirIndex]
405 1 : now := m.opts.timeSource.now()
406 1 : m.mu.Lock()
407 1 : m.accumulateDurationLocked(now)
408 1 : m.mu.dirIndex = dirIndex
409 1 : m.mu.dirSwitchCount++
410 1 : if dirIndex == primaryDirIndex {
411 1 : m.mu.lastFailBackTime = now
412 1 : }
413 1 : if m.mu.writer != nil {
414 1 : m.mu.writer.switchToNewDir(dir)
415 1 : }
416 1 : m.mu.Unlock()
417 : }
418 : }
419 1 : if m.opts.monitorStateForTesting != nil {
420 0 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
421 0 : }
422 1 : if m.opts.monitorIterationForTesting != nil {
423 0 : m.opts.monitorIterationForTesting <- struct{}{}
424 0 : }
425 : }
426 : }
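
// A condensed, side-effect-free restatement of the switching heuristic used
// in monitorLoop, for illustration only; switchDecision is hypothetical and
// not called by the monitor. It omits the guard that suppresses switching
// away from the primary when the secondary has accumulated errors, as well
// as the failoverEnabled check.
func switchDecision(
	curDir dirIndex,
	writerErr error,
	ongoingLatency, unhealthyThreshold time.Duration,
	numSwitches int,
	latencyAtLastSwitch time.Duration,
	primaryMean, primaryMax, healthyThreshold time.Duration,
) bool {
	const switchImmediatelyCountThreshold = 2
	switch {
	case writerErr != nil:
		// An errored LogWriter is useless, so switch immediately.
		return true
	case ongoingLatency > unhealthyThreshold:
		// Switch immediately for the first couple of switches; after that,
		// require the latency to have doubled since the last switch.
		return numSwitches < switchImmediatelyCountThreshold ||
			ongoingLatency > 2*latencyAtLastSwitch
	case curDir == secondaryDirIndex:
		// Healthy writer on the secondary: fail back once the primary's
		// recent probes look healthy.
		return primaryMean < healthyThreshold && primaryMax < healthyThreshold
	default:
		return false
	}
}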
427 :
428 : type logicalLogWithSizesEtc struct {
429 : num NumWAL
430 : segments []segmentWithSizeEtc
431 : }
432 :
433 : type segmentWithSizeEtc struct {
434 : segment
435 : approxFileSize uint64
436 : synchronouslyClosed bool
437 : }
438 :
439 : type failoverManager struct {
440 : opts Options
441 : // initialObsolete holds the set of DeletableLogs that formed the logs
442 : // passed into Init. The initialObsolete logs are all obsolete. Once
443 : // returned via Manager.Obsolete, initialObsolete is cleared. The
444 : // initialObsolete logs are stored separately from mu.queue because they may
445 : // include logs that were NOT created by the standalone manager, and
446 : // multiple physical log files may form one logical WAL.
447 : initialObsolete []DeletableLog
448 :
449 : // TODO(jackson/sumeer): read-path etc.
450 :
451 : dirHandles [numDirIndices]vfs.File
452 : stopper *stopper
453 : monitor *failoverMonitor
454 : mu struct {
455 : sync.Mutex
456 : closedWALs []logicalLogWithSizesEtc
457 : ww *failoverWriter
458 : }
459 : recycler LogRecycler
460 : // Due to async creation of files in failoverWriter, multiple goroutines can
461 : // concurrently try to get a file from the recycler. This mutex protects the
462 : // logRecycler.{Peek,Pop} pair.
463 : recyclerPeekPopMu sync.Mutex
464 : }
465 :
466 : var _ Manager = &failoverManager{}
467 :
468 : // TODO(sumeer):
469 : // - log deletion: if record.LogWriter did not close yet, the cleaner may
470 : // get an error when deleting or renaming (only under windows?).
471 :
472 : // init implements Manager.
473 1 : func (wm *failoverManager) init(o Options, initial Logs) error {
474 1 : if o.timeSource == nil {
475 1 : o.timeSource = defaultTime{}
476 1 : }
477 1 : o.FailoverOptions.EnsureDefaults()
478 1 :
479 1 : // Synchronously ensure that we're able to write to the secondary before we
480 1 : // proceed. An operator doesn't want to encounter an issue writing to the
481 1 : // secondary the first time there's a need to fail over. We write a bit of
482 1 : // metadata to a file in the secondary's directory.
483 1 : f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
484 1 : if err != nil {
485 0 : return errors.Newf("failed to write to WAL secondary dir: %v", err)
486 0 : }
487 1 : if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
488 1 : o.Primary.Dirname,
489 1 : time.Now(),
490 1 : )); err != nil {
491 0 : return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
492 0 : }
493 1 : if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
494 0 : return err
495 0 : }
496 :
497 1 : stopper := newStopper()
498 1 : var dirs [numDirIndices]dirAndFileHandle
499 1 : for i, dir := range []Dir{o.Primary, o.Secondary} {
500 1 : dirs[i].Dir = dir
501 1 : f, err := dir.FS.OpenDir(dir.Dirname)
502 1 : if err != nil {
503 0 : return err
504 0 : }
505 1 : dirs[i].File = f
506 : }
507 1 : fmOpts := failoverMonitorOptions{
508 1 : dirs: dirs,
509 1 : FailoverOptions: o.FailoverOptions,
510 1 : stopper: stopper,
511 1 : }
512 1 : monitor := newFailoverMonitor(fmOpts)
513 1 : *wm = failoverManager{
514 1 : opts: o,
515 1 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
516 1 : stopper: stopper,
517 1 : monitor: monitor,
518 1 : }
519 1 : wm.recycler.Init(o.MaxNumRecyclableLogs)
520 1 : for _, ll := range initial {
521 1 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
522 1 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
523 1 : }
524 1 : var err error
525 1 : wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
526 1 : if err != nil {
527 0 : return err
528 0 : }
529 : }
530 1 : return nil
531 : }
532 :
533 : // List implements Manager.
534 1 : func (wm *failoverManager) List() (Logs, error) {
535 1 : wm.mu.Lock()
536 1 : defer wm.mu.Unlock()
537 1 : n := len(wm.mu.closedWALs)
538 1 : if wm.mu.ww != nil {
539 1 : n++
540 1 : }
541 1 : wals := make(Logs, n)
542 1 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
543 1 : segments := make([]segment, len(llse.segments))
544 1 : for j := range llse.segments {
545 1 : segments[j] = llse.segments[j].segment
546 1 : }
547 1 : wals[index] = LogicalLog{
548 1 : Num: llse.num,
549 1 : segments: segments,
550 1 : }
551 : }
552 1 : for i, llse := range wm.mu.closedWALs {
553 1 : setLogicalLog(i, llse)
554 1 : }
555 1 : if wm.mu.ww != nil {
556 1 : setLogicalLog(n-1, wm.mu.ww.getLog())
557 1 : }
558 1 : return wals, nil
559 : }
560 :
561 : // Obsolete implements Manager.
562 : func (wm *failoverManager) Obsolete(
563 : minUnflushedNum NumWAL, noRecycle bool,
564 1 : ) (toDelete []DeletableLog, err error) {
565 1 : wm.mu.Lock()
566 1 : defer wm.mu.Unlock()
567 1 :
568 1 : // If this is the first call to Obsolete after Open, we may have deletable
569 1 : // logs outside the queue.
570 1 : toDelete, wm.initialObsolete = wm.initialObsolete, nil
571 1 :
572 1 : i := 0
573 1 : for ; i < len(wm.mu.closedWALs); i++ {
574 1 : ll := wm.mu.closedWALs[i]
575 1 : if ll.num >= minUnflushedNum {
576 1 : break
577 : }
578 : // Recycle only the log written to the primary dir at logNameIndex=0, if
579 : // there was no failover and it was synchronously closed. It may not be
580 : // safe to recycle a file that is still being written to. And recycling
581 : // when there was a failover may fill up the recycler with smaller log
582 : // files. The restriction regarding logNameIndex=0 is because
583 : // logRecycler.Peek only exposes the DiskFileNum, and we need to use that
584 : // to construct the path -- we could remove this restriction by changing
585 : // the logRecycler interface, but we don't bother.
586 1 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
587 1 : ll.segments[0].logNameIndex == 0 &&
588 1 : ll.segments[0].dir == wm.opts.Primary
589 1 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
590 1 : FileNum: base.DiskFileNum(ll.num),
591 1 : FileSize: ll.segments[0].approxFileSize,
592 1 : }) {
593 1 : for _, s := range ll.segments {
594 1 : toDelete = append(toDelete, DeletableLog{
595 1 : FS: s.dir.FS,
596 1 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
597 1 : NumWAL: ll.num,
598 1 : ApproxFileSize: s.approxFileSize,
599 1 : })
600 1 : }
601 : }
602 : }
603 1 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
604 1 : return toDelete, nil
605 : }
606 :
607 : // Create implements Manager.
608 1 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
609 1 : func() {
610 1 : wm.mu.Lock()
611 1 : defer wm.mu.Unlock()
612 1 : if wm.mu.ww != nil {
613 0 : panic("previous wal.Writer not closed")
614 : }
615 : }()
616 1 : fwOpts := failoverWriterOpts{
617 1 : wn: wn,
618 1 : logger: wm.opts.Logger,
619 1 : timeSource: wm.opts.timeSource,
620 1 : jobID: jobID,
621 1 : logCreator: wm.logCreator,
622 1 : noSyncOnClose: wm.opts.NoSyncOnClose,
623 1 : bytesPerSync: wm.opts.BytesPerSync,
624 1 : preallocateSize: wm.opts.PreallocateSize,
625 1 : minSyncInterval: wm.opts.MinSyncInterval,
626 1 : fsyncLatency: wm.opts.FsyncLatency,
627 1 : queueSemChan: wm.opts.QueueSemChan,
628 1 : stopper: wm.stopper,
629 1 : failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
630 1 : writerClosed: wm.writerClosed,
631 1 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
632 1 : }
633 1 : var err error
634 1 : var ww *failoverWriter
635 1 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
636 1 : ww, err = newFailoverWriter(fwOpts, dir)
637 1 : if err != nil {
638 0 : return nil
639 0 : }
640 1 : return ww
641 : }
642 1 : wm.monitor.newWriter(writerCreateFunc)
643 1 : if ww != nil {
644 1 : wm.mu.Lock()
645 1 : defer wm.mu.Unlock()
646 1 : wm.mu.ww = ww
647 1 : }
648 1 : return ww, err
649 : }
650 :
651 : // ElevateWriteStallThresholdForFailover implements Manager.
652 1 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
653 1 : return wm.monitor.elevateWriteStallThresholdForFailover()
654 1 : }
655 :
656 1 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
657 1 : wm.monitor.noWriter()
658 1 : wm.mu.Lock()
659 1 : defer wm.mu.Unlock()
660 1 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
661 1 : wm.mu.ww = nil
662 1 : }
663 :
664 : // Stats implements Manager.
665 1 : func (wm *failoverManager) Stats() Stats {
666 1 : obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
667 1 : failoverStats := wm.monitor.stats()
668 1 : failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
669 1 : wm.mu.Lock()
670 1 : defer wm.mu.Unlock()
671 1 : var liveFileCount int
672 1 : var liveFileSize uint64
673 1 : updateStats := func(segments []segmentWithSizeEtc) {
674 1 : for _, s := range segments {
675 1 : liveFileCount++
676 1 : liveFileSize += s.approxFileSize
677 1 : }
678 : }
679 1 : for _, llse := range wm.mu.closedWALs {
680 1 : updateStats(llse.segments)
681 1 : }
682 1 : if wm.mu.ww != nil {
683 0 : updateStats(wm.mu.ww.getLog().segments)
684 0 : }
685 1 : for i := range wm.initialObsolete {
686 0 : if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
687 0 : obsoleteLogsCount++
688 0 : }
689 0 : obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
690 : }
691 1 : return Stats{
692 1 : ObsoleteFileCount: obsoleteLogsCount,
693 1 : ObsoleteFileSize: obsoleteLogSize,
694 1 : LiveFileCount: liveFileCount,
695 1 : LiveFileSize: liveFileSize,
696 1 : Failover: failoverStats,
697 1 : }
698 : }
699 :
700 : // Close implements Manager.
701 1 : func (wm *failoverManager) Close() error {
702 1 : wm.stopper.stop()
703 1 : // Since all goroutines are stopped, we can close the dirs.
704 1 : var err error
705 1 : for _, f := range wm.dirHandles {
706 1 : err = firstError(err, f.Close())
707 1 : }
708 1 : return err
709 : }
710 :
711 : // RecyclerForTesting implements Manager.
712 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
713 0 : return nil
714 0 : }
715 :
716 : // logCreator implements the logCreator func type.
717 : func (wm *failoverManager) logCreator(
718 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
719 1 : ) (logFile vfs.File, initialFileSize uint64, err error) {
720 1 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
721 1 : isPrimary := dir == wm.opts.Primary
722 1 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
723 1 : considerRecycle := li == 0 && isPrimary
724 1 : createInfo := CreateInfo{
725 1 : JobID: jobID,
726 1 : Path: logFilename,
727 1 : IsSecondary: !isPrimary,
728 1 : Num: wn,
729 1 : Err: nil,
730 1 : }
731 1 : defer func() {
732 1 : createInfo.Err = err
733 1 : if wm.opts.EventListener != nil {
734 1 : wm.opts.EventListener.LogCreated(createInfo)
735 1 : }
736 : }()
737 1 : if considerRecycle {
738 1 : // Try to use a recycled log file. Recycling log files is an important
739 1 : // performance optimization as it is faster to sync a file that has
740 1 : // already been written, than one which is being written for the first
741 1 : // time. This is due to the need to sync file metadata when a file is
742 1 : // being written for the first time. Note this is true even if file
743 1 : // preallocation is performed (e.g. fallocate).
744 1 : var recycleLog base.FileInfo
745 1 : var recycleOK bool
746 1 : func() {
747 1 : wm.recyclerPeekPopMu.Lock()
748 1 : defer wm.recyclerPeekPopMu.Unlock()
749 1 : recycleLog, recycleOK = wm.recycler.Peek()
750 1 : if recycleOK {
751 0 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
752 0 : panic(err)
753 : }
754 : }
755 : }()
756 1 : if recycleOK {
757 0 : createInfo.RecycledFileNum = recycleLog.FileNum
758 0 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
759 0 : r.writeStart()
760 0 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
761 0 : r.writeEnd(err)
762 0 : // TODO(sumeer): should we fatal since primary dir? At some point it is
763 0 : // better to fatal instead of continuing to failover.
764 0 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
765 0 : if err != nil {
766 0 : // TODO(sumeer): we have popped from the logRecycler, which is
767 0 : // arguably correct, since we don't want to keep trying to reuse a log
768 0 : // that causes some error. But the original or new file may exist, and
769 0 : // no one will clean it up unless the process restarts.
770 0 : return nil, 0, err
771 0 : }
772 : // Figure out the recycled WAL size. This Stat is necessary because
773 : // ReuseForWrite's contract allows for removing the old file and
774 : // creating a new one. We don't know whether the WAL was actually
775 : // recycled.
776 : //
777 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
778 : // indicating whether or not the file was actually reused would allow us
779 : // to skip the stat and use recycleLog.FileSize.
780 0 : var finfo os.FileInfo
781 0 : finfo, err = logFile.Stat()
782 0 : if err != nil {
783 0 : logFile.Close()
784 0 : return nil, 0, err
785 0 : }
786 0 : initialFileSize = uint64(finfo.Size())
787 0 : return logFile, initialFileSize, nil
788 : }
789 : }
790 : // Did not recycle.
791 : //
792 : // Create file.
793 1 : r.writeStart()
794 1 : logFile, err = dir.FS.Create(logFilename, "pebble-wal")
795 1 : r.writeEnd(err)
796 1 : return logFile, 0, err
797 : }
798 :
799 : type stopper struct {
800 : quiescer chan struct{} // Closed when quiescing
801 : wg sync.WaitGroup
802 : }
803 :
804 1 : func newStopper() *stopper {
805 1 : return &stopper{
806 1 : quiescer: make(chan struct{}),
807 1 : }
808 1 : }
809 :
810 1 : func (s *stopper) runAsync(f func()) {
811 1 : s.wg.Add(1)
812 1 : go func() {
813 1 : f()
814 1 : s.wg.Done()
815 1 : }()
816 : }
817 :
818 : // shouldQuiesce returns a channel which will be closed when stop() has been
819 : // invoked and outstanding goroutines should begin to quiesce.
820 1 : func (s *stopper) shouldQuiesce() <-chan struct{} {
821 1 : return s.quiescer
822 1 : }
823 :
824 1 : func (s *stopper) stop() {
825 1 : close(s.quiescer)
826 1 : s.wg.Wait()
827 1 : }
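
// A minimal usage sketch of the stopper (illustrative only; exampleStopperUsage
// is not called anywhere): goroutines started via runAsync watch shouldQuiesce
// and return when stop closes it, and stop then blocks on the wait group until
// they have all exited.
func exampleStopperUsage() {
	s := newStopper()
	s.runAsync(func() {
		t := time.NewTicker(time.Second)
		defer t.Stop()
		for {
			select {
			case <-s.shouldQuiesce():
				return
			case <-t.C:
				// Periodic work would go here.
			}
		}
	})
	s.stop() // Closes quiescer, then waits for the goroutine above to return.
}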
828 :
829 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
830 : // support for Timer removed, and with support for reset added to the
831 : // manual implementation.
832 :
833 : // timeSource is used to interact with the clock and tickers. Abstracts
834 : // time.Now and time.NewTicker for testing.
835 : type timeSource interface {
836 : now() time.Time
837 : newTicker(duration time.Duration) tickerI
838 : }
839 :
840 : // tickerI is an interface wrapping time.Ticker.
841 : type tickerI interface {
842 : reset(duration time.Duration)
843 : stop()
844 : ch() <-chan time.Time
845 : }
846 :
847 : // defaultTime is a timeSource using the time package.
848 : type defaultTime struct{}
849 :
850 : var _ timeSource = defaultTime{}
851 :
852 1 : func (defaultTime) now() time.Time {
853 1 : return time.Now()
854 1 : }
855 :
856 1 : func (defaultTime) newTicker(duration time.Duration) tickerI {
857 1 : return (*defaultTicker)(time.NewTicker(duration))
858 1 : }
859 :
860 : // defaultTicker uses time.Ticker.
861 : type defaultTicker time.Ticker
862 :
863 : var _ tickerI = &defaultTicker{}
864 :
865 1 : func (t *defaultTicker) reset(duration time.Duration) {
866 1 : (*time.Ticker)(t).Reset(duration)
867 1 : }
868 :
869 1 : func (t *defaultTicker) stop() {
870 1 : (*time.Ticker)(t).Stop()
871 1 : }
872 :
873 1 : func (t *defaultTicker) ch() <-chan time.Time {
874 1 : return (*time.Ticker)(t).C
875 1 : }
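
// A sketch of what a manual ticker for tests could look like, given the
// mention above of reset support in the manual implementation; the package's
// actual test helper may differ. reset and stop only toggle whether ticks are
// delivered, and the test drives ticks explicitly via tick.
type manualTickerSketch struct {
	mu     sync.Mutex
	active bool
	tickCh chan time.Time
}

func newManualTickerSketch() *manualTickerSketch {
	// Buffer of one, mirroring time.Ticker's channel.
	return &manualTickerSketch{tickCh: make(chan time.Time, 1)}
}

func (t *manualTickerSketch) reset(time.Duration) { t.setActive(true) }

func (t *manualTickerSketch) stop() { t.setActive(false) }

func (t *manualTickerSketch) ch() <-chan time.Time { return t.tickCh }

func (t *manualTickerSketch) setActive(b bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.active = b
}

// tick delivers a tick if the ticker is active, dropping it if the previous
// tick has not yet been consumed (as time.Ticker does).
func (t *manualTickerSketch) tick(now time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.active {
		return
	}
	select {
	case t.tickCh <- now:
	default:
	}
}

var _ tickerI = (*manualTickerSketch)(nil)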
876 :
877 : // Make lint happy.
878 : var _ = (*failoverMonitor).noWriter
879 : var _ = (*failoverManager).writerClosed
880 : var _ = (&stopper{}).shouldQuiesce