Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "math/rand/v2"
11 : "os"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/vfs"
19 : )
20 :
21 : // dirProber probes the primary dir until it is confirmed to be healthy. If
22 : // it doesn't have enough samples, the dir is deemed to be unhealthy. The
23 : // prober is only used when deciding whether to fail back to the primary.
24 : type dirProber struct {
25 : fs vfs.FS
26 : // The full path of the file to use for the probe. The probe is destructive
27 : // in that it deletes and (re)creates the file.
28 : filename string
29 : // The probing interval, when enabled.
30 : interval time.Duration
31 : timeSource
32 : // buf holds the random bytes that are written during the probe.
33 : buf [100 << 10]byte
34 : // enabled is signaled to enable and disable probing. The initial state of
35 : // the prober is disabled.
36 : enabled chan bool
37 : mu struct {
38 : sync.Mutex
39 : // Circular buffer of history samples.
40 : history [probeHistoryLength]time.Duration
41 : // The history is in [firstProbeIndex, nextProbeIndex).
42 : firstProbeIndex int
43 : nextProbeIndex int
44 : }
45 : iterationForTesting chan<- struct{}
46 : }
47 :
48 : const probeHistoryLength = 128
49 :
50 : // A large sentinel duration (24 hours), used for failed probes and for insufficient samples.
51 : const failedProbeDuration = 24 * 60 * 60 * time.Second
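
// A minimal illustration (not part of the production code) of how the
// monotonically increasing window [firstProbeIndex, nextProbeIndex) maps onto
// the fixed-size circular history above: only index%probeHistoryLength is used
// to address the array, and once the window would exceed the capacity,
// firstProbeIndex advances because the oldest sample has been overwritten.
func exampleCircularProbeHistory() {
	var history [probeHistoryLength]time.Duration
	firstProbeIndex, nextProbeIndex := 0, 0
	record := func(d time.Duration) {
		history[nextProbeIndex%probeHistoryLength] = d
		nextProbeIndex++
		if nextProbeIndex-firstProbeIndex > probeHistoryLength {
			// Oldest sample overwritten; shrink the window from the front.
			firstProbeIndex++
		}
	}
	for i := 0; i < probeHistoryLength+5; i++ {
		record(time.Millisecond)
	}
	// The window now holds the latest probeHistoryLength samples:
	// [firstProbeIndex, nextProbeIndex) == [5, 133) with probeHistoryLength=128.
	_ = history
}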
52 :
53 : // init takes a stopper in order to connect the dirProber's long-running
54 : // goroutines with the stopper's wait group, but the dirProber has its own
55 : // stop() method that should be invoked to trigger the shutdown.
56 : func (p *dirProber) init(
57 : fs vfs.FS,
58 : filename string,
59 : interval time.Duration,
60 : stopper *stopper,
61 : ts timeSource,
62 : notifyIterationForTesting chan<- struct{},
63 1 : ) {
64 1 : *p = dirProber{
65 1 : fs: fs,
66 1 : filename: filename,
67 1 : interval: interval,
68 1 : timeSource: ts,
69 1 : enabled: make(chan bool),
70 1 : iterationForTesting: notifyIterationForTesting,
71 1 : }
72 1 : // Random bytes for writing, to defeat any FS compression optimization.
73 1 : for i := range p.buf {
74 1 : p.buf[i] = byte(rand.Uint32())
75 1 : }
76 : // dirProber has an explicit stop() method instead of listening on
77 : // stopper.shouldQuiesce. This structure helps negotiate the shutdown
78 : // between failoverMonitor and dirProber. If the dirProber independently
79 : // listened to shouldQuiesce, it might exit before the failover monitor. The
80 : // [enable|disable]Probing methods require sending on a channel expecting
81 : // that the dirProber goroutine will receive on the same channel. If the
82 : // dirProber noticed the quiescence and exited first, the failoverMonitor
83 : // could deadlock waiting for a goroutine that no longer exists.
84 : //
85 : // Instead, shutdown of the dirProber is coordinated by the failoverMonitor:
86 : // - the failoverMonitor listens to stopper.shouldQuiesce.
87 : // - when the failoverMonitor is exiting, it will no longer attempt to
88 : // interact with the dirProber. Only then does it invoke dirProber.stop.
89 1 : stopper.runAsync(p.probeLoop)
90 : }
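
// A small sketch of the hazard described above (illustrative only; the helper
// below is not part of the production code): enableProbing/disableProbing are
// unbuffered sends on p.enabled, so they block until probeLoop receives. If
// probeLoop were to exit on its own by watching shouldQuiesce, a later send
// from the failoverMonitor would block forever.
func exampleUnbufferedSendBlocks() {
	enabled := make(chan bool) // unbuffered, like dirProber.enabled
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case enabled <- true:
			// Unreachable: nobody is receiving.
		case <-time.After(10 * time.Millisecond):
			// In the real code there is no timeout; the sender would simply be
			// stuck, which is why the shutdown ordering above matters.
		}
	}()
	<-done
}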
91 :
92 1 : func (p *dirProber) probeLoop() {
93 1 : ticker := p.timeSource.newTicker(p.interval)
94 1 : ticker.stop()
95 1 : tickerCh := ticker.ch()
96 1 : shouldContinue := true
97 1 : var enabled bool
98 1 : for shouldContinue {
99 1 : select {
100 1 : case <-tickerCh:
101 1 : if !enabled {
102 1 : // Could have a tick waiting before we disabled it. Ignore.
103 1 : continue
104 : }
105 1 : probeDur := func() time.Duration {
106 1 : // Delete, create, write, sync.
107 1 : start := p.timeSource.now()
108 1 : _ = p.fs.Remove(p.filename)
109 1 : f, err := p.fs.Create(p.filename, "pebble-wal")
110 1 : if err != nil {
111 1 : return failedProbeDuration
112 1 : }
113 1 : defer f.Close()
114 1 : n, err := f.Write(p.buf[:])
115 1 : if err != nil {
116 0 : return failedProbeDuration
117 0 : }
118 1 : if n != len(p.buf) {
119 0 : panic("invariant violation")
120 : }
121 1 : err = f.Sync()
122 1 : if err != nil {
123 0 : return failedProbeDuration
124 0 : }
125 1 : return p.timeSource.now().Sub(start)
126 : }()
127 1 : p.mu.Lock()
128 1 : nextIndex := p.mu.nextProbeIndex % probeHistoryLength
129 1 : p.mu.history[nextIndex] = probeDur
130 1 : p.mu.nextProbeIndex++
131 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
132 1 : if numSamples > probeHistoryLength {
133 1 : // Length has exceeded capacity, i.e., overwritten the first probe.
134 1 : p.mu.firstProbeIndex++
135 1 : }
136 1 : p.mu.Unlock()
137 :
138 1 : case enabled, shouldContinue = <-p.enabled:
139 1 : if !enabled || !shouldContinue {
140 1 : ticker.stop()
141 1 : p.mu.Lock()
142 1 : p.mu.firstProbeIndex = 0
143 1 : p.mu.nextProbeIndex = 0
144 1 : p.mu.Unlock()
145 1 : } else {
146 1 : ticker.reset(p.interval)
147 1 : }
148 : }
149 1 : if p.iterationForTesting != nil {
150 1 : p.iterationForTesting <- struct{}{}
151 1 : }
152 : }
153 : }
154 :
155 1 : func (p *dirProber) enableProbing() {
156 1 : p.enabled <- true
157 1 : }
158 :
159 1 : func (p *dirProber) disableProbing() {
160 1 : p.enabled <- false
161 1 : }
162 :
163 1 : func (p *dirProber) stop() {
164 1 : close(p.enabled)
165 1 : }
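
// Side note (illustrative, not production code): stop() closes p.enabled and
// relies on receive-from-closed-channel semantics. In probeLoop, the receive
// `enabled, shouldContinue = <-p.enabled` yields (false, false) once the
// channel is closed, which both disables probing and ends the loop in a single
// case arm.
func exampleClosedChannelReceive() {
	ch := make(chan bool)
	close(ch)
	v, ok := <-ch
	fmt.Printf("v=%t ok=%t\n", v, ok) // prints: v=false ok=false
}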
166 :
167 1 : func (p *dirProber) getMeanMax(interval time.Duration) (time.Duration, time.Duration) {
168 1 : p.mu.Lock()
169 1 : defer p.mu.Unlock()
170 1 : numSamples := p.mu.nextProbeIndex - p.mu.firstProbeIndex
171 1 : samplesNeeded := int((interval + p.interval - 1) / p.interval)
172 1 : if samplesNeeded == 0 {
173 0 : panic("interval is too short")
174 1 : } else if samplesNeeded > probeHistoryLength {
175 0 : panic("interval is too long")
176 : }
177 1 : if samplesNeeded > numSamples {
178 1 : // Not enough samples, so assume not yet healthy.
179 1 : return failedProbeDuration, failedProbeDuration
180 1 : }
181 1 : offset := numSamples - samplesNeeded
182 1 : var sum, max time.Duration
183 1 : for i := p.mu.firstProbeIndex + offset; i < p.mu.nextProbeIndex; i++ {
184 1 : sampleDur := p.mu.history[i%probeHistoryLength]
185 1 : sum += sampleDur
186 1 : if max < sampleDur {
187 1 : max = sampleDur
188 1 : }
189 : }
190 1 : mean := sum / time.Duration(samplesNeeded)
191 1 : return mean, max
192 : }
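
// Worked example (illustrative): samplesNeeded is the ceiling of
// interval/p.interval. With a probe interval of 1s and a HealthyInterval of
// 10s, samplesNeeded = ceil(10s/1s) = 10; if fewer than 10 probes have
// completed since probing was (re)enabled, getMeanMax returns
// failedProbeDuration for both mean and max, so the primary is not yet
// considered healthy.
func exampleSamplesNeeded(interval, probeInterval time.Duration) int {
	return int((interval + probeInterval - 1) / probeInterval) // ceiling division
}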
193 :
194 : type dirIndex int
195 :
196 : const (
197 : primaryDirIndex dirIndex = iota
198 : secondaryDirIndex
199 : numDirIndices
200 : )
201 :
202 : type dirAndFileHandle struct {
203 : Dir
204 : vfs.File
205 : }
206 :
207 : // switchableWriter is a subset of failoverWriter needed by failoverMonitor.
208 : type switchableWriter interface {
209 : switchToNewDir(dirAndFileHandle) error
210 : ongoingLatencyOrErrorForCurDir() (time.Duration, error)
211 : }
212 :
213 : type failoverMonitorOptions struct {
214 : // The primary and secondary dir.
215 : dirs [numDirIndices]dirAndFileHandle
216 :
217 : FailoverOptions
218 : stopper *stopper
219 : }
220 :
221 : // failoverMonitor monitors the latency and error observed by the
222 : // switchableWriter, and does failover by switching the dir. It also monitors
223 : // the primary dir for failback.
224 : type failoverMonitor struct {
225 : opts failoverMonitorOptions
226 : prober dirProber
227 : mu struct {
228 : sync.Mutex
229 : // dirIndex and lastFailBackTime are only modified by monitorLoop. They
230 : // are protected by the mutex for concurrent reads.
231 :
232 : // dirIndex is the directory to use by writer (if non-nil), or when the
233 : // writer becomes non-nil.
234 : dirIndex
235 : // The time at which the monitor last failed back to the primary. This is
236 : // only relevant when dirIndex == primaryDirIndex.
237 : lastFailBackTime time.Time
238 : // The current failoverWriter, exposed via a narrower interface.
239 : writer switchableWriter
240 :
241 : // Stats.
242 : dirSwitchCount int64
243 : lastAccumulateIntoDurations time.Time
244 : primaryWriteDuration time.Duration
245 : secondaryWriteDuration time.Duration
246 : }
247 : }
248 :
249 1 : func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {
250 1 : m := &failoverMonitor{
251 1 : opts: opts,
252 1 : }
253 1 : m.mu.lastAccumulateIntoDurations = opts.timeSource.now()
254 1 : m.prober.init(opts.dirs[primaryDirIndex].FS,
255 1 : opts.dirs[primaryDirIndex].FS.PathJoin(opts.dirs[primaryDirIndex].Dirname, "probe-file"),
256 1 : opts.PrimaryDirProbeInterval, opts.stopper, opts.timeSource, opts.proberIterationForTesting)
257 1 : opts.stopper.runAsync(func() {
258 1 : m.monitorLoop(opts.stopper.shouldQuiesce())
259 1 : })
260 1 : return m
261 : }
262 :
263 : // noWriter is called when the previous writer is closed.
264 1 : func (m *failoverMonitor) noWriter() {
265 1 : m.mu.Lock()
266 1 : defer m.mu.Unlock()
267 1 : m.mu.writer = nil
268 1 : m.accumulateDurationLocked(m.opts.timeSource.now())
269 1 : }
270 :
271 : // newWriter installs the writer returned by writerCreateFunc, which is
272 : // allowed to return nil if there is an error. failoverMonitor does not handle
273 : // that error, so it should not be an IO error (which failoverMonitor is interested in).
274 1 : func (m *failoverMonitor) newWriter(writerCreateFunc func(dir dirAndFileHandle) switchableWriter) {
275 1 : m.mu.Lock()
276 1 : defer m.mu.Unlock()
277 1 : if m.mu.writer != nil {
278 0 : panic("previous writer not closed")
279 : }
280 1 : m.mu.writer = writerCreateFunc(m.opts.dirs[m.mu.dirIndex])
281 : }
282 :
283 1 : func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {
284 1 : m.mu.Lock()
285 1 : defer m.mu.Unlock()
286 1 : if m.mu.dirIndex == secondaryDirIndex {
287 1 : return true
288 1 : }
289 1 : intervalSinceFailedback := m.opts.timeSource.now().Sub(m.mu.lastFailBackTime)
290 1 : return intervalSinceFailedback < m.opts.ElevatedWriteStallThresholdLag
291 : }
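
// Illustrative usage sketch; the caller shape and the multiplier below are
// hypothetical, not taken from the production code. The signal is true while
// writes go to the secondary dir and for ElevatedWriteStallThresholdLag after
// the most recent failback, and a caller can use it to tolerate a larger
// write backlog during that window.
func exampleStallThreshold(m *failoverMonitor, normalThreshold uint64) uint64 {
	if m.elevateWriteStallThresholdForFailover() {
		return 4 * normalThreshold // hypothetical multiplier
	}
	return normalThreshold
}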
292 :
293 1 : func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
294 1 : dur := now.Sub(m.mu.lastAccumulateIntoDurations)
295 1 : if invariants.Enabled && dur < 0 {
296 0 : panic(errors.AssertionFailedf("time regressed: last accumulated %s; now is %s",
297 0 : m.mu.lastAccumulateIntoDurations, now))
298 : }
299 1 : m.mu.lastAccumulateIntoDurations = now
300 1 : if m.mu.dirIndex == primaryDirIndex {
301 1 : m.mu.primaryWriteDuration += dur
302 1 : return
303 1 : }
304 1 : m.mu.secondaryWriteDuration += dur
305 : }
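
// Illustrative timeline for the attribution above (made-up numbers, assuming
// the previous accumulation happened at t=0s): if the monitor switches from
// the primary to the secondary at t=5s, the switch calls
// accumulateDurationLocked while dirIndex is still primaryDirIndex, adding 5s
// to primaryWriteDuration; a stats() call at t=8s then accumulates the
// remaining 3s into secondaryWriteDuration.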
306 :
307 1 : func (m *failoverMonitor) stats() FailoverStats {
308 1 : m.mu.Lock()
309 1 : defer m.mu.Unlock()
310 1 : m.accumulateDurationLocked(m.opts.timeSource.now())
311 1 : return FailoverStats{
312 1 : DirSwitchCount: m.mu.dirSwitchCount,
313 1 : PrimaryWriteDuration: m.mu.primaryWriteDuration,
314 1 : SecondaryWriteDuration: m.mu.secondaryWriteDuration,
315 1 : }
316 1 : }
317 :
318 : // lastWriterInfo is state maintained in the monitorLoop for the latest
319 : // switchable writer. It is mainly used to dampen the switching.
320 : type lastWriterInfo struct {
321 : writer switchableWriter
322 : numSwitches int
323 : ongoingLatencyAtSwitch time.Duration
324 : errorCounts [numDirIndices]int
325 : }
326 :
327 1 : func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
328 1 : ticker := m.opts.timeSource.newTicker(m.opts.UnhealthySamplingInterval)
329 1 : if m.opts.monitorIterationForTesting != nil {
330 1 : m.opts.monitorIterationForTesting <- struct{}{}
331 1 : }
332 1 : tickerCh := ticker.ch()
333 1 : dirIndex := primaryDirIndex
334 1 : var lastWriter lastWriterInfo
335 1 : for {
336 1 : select {
337 1 : case <-shouldQuiesce:
338 1 : ticker.stop()
339 1 : m.prober.stop()
340 1 : return
341 1 : case <-tickerCh:
342 1 : writerOngoingLatency, writerErr := func() (time.Duration, error) {
343 1 : m.mu.Lock()
344 1 : defer m.mu.Unlock()
345 1 : if m.mu.writer != lastWriter.writer {
346 1 : lastWriter = lastWriterInfo{writer: m.mu.writer}
347 1 : }
348 1 : if lastWriter.writer == nil {
349 1 : return 0, nil
350 1 : }
351 1 : return lastWriter.writer.ongoingLatencyOrErrorForCurDir()
352 : }()
353 1 : switchDir := false
354 1 : // Arbitrary value.
355 1 : const highSecondaryErrorCountThreshold = 2
356 1 : // We don't consider a switch if currently using the primary dir and the
357 1 : // secondary dir has high enough errors. It is more likely that someone
358 1 : // has misconfigured a secondary e.g. wrong permissions or not enough
359 1 : // disk space. We only remember the error history in the context of the
360 1 : // lastWriter since an operator can fix the underlying misconfiguration.
361 1 : unhealthyThreshold, failoverEnabled := m.opts.UnhealthyOperationLatencyThreshold()
362 1 :
363 1 : if !(lastWriter.errorCounts[secondaryDirIndex] >= highSecondaryErrorCountThreshold &&
364 1 : dirIndex == primaryDirIndex) && failoverEnabled {
365 1 : // Switching heuristics. Subject to change based on real world experience.
366 1 : if writerErr != nil {
367 1 : // An error causes an immediate switch, since a LogWriter with an
368 1 : // error is useless.
369 1 : lastWriter.errorCounts[dirIndex]++
370 1 : switchDir = true
371 1 : } else if writerOngoingLatency > unhealthyThreshold {
372 1 : // Arbitrary value.
373 1 : const switchImmediatelyCountThreshold = 2
374 1 : // High latency. Switch immediately if the number of switches that
375 1 : // have been done is below the threshold, since that gives us an
376 1 : // observation of both dirs. Once above that threshold, decay the
377 1 : // switch rate by increasing the observed latency needed for a
378 1 : // switch.
379 1 : if lastWriter.numSwitches < switchImmediatelyCountThreshold ||
380 1 : writerOngoingLatency > 2*lastWriter.ongoingLatencyAtSwitch {
381 1 : switchDir = true
382 1 : lastWriter.ongoingLatencyAtSwitch = writerOngoingLatency
383 1 : }
384 : // Else high latency, but not high enough yet to motivate switch.
385 1 : } else if dirIndex == secondaryDirIndex {
386 1 : // The writer looks healthy. We can still switch if the writer is using the
387 1 : // secondary dir and the primary is healthy again.
388 1 : primaryMean, primaryMax := m.prober.getMeanMax(m.opts.HealthyInterval)
389 1 : if primaryMean < m.opts.HealthyProbeLatencyThreshold &&
390 1 : primaryMax < m.opts.HealthyProbeLatencyThreshold {
391 1 : switchDir = true
392 1 : }
393 : }
394 : }
395 1 : if switchDir {
396 1 : lastWriter.numSwitches++
397 1 : if dirIndex == secondaryDirIndex {
398 1 : // Switching back to primary, so don't need to probe to see if
399 1 : // primary is healthy.
400 1 : m.prober.disableProbing()
401 1 : dirIndex = primaryDirIndex
402 1 : } else {
403 1 : m.prober.enableProbing()
404 1 : dirIndex = secondaryDirIndex
405 1 : }
406 1 : dir := m.opts.dirs[dirIndex]
407 1 : m.mu.Lock()
408 1 : now := m.opts.timeSource.now()
409 1 : m.accumulateDurationLocked(now)
410 1 : m.mu.dirIndex = dirIndex
411 1 : m.mu.dirSwitchCount++
412 1 : if dirIndex == primaryDirIndex {
413 1 : m.mu.lastFailBackTime = now
414 1 : }
415 1 : if m.mu.writer != nil {
416 1 : m.mu.writer.switchToNewDir(dir)
417 1 : }
418 1 : m.mu.Unlock()
419 : }
420 : }
421 1 : if m.opts.monitorStateForTesting != nil {
422 1 : m.opts.monitorStateForTesting(lastWriter.numSwitches, lastWriter.ongoingLatencyAtSwitch)
423 1 : }
424 1 : if m.opts.monitorIterationForTesting != nil {
425 1 : m.opts.monitorIterationForTesting <- struct{}{}
426 1 : }
427 : }
428 : }
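
// An illustrative distillation of the latency-based switching heuristic above
// (not used by the production code): switch immediately for the first
// switchImmediatelyCountThreshold switches of a given writer, and after that
// only when the ongoing latency has doubled relative to the latency recorded
// at the previous switch. For example, with a 100ms unhealthy threshold,
// observations of 120ms and 130ms cause immediate switches; a subsequent
// 150ms does not (it would need to exceed 2*130ms = 260ms), but 300ms does.
func exampleShouldSwitchForLatency(
	ongoing, unhealthyThreshold, latencyAtLastSwitch time.Duration, numSwitches int,
) bool {
	const switchImmediatelyCountThreshold = 2
	return ongoing > unhealthyThreshold &&
		(numSwitches < switchImmediatelyCountThreshold ||
			ongoing > 2*latencyAtLastSwitch)
}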
429 :
430 : type logicalLogWithSizesEtc struct {
431 : num NumWAL
432 : segments []segmentWithSizeEtc
433 : }
434 :
435 : type segmentWithSizeEtc struct {
436 : segment
437 : approxFileSize uint64
438 : synchronouslyClosed bool
439 : }
440 :
441 : type failoverManager struct {
442 : opts Options
443 : // initialObsolete holds the set of DeletableLogs that formed the logs
444 : // passed into Init. The initialObsolete logs are all obsolete. Once
445 : // returned via Manager.Obsolete, initialObsolete is cleared. The
446 : // initialObsolete logs are stored separately from mu.queue because they may
447 : // include logs that were NOT created by the standalone manager, and
448 : // multiple physical log files may form one logical WAL.
449 : initialObsolete []DeletableLog
450 :
451 : // TODO(jackson/sumeer): read-path etc.
452 :
453 : dirHandles [numDirIndices]vfs.File
454 : stopper *stopper
455 : monitor *failoverMonitor
456 : mu struct {
457 : sync.Mutex
458 : closedWALs []logicalLogWithSizesEtc
459 : ww *failoverWriter
460 : }
461 : recycler LogRecycler
462 : // Due to async creation of files in failoverWriter, multiple goroutines can
463 : // concurrently try to get a file from the recycler. This mutex protects the
464 : // logRecycler.{Peek,Pop} pair.
465 : recyclerPeekPopMu sync.Mutex
466 : }
467 :
468 : var _ Manager = &failoverManager{}
469 :
470 : // TODO(sumeer):
471 : // - log deletion: if record.LogWriter did not close yet, the cleaner may
472 : // get an error when deleting or renaming (only under windows?).
473 :
474 : // init implements Manager.
475 1 : func (wm *failoverManager) init(o Options, initial Logs) error {
476 1 : if o.timeSource == nil {
477 1 : o.timeSource = defaultTime{}
478 1 : }
479 1 : o.FailoverOptions.EnsureDefaults()
480 1 :
481 1 : // Synchronously ensure that we're able to write to the secondary before we
482 1 : // proceed. An operator doesn't want to encounter an issue writing to the
483 1 : // secondary the first time there's a need to fail over. We write a bit of
484 1 : // metadata to a file in the secondary's directory.
485 1 : f, err := o.Secondary.FS.Create(o.Secondary.FS.PathJoin(o.Secondary.Dirname, "failover_source"), "pebble-wal")
486 1 : if err != nil {
487 1 : return errors.Newf("failed to write to WAL secondary dir: %v", err)
488 1 : }
489 1 : if _, err := io.WriteString(f, fmt.Sprintf("primary: %s\nprocess start: %s\n",
490 1 : o.Primary.Dirname,
491 1 : time.Now(),
492 1 : )); err != nil {
493 0 : return errors.Newf("failed to write metadata to WAL secondary dir: %v", err)
494 0 : }
495 1 : if err := errors.CombineErrors(f.Sync(), f.Close()); err != nil {
496 0 : return err
497 0 : }
498 :
499 1 : stopper := newStopper()
500 1 : var dirs [numDirIndices]dirAndFileHandle
501 1 : for i, dir := range []Dir{o.Primary, o.Secondary} {
502 1 : dirs[i].Dir = dir
503 1 : f, err := dir.FS.OpenDir(dir.Dirname)
504 1 : if err != nil {
505 0 : return err
506 0 : }
507 1 : dirs[i].File = f
508 : }
509 1 : fmOpts := failoverMonitorOptions{
510 1 : dirs: dirs,
511 1 : FailoverOptions: o.FailoverOptions,
512 1 : stopper: stopper,
513 1 : }
514 1 : monitor := newFailoverMonitor(fmOpts)
515 1 : *wm = failoverManager{
516 1 : opts: o,
517 1 : dirHandles: [numDirIndices]vfs.File{dirs[primaryDirIndex].File, dirs[secondaryDirIndex].File},
518 1 : stopper: stopper,
519 1 : monitor: monitor,
520 1 : }
521 1 : wm.recycler.Init(o.MaxNumRecyclableLogs)
522 1 : for _, ll := range initial {
523 1 : if wm.recycler.MinRecycleLogNum() <= ll.Num {
524 1 : wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
525 1 : }
526 1 : var err error
527 1 : wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
528 1 : if err != nil {
529 0 : return err
530 0 : }
531 : }
532 1 : return nil
533 : }
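
// Minimal usage sketch (illustrative; only the Options fields referenced by
// init above are assumed): configure a primary and a secondary Dir, then init
// verifies the secondary is writable, opens both dir handles, and starts the
// failover monitor.
func exampleOpenFailoverManager(primary, secondary Dir) (*failoverManager, error) {
	m := &failoverManager{}
	if err := m.init(Options{Primary: primary, Secondary: secondary}, nil /* no pre-existing logs */); err != nil {
		return nil, err
	}
	return m, nil
}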
534 :
535 : // List implements Manager.
536 1 : func (wm *failoverManager) List() Logs {
537 1 : wm.mu.Lock()
538 1 : defer wm.mu.Unlock()
539 1 : n := len(wm.mu.closedWALs)
540 1 : if wm.mu.ww != nil {
541 1 : n++
542 1 : }
543 1 : wals := make(Logs, n)
544 1 : setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
545 1 : segments := make([]segment, len(llse.segments))
546 1 : for j := range llse.segments {
547 1 : segments[j] = llse.segments[j].segment
548 1 : }
549 1 : wals[index] = LogicalLog{
550 1 : Num: llse.num,
551 1 : segments: segments,
552 1 : }
553 : }
554 1 : for i, llse := range wm.mu.closedWALs {
555 1 : setLogicalLog(i, llse)
556 1 : }
557 1 : if wm.mu.ww != nil {
558 1 : setLogicalLog(n-1, wm.mu.ww.getLog())
559 1 : }
560 1 : return wals
561 : }
562 :
563 : // Obsolete implements Manager.
564 : func (wm *failoverManager) Obsolete(
565 : minUnflushedNum NumWAL, noRecycle bool,
566 1 : ) (toDelete []DeletableLog, err error) {
567 1 : wm.mu.Lock()
568 1 : defer wm.mu.Unlock()
569 1 :
570 1 : // If this is the first call to Obsolete after Open, we may have deletable
571 1 : // logs outside the queue.
572 1 : toDelete, wm.initialObsolete = wm.initialObsolete, nil
573 1 :
574 1 : i := 0
575 1 : for ; i < len(wm.mu.closedWALs); i++ {
576 1 : ll := wm.mu.closedWALs[i]
577 1 : if ll.num >= minUnflushedNum {
578 1 : break
579 : }
580 : // Recycle only if the log's single segment is in the primary at
581 : // logNameIndex=0 (i.e., there was no failover) and was synchronously
582 : // closed. It may not be safe to recycle a file that is still being written
583 : // to, and recycling when there was a failover may fill up the recycler
584 : // with smaller log files. The restriction regarding logNameIndex=0 is
585 : // because logRecycler.Peek only exposes the DiskFileNum, and we need to
586 : // use that to construct the path -- we could remove this restriction by
587 : // changing the logRecycler interface, but we don't bother.
588 1 : canRecycle := !noRecycle && len(ll.segments) == 1 && ll.segments[0].synchronouslyClosed &&
589 1 : ll.segments[0].logNameIndex == 0 &&
590 1 : ll.segments[0].dir == wm.opts.Primary
591 1 : if !canRecycle || !wm.recycler.Add(base.FileInfo{
592 1 : FileNum: base.DiskFileNum(ll.num),
593 1 : FileSize: ll.segments[0].approxFileSize,
594 1 : }) {
595 1 : for _, s := range ll.segments {
596 1 : toDelete = append(toDelete, DeletableLog{
597 1 : FS: s.dir.FS,
598 1 : Path: s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.num, s.logNameIndex)),
599 1 : NumWAL: ll.num,
600 1 : ApproxFileSize: s.approxFileSize,
601 1 : })
602 1 : }
603 : }
604 : }
605 1 : wm.mu.closedWALs = wm.mu.closedWALs[i:]
606 1 : return toDelete, nil
607 : }
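
// An illustrative restatement of the recycling condition above (not used by
// the production code): a closed logical WAL is eligible for recycling only if
// it consists of a single, synchronously closed segment that lives in the
// primary dir at logNameIndex 0.
func exampleCanRecycleClosedWAL(ll logicalLogWithSizesEtc, primary Dir, noRecycle bool) bool {
	return !noRecycle && len(ll.segments) == 1 &&
		ll.segments[0].synchronouslyClosed &&
		ll.segments[0].logNameIndex == 0 &&
		ll.segments[0].dir == primary
}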
608 :
609 : // Create implements Manager.
610 1 : func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
611 1 : func() {
612 1 : wm.mu.Lock()
613 1 : defer wm.mu.Unlock()
614 1 : if wm.mu.ww != nil {
615 0 : panic("previous wal.Writer not closed")
616 : }
617 : }()
618 1 : fwOpts := failoverWriterOpts{
619 1 : wn: wn,
620 1 : logger: wm.opts.Logger,
621 1 : timeSource: wm.opts.timeSource,
622 1 : jobID: jobID,
623 1 : logCreator: wm.logCreator,
624 1 : noSyncOnClose: wm.opts.NoSyncOnClose,
625 1 : bytesPerSync: wm.opts.BytesPerSync,
626 1 : preallocateSize: wm.opts.PreallocateSize,
627 1 : minSyncInterval: wm.opts.MinSyncInterval,
628 1 : fsyncLatency: wm.opts.FsyncLatency,
629 1 : queueSemChan: wm.opts.QueueSemChan,
630 1 : stopper: wm.stopper,
631 1 : failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
632 1 : writerClosed: wm.writerClosed,
633 1 : writerCreatedForTest: wm.opts.logWriterCreatedForTesting,
634 1 : }
635 1 : var err error
636 1 : var ww *failoverWriter
637 1 : writerCreateFunc := func(dir dirAndFileHandle) switchableWriter {
638 1 : ww, err = newFailoverWriter(fwOpts, dir)
639 1 : if err != nil {
640 0 : return nil
641 0 : }
642 1 : return ww
643 : }
644 1 : wm.monitor.newWriter(writerCreateFunc)
645 1 : if ww != nil {
646 1 : wm.mu.Lock()
647 1 : defer wm.mu.Unlock()
648 1 : wm.mu.ww = ww
649 1 : }
650 1 : return ww, err
651 : }
652 :
653 : // ElevateWriteStallThresholdForFailover implements Manager.
654 1 : func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
655 1 : return wm.monitor.elevateWriteStallThresholdForFailover()
656 1 : }
657 :
658 1 : func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
659 1 : wm.monitor.noWriter()
660 1 : wm.mu.Lock()
661 1 : defer wm.mu.Unlock()
662 1 : wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
663 1 : wm.mu.ww = nil
664 1 : }
665 :
666 : // Stats implements Manager.
667 1 : func (wm *failoverManager) Stats() Stats {
668 1 : obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
669 1 : failoverStats := wm.monitor.stats()
670 1 : failoverStats.FailoverWriteAndSyncLatency = wm.opts.FailoverWriteAndSyncLatency
671 1 : wm.mu.Lock()
672 1 : defer wm.mu.Unlock()
673 1 : var liveFileCount int
674 1 : var liveFileSize uint64
675 1 : updateStats := func(segments []segmentWithSizeEtc) {
676 1 : for _, s := range segments {
677 1 : liveFileCount++
678 1 : liveFileSize += s.approxFileSize
679 1 : }
680 : }
681 1 : for _, llse := range wm.mu.closedWALs {
682 1 : updateStats(llse.segments)
683 1 : }
684 1 : if wm.mu.ww != nil {
685 1 : updateStats(wm.mu.ww.getLog().segments)
686 1 : }
687 1 : for i := range wm.initialObsolete {
688 1 : if i == 0 || wm.initialObsolete[i].NumWAL != wm.initialObsolete[i-1].NumWAL {
689 1 : obsoleteLogsCount++
690 1 : }
691 1 : obsoleteLogSize += wm.initialObsolete[i].ApproxFileSize
692 : }
693 1 : return Stats{
694 1 : ObsoleteFileCount: obsoleteLogsCount,
695 1 : ObsoleteFileSize: obsoleteLogSize,
696 1 : LiveFileCount: liveFileCount,
697 1 : LiveFileSize: liveFileSize,
698 1 : Failover: failoverStats,
699 1 : }
700 : }
701 :
702 : // Close implements Manager.
703 1 : func (wm *failoverManager) Close() error {
704 1 : wm.stopper.stop()
705 1 : // Since all goroutines are stopped, we can close the dirs.
706 1 : var err error
707 1 : for _, f := range wm.dirHandles {
708 1 : err = firstError(err, f.Close())
709 1 : }
710 1 : return err
711 : }
712 :
713 : // RecyclerForTesting implements Manager.
714 0 : func (wm *failoverManager) RecyclerForTesting() *LogRecycler {
715 0 : return nil
716 0 : }
717 :
718 : // logCreator implements the logCreator func type.
719 : func (wm *failoverManager) logCreator(
720 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
721 1 : ) (logFile vfs.File, initialFileSize uint64, err error) {
722 1 : logFilename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
723 1 : isPrimary := dir == wm.opts.Primary
724 1 : // Only recycling when logNameIndex is 0 is a somewhat arbitrary choice.
725 1 : considerRecycle := li == 0 && isPrimary
726 1 : createInfo := CreateInfo{
727 1 : JobID: jobID,
728 1 : Path: logFilename,
729 1 : IsSecondary: !isPrimary,
730 1 : Num: wn,
731 1 : Err: nil,
732 1 : }
733 1 : defer func() {
734 1 : createInfo.Err = err
735 1 : if wm.opts.EventListener != nil {
736 1 : wm.opts.EventListener.LogCreated(createInfo)
737 1 : }
738 : }()
739 1 : if considerRecycle {
740 1 : // Try to use a recycled log file. Recycling log files is an important
741 1 : // performance optimization as it is faster to sync a file that has
742 1 : // already been written, than one which is being written for the first
743 1 : // time. This is due to the need to sync file metadata when a file is
744 1 : // being written for the first time. Note this is true even if file
745 1 : // preallocation is performed (e.g. fallocate).
746 1 : var recycleLog base.FileInfo
747 1 : var recycleOK bool
748 1 : func() {
749 1 : wm.recyclerPeekPopMu.Lock()
750 1 : defer wm.recyclerPeekPopMu.Unlock()
751 1 : recycleLog, recycleOK = wm.recycler.Peek()
752 1 : if recycleOK {
753 1 : if err = wm.recycler.Pop(recycleLog.FileNum); err != nil {
754 0 : panic(err)
755 : }
756 : }
757 : }()
758 1 : if recycleOK {
759 1 : createInfo.RecycledFileNum = recycleLog.FileNum
760 1 : recycleLogName := dir.FS.PathJoin(dir.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
761 1 : r.writeStart()
762 1 : logFile, err = dir.FS.ReuseForWrite(recycleLogName, logFilename, "pebble-wal")
763 1 : r.writeEnd(err)
764 1 : // TODO(sumeer): should we fatal since primary dir? At some point it is
765 1 : // better to fatal instead of continuing to failover.
766 1 : // base.MustExist(dir.FS, logFilename, wm.opts.Logger, err)
767 1 : if err != nil {
768 0 : // TODO(sumeer): we have popped from the logRecycler, which is
769 0 : // arguably correct, since we don't want to keep trying to reuse a log
770 0 : // that causes some error. But the original or new file may exist, and
771 0 : // no one will clean it up unless the process restarts.
772 0 : return nil, 0, err
773 0 : }
774 : // Figure out the recycled WAL size. This Stat is necessary because
775 : // ReuseForWrite's contract allows for removing the old file and
776 : // creating a new one. We don't know whether the WAL was actually
777 : // recycled.
778 : //
779 : // TODO(jackson): Adding a boolean to the ReuseForWrite return value
780 : // indicating whether or not the file was actually reused would allow us
781 : // to skip the stat and use recycleLog.FileSize.
782 1 : var finfo os.FileInfo
783 1 : finfo, err = logFile.Stat()
784 1 : if err != nil {
785 0 : logFile.Close()
786 0 : return nil, 0, err
787 0 : }
788 1 : initialFileSize = uint64(finfo.Size())
789 1 : return logFile, initialFileSize, nil
790 : }
791 : }
792 : // Did not recycle.
793 : //
794 : // Create file.
795 1 : r.writeStart()
796 1 : logFile, err = dir.FS.Create(logFilename, "pebble-wal")
797 1 : r.writeEnd(err)
798 1 : return logFile, 0, err
799 : }
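
// Sketch of why the Stat above is needed (illustrative; this helper is not
// part of the production code): ReuseForWrite is allowed to either truly reuse
// the old file or fall back to creating a fresh one, and the two cases start
// at different sizes, so the initial size must be read back from the file
// itself.
func exampleInitialSizeAfterReuse(fs vfs.FS, oldName, newName string) (uint64, error) {
	f, err := fs.ReuseForWrite(oldName, newName, "pebble-wal")
	if err != nil {
		return 0, err
	}
	defer f.Close()
	finfo, err := f.Stat()
	if err != nil {
		return 0, err
	}
	// Reused: the old WAL's size. Recreated: 0.
	return uint64(finfo.Size()), nil
}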
800 :
801 : type stopper struct {
802 : quiescer chan struct{} // Closed when quiescing
803 : wg sync.WaitGroup
804 : }
805 :
806 1 : func newStopper() *stopper {
807 1 : return &stopper{
808 1 : quiescer: make(chan struct{}),
809 1 : }
810 1 : }
811 :
812 1 : func (s *stopper) runAsync(f func()) {
813 1 : s.wg.Add(1)
814 1 : go func() {
815 1 : f()
816 1 : s.wg.Done()
817 1 : }()
818 : }
819 :
820 : // shouldQuiesce returns a channel which will be closed when stop() has been
821 : // invoked and outstanding goroutines should begin to quiesce.
822 1 : func (s *stopper) shouldQuiesce() <-chan struct{} {
823 1 : return s.quiescer
824 1 : }
825 :
826 1 : func (s *stopper) stop() {
827 1 : close(s.quiescer)
828 1 : s.wg.Wait()
829 1 : }
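
// Minimal usage sketch of the stopper (illustrative): goroutines launched via
// runAsync watch shouldQuiesce and exit when it is closed; stop() then blocks
// until all of them have returned.
func exampleStopperUsage() {
	s := newStopper()
	s.runAsync(func() {
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-s.shouldQuiesce():
				return
			case <-ticker.C:
				// Periodic work would go here.
			}
		}
	})
	s.stop() // closes the quiescer and waits for the goroutine above
}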
830 :
831 : // timeSource and tickerI are extracted from CockroachDB's timeutil, with
832 : // support for Timer removed, and support for reset added to the manual
833 : // implementation.
834 :
835 : // timeSource is used to interact with the clock and tickers. Abstracts
836 : // time.Now and time.NewTicker for testing.
837 : type timeSource interface {
838 : now() time.Time
839 : newTicker(duration time.Duration) tickerI
840 : }
841 :
842 : // tickerI is an interface wrapping time.Ticker.
843 : type tickerI interface {
844 : reset(duration time.Duration)
845 : stop()
846 : ch() <-chan time.Time
847 : }
848 :
849 : // defaultTime is a timeSource using the time package.
850 : type defaultTime struct{}
851 :
852 : var _ timeSource = defaultTime{}
853 :
854 1 : func (defaultTime) now() time.Time {
855 1 : return time.Now()
856 1 : }
857 :
858 1 : func (defaultTime) newTicker(duration time.Duration) tickerI {
859 1 : return (*defaultTicker)(time.NewTicker(duration))
860 1 : }
861 :
862 : // defaultTicker uses time.Ticker.
863 : type defaultTicker time.Ticker
864 :
865 : var _ tickerI = &defaultTicker{}
866 :
867 1 : func (t *defaultTicker) reset(duration time.Duration) {
868 1 : (*time.Ticker)(t).Reset(duration)
869 1 : }
870 :
871 1 : func (t *defaultTicker) stop() {
872 1 : (*time.Ticker)(t).Stop()
873 1 : }
874 :
875 1 : func (t *defaultTicker) ch() <-chan time.Time {
876 1 : return (*time.Ticker)(t).C
877 1 : }
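
// Hypothetical minimal fake (illustrative; the real test implementation lives
// elsewhere) showing why the timeSource/tickerI indirection exists: a test can
// control now() and deliver ticks explicitly instead of waiting on real time.
type exampleFakeTicker struct{ c chan time.Time }

func (t *exampleFakeTicker) reset(time.Duration)  {}
func (t *exampleFakeTicker) stop()                {}
func (t *exampleFakeTicker) ch() <-chan time.Time { return t.c }

type exampleFakeTime struct {
	cur    time.Time
	ticker *exampleFakeTicker
}

func (f *exampleFakeTime) now() time.Time { return f.cur }

func (f *exampleFakeTime) newTicker(time.Duration) tickerI {
	f.ticker = &exampleFakeTicker{c: make(chan time.Time, 1)}
	return f.ticker
}

// A test could then advance time and fire a tick by hand:
//
//	ft := &exampleFakeTime{cur: time.Unix(0, 0)}
//	// ... pass ft wherever a timeSource is expected ...
//	ft.cur = ft.cur.Add(time.Second)
//	ft.ticker.c <- ft.cur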
878 :
879 : // Make lint happy.
880 : var _ = (*failoverMonitor).noWriter
881 : var _ = (*failoverManager).writerClosed
882 : var _ = (&stopper{}).shouldQuiesce