Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "strconv"
11 : "strings"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/redact"
19 : "github.com/prometheus/client_golang/prometheus"
20 : )
21 :
22 : // TODO(sumeer): write a high-level comment describing the approach.
23 :
24 : // Dir is used for storing log files.
25 : type Dir struct {
26 : FS vfs.FS
27 : Dirname string
28 : }
29 :
30 : // NumWAL is the number of the virtual WAL. It can map to one or more physical
31 : // log files. In standalone mode, it will map to exactly one log file. In
32 : // failover mode, it can map to many log files, which are totally ordered
33 : // (using a dense logNameIndex).
34 : //
35 : // In general, WAL refers to the virtual WAL, and file refers to a log file.
36 : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
37 : // them. Additional mapping to one or more files happens in this package. If a
38 : // WAL maps to multiple files, the source of truth regarding that mapping is
39 : // the contents of the directories.
40 : type NumWAL base.DiskFileNum
41 :
42 : // String implements fmt.Stringer.
43 0 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
44 :
45 : // LogNameIndex numbers log files within a WAL.
46 : type LogNameIndex uint32
47 :
48 : // String implements fmt.Stringer.
49 1 : func (li LogNameIndex) String() string {
50 1 : return fmt.Sprintf("%03d", li)
51 1 : }
52 :
53 : // makeLogFilename makes a log filename.
54 1 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
55 1 : if index == 0 {
56 1 : // Use a backward compatible name, for simplicity.
57 1 : return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
58 1 : }
59 1 : return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
60 : }
61 :
62 : // ParseLogFilename takes a base filename and parses it into its constituent
63 : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
64 : // for the final return value.
65 1 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
66 1 : i := strings.IndexByte(name, '.')
67 1 : if i < 0 || name[i:] != ".log" {
68 1 : return 0, 0, false
69 1 : }
70 1 : j := strings.IndexByte(name[:i], '-')
71 1 : if j < 0 {
72 1 : dfn, ok := base.ParseDiskFileNum(name[:i])
73 1 : if !ok {
74 0 : // We've considered returning an error for filenames that end in
75 0 : // '.log' but fail to parse correctly. We decided against it because
76 0 : // the '.log' suffix is used by Cockroach's daignostics log files.
77 0 : // It's conceivable that some of these found their way into a data
78 0 : // directory, and erroring would cause an issue for an existing
79 0 : // Cockroach deployment.
80 0 : return 0, 0, false
81 0 : }
82 1 : return NumWAL(dfn), 0, true
83 : }
84 1 : dfn, ok := base.ParseDiskFileNum(name[:j])
85 1 : if !ok {
86 0 : return 0, 0, false
87 0 : }
88 1 : li, err := strconv.ParseUint(name[j+1:i], 10, 64)
89 1 : if err != nil {
90 0 : return 0, 0, false
91 0 : }
92 1 : return NumWAL(dfn), LogNameIndex(li), true
93 : }
94 :
95 : // Options provides configuration for the Manager.
96 : type Options struct {
97 : // Primary dir for storing WAL files. It must already be created and synced
98 : // up to the root.
99 : Primary Dir
100 : // Secondary is used for failover. Optional. It must already be created and
101 : // synced up to the root.
102 : Secondary Dir
103 :
104 : // MinUnflushedLogNum is the smallest WAL number corresponding to
105 : // mutations that have not been flushed to a sstable.
106 : MinUnflushedWALNum NumWAL
107 :
108 : // Recycling configuration. Only files in the primary dir are recycled.
109 :
110 : // MaxNumRecyclableLogs is the maximum number of log files to maintain for
111 : // recycling.
112 : MaxNumRecyclableLogs int
113 :
114 : // Configuration for calling vfs.NewSyncingFile.
115 :
116 : // NoSyncOnClose is documented in SyncingFileOptions.
117 : NoSyncOnClose bool
118 : // BytesPerSync is documented in SyncingFileOptions.
119 : BytesPerSync int
120 : // PreallocateSize is documented in SyncingFileOptions.
121 : PreallocateSize func() int
122 :
123 : // MinSyncInterval is documented in Options.WALMinSyncInterval.
124 : MinSyncInterval func() time.Duration
125 : // FsyncLatency records fsync latency. This doesn't differentiate between
126 : // fsyncs on the primary and secondary dir.
127 : //
128 : // TODO(sumeer): consider separating out into two histograms.
129 : FsyncLatency prometheus.Histogram
130 : // QueueSemChan is the channel to pop from when popping from queued records
131 : // that have requested a sync. It's original purpose was to function as a
132 : // semaphore that prevents the record.LogWriter.flusher.syncQueue from
133 : // overflowing (which will cause a panic). It is still useful in that role
134 : // when the WALManager is configured in standalone mode. In failover mode
135 : // there is no syncQueue, so the pushback into the commit pipeline is
136 : // unnecessary, but possibly harmless.
137 : QueueSemChan chan struct{}
138 :
139 : // Logger for logging.
140 : Logger base.Logger
141 :
142 : // EventListener is called on events, like log file creation.
143 : EventListener EventListener
144 :
145 : FailoverOptions
146 : // FailoverWriteAndSyncLatency is only populated when WAL failover is
147 : // configured.
148 : FailoverWriteAndSyncLatency prometheus.Histogram
149 : }
150 :
151 : // Init constructs and initializes a WAL manager from the provided options and
152 : // the set of initial logs.
153 1 : func Init(o Options, initial Logs) (Manager, error) {
154 1 : var m Manager
155 1 : if o.Secondary == (Dir{}) {
156 1 : m = new(StandaloneManager)
157 1 : } else {
158 1 : m = new(failoverManager)
159 1 : }
160 1 : if err := m.init(o, initial); err != nil {
161 0 : return nil, err
162 0 : }
163 1 : return m, nil
164 : }
165 :
166 : // Dirs returns the primary Dir and the secondary if provided.
167 1 : func (o *Options) Dirs() []Dir {
168 1 : if o.Secondary == (Dir{}) {
169 1 : return []Dir{o.Primary}
170 1 : }
171 1 : return []Dir{o.Primary, o.Secondary}
172 : }
173 :
174 : // FailoverOptions are options that are specific to failover mode.
175 : type FailoverOptions struct {
176 : // PrimaryDirProbeInterval is the interval for probing the primary dir, when
177 : // the WAL is being written to the secondary, to decide when to fail back.
178 : PrimaryDirProbeInterval time.Duration
179 : // HealthyProbeLatencyThreshold is the latency threshold to declare that the
180 : // primary is healthy again.
181 : HealthyProbeLatencyThreshold time.Duration
182 : // HealthyInterval is the time interval over which the probes have to be
183 : // healthy. That is, we look at probe history of length
184 : // HealthyInterval/PrimaryDirProbeInterval.
185 : HealthyInterval time.Duration
186 :
187 : // UnhealthySamplingInterval is the interval for sampling ongoing calls and
188 : // errors in the latest LogWriter.
189 : UnhealthySamplingInterval time.Duration
190 : // UnhealthyOperationLatencyThreshold is the latency threshold that is
191 : // considered unhealthy, for operations done by a LogWriter. The second return
192 : // value indicates whether we should consider failover at all. If the second
193 : // return value is false, failover is disabled.
194 : UnhealthyOperationLatencyThreshold func() (time.Duration, bool)
195 :
196 : // ElevatedWriteStallThresholdLag is the duration for which an elevated
197 : // threshold should continue after a switch back to the primary dir. This is
198 : // because we may have accumulated many unflushed memtables and flushing
199 : // them can take some time. Maybe set to 60s.
200 : ElevatedWriteStallThresholdLag time.Duration
201 :
202 : // timeSource is only non-nil for tests.
203 : timeSource
204 :
205 : monitorIterationForTesting chan<- struct{}
206 : proberIterationForTesting chan<- struct{}
207 : monitorStateForTesting func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
208 : logWriterCreatedForTesting chan<- struct{}
209 : }
210 :
211 : // EnsureDefaults ensures that the default values for all options are set if a
212 : // valid value was not already specified.
213 1 : func (o *FailoverOptions) EnsureDefaults() {
214 1 : if o.PrimaryDirProbeInterval == 0 {
215 0 : o.PrimaryDirProbeInterval = time.Second
216 0 : }
217 1 : if o.HealthyProbeLatencyThreshold == 0 {
218 0 : o.HealthyProbeLatencyThreshold = 25 * time.Millisecond
219 0 : }
220 1 : if o.HealthyInterval == 0 {
221 0 : o.HealthyInterval = 15 * time.Second
222 0 : }
223 1 : if o.UnhealthySamplingInterval == 0 {
224 0 : o.UnhealthySamplingInterval = 100 * time.Millisecond
225 0 : }
226 1 : if o.UnhealthyOperationLatencyThreshold == nil {
227 0 : o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
228 0 : return 100 * time.Millisecond, true
229 0 : }
230 : }
231 1 : if o.ElevatedWriteStallThresholdLag == 0 {
232 0 : o.ElevatedWriteStallThresholdLag = 60 * time.Second
233 0 : }
234 : }
235 :
236 : // EventListener is called on events, like log file creation.
237 : type EventListener interface {
238 : // LogCreated informs the listener of a log file creation.
239 : LogCreated(CreateInfo)
240 : }
241 :
242 : // CreateInfo contains info about a log file creation event.
243 : type CreateInfo struct {
244 : // JobID is the ID of the job the caused the WAL to be created.
245 : //
246 : // TODO(sumeer): for a file created later due to the need to failover, we
247 : // need to provide a JobID generator func in Options.
248 : JobID int
249 : // Path to the file. This includes the NumWAL, and implicitly or explicitly
250 : // includes the logNameIndex.
251 : Path string
252 : // IsSecondary is true if the file was created on the secondary.
253 : IsSecondary bool
254 : // Num is the WAL number.
255 : Num NumWAL
256 : // RecycledFileNum is the file number of a previous log file which was
257 : // recycled to create this one. Zero if recycling did not take place.
258 : RecycledFileNum base.DiskFileNum
259 : // Err contains any error.
260 : Err error
261 : }
262 :
263 : // Stats exposes stats used in Pebble metrics.
264 : //
265 : // NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
266 : // package.
267 : //
268 : // TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
269 : // maintained here.
270 : type Stats struct {
271 : // ObsoleteFileCount is the number of obsolete log files.
272 : ObsoleteFileCount int
273 : // ObsoleteFileSize is the total size of obsolete log files.
274 : ObsoleteFileSize uint64
275 : // LiveFileCount is the number of live log files.
276 : LiveFileCount int
277 : // LiveFileSize is the total size of live log files. This can be higher than
278 : // LiveSize due to log recycling (a live log file may be larger than the
279 : // size used in its latest incarnation), or failover (resulting in multiple
280 : // log files containing the same records).
281 : //
282 : // This is updated only when log files are closed, to minimize
283 : // synchronization.
284 : LiveFileSize uint64
285 : // Failover contains failover stats.
286 : Failover FailoverStats
287 : }
288 :
289 : // FailoverStats contains stats about WAL failover. These are empty if
290 : // failover is not configured.
291 : type FailoverStats struct {
292 : // DirSwitchCount is the number of times WAL writing has switched to a
293 : // different directory, either due to failover, when the current dir is
294 : // unhealthy, or to failback to the primary, when the primary is healthy
295 : // again.
296 : DirSwitchCount int64
297 : // The following durations do not account for continued background writes to
298 : // a directory that has been switched away from. These background writes can
299 : // happen because of queued records.
300 :
301 : // PrimaryWriteDuration is the cumulative duration for which WAL writes are
302 : // using the primary directory.
303 : PrimaryWriteDuration time.Duration
304 : // SecondaryWriteDuration is the cumulative duration for which WAL writes
305 : // are using the secondary directory.
306 : SecondaryWriteDuration time.Duration
307 :
308 : // FailoverWriteAndSyncLatency measures the latency of writing and syncing a
309 : // set of writes that were synced together. Each sample represents the
310 : // highest latency observed across the writes in the set of writes. It gives
311 : // us a sense of the user-observed latency, which can be much lower than the
312 : // underlying fsync latency, when WAL failover is working effectively.
313 : FailoverWriteAndSyncLatency prometheus.Histogram
314 : }
315 :
316 : // Manager handles all WAL work.
317 : //
318 : // - Obsolete can be called concurrently with WAL writing.
319 : // - WAL writing: Is done via Create, and the various Writer methods. These
320 : // are required to be serialized via external synchronization (specifically,
321 : // the caller does it via commitPipeline.mu).
322 : type Manager interface {
323 : // init initializes the Manager. init is called during DB initialization.
324 : init(o Options, initial Logs) error
325 :
326 : // List returns the virtual WALs in ascending order. List must not perform
327 : // I/O.
328 : List() Logs
329 : // Obsolete informs the manager that all virtual WALs less than
330 : // minUnflushedNum are obsolete. The callee can choose to recycle some
331 : // underlying log files, if !noRecycle. The log files that are not recycled,
332 : // and therefore can be deleted, are returned. The deletable files are no
333 : // longer tracked by the manager.
334 : Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
335 : // Create creates a new virtual WAL.
336 : //
337 : // NumWALs passed to successive Create calls must be monotonically
338 : // increasing, and be greater than any NumWAL seen earlier. The caller must
339 : // close the previous Writer before calling Create.
340 : //
341 : // jobID is used for the WALEventListener.
342 : Create(wn NumWAL, jobID int) (Writer, error)
343 : // ElevateWriteStallThresholdForFailover returns true if the caller should
344 : // use a high write stall threshold because the WALs are being written to
345 : // the secondary dir.
346 : ElevateWriteStallThresholdForFailover() bool
347 : // Stats returns the latest Stats.
348 : Stats() Stats
349 : // Close the manager.
350 : // REQUIRES: Writers and Readers have already been closed.
351 : Close() error
352 :
353 : // RecyclerForTesting exposes the internal LogRecycler.
354 : RecyclerForTesting() *LogRecycler
355 : }
356 :
357 : // DeletableLog contains information about a log file that can be deleted.
358 : type DeletableLog struct {
359 : vfs.FS
360 : // Path to the file.
361 : Path string
362 : NumWAL
363 : ApproxFileSize uint64
364 : }
365 :
366 : // SyncOptions has non-nil Done and Err when fsync is requested, else both are
367 : // nil.
368 : type SyncOptions struct {
369 : Done *sync.WaitGroup
370 : Err *error
371 : }
372 :
373 : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
374 : // single record.LogWriter. In failover mode, it can failover across multiple
375 : // physical log files.
376 : type Writer interface {
377 : // WriteRecord writes a complete record. The record is asynchronously
378 : // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
379 : // group will be notified when durability is guaranteed or an error has
380 : // occurred (set in SyncOptions.Err). External synchronisation provided by
381 : // commitPipeline.mu guarantees that WriteRecord calls are serialized.
382 : //
383 : // The logicalOffset is the logical size of the WAL after this record is
384 : // written. If the WAL corresponds to a single log file, this is the offset
385 : // in that log file.
386 : //
387 : // Some Writer implementations may continue to read p after WriteRecord
388 : // returns. This is an obstacle to reusing p's memory. If the caller would
389 : // like to reuse p's memory, the caller may pass a non-nil [RefCount]. If
390 : // the Writer will retain p, it will invoke the [RefCount] before returning.
391 : // When it's finished, it will invoke [RefCount.Unref] to release its
392 : // reference.
393 : WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
394 : // Close the writer.
395 : Close() (logicalOffset int64, err error)
396 : // Metrics must be called after Close. The callee will no longer modify the
397 : // returned LogWriterMetrics.
398 : Metrics() record.LogWriterMetrics
399 : }
400 :
401 : // RefCount is a reference count associated with a record passed to
402 : // [Writer.WriteRecord]. See the comment on WriteRecord.
403 : type RefCount interface {
404 : // Ref increments the reference count.
405 : Ref()
406 : // Unref increments the reference count.
407 : Unref()
408 : }
409 :
410 : // Reader reads a virtual WAL.
411 : type Reader interface {
412 : // NextRecord returns a reader for the next record. It returns io.EOF if there
413 : // are no more records. The reader returned becomes stale after the next NextRecord
414 : // call, and should no longer be used.
415 : NextRecord() (io.Reader, Offset, error)
416 : // Close the reader.
417 : Close() error
418 : }
419 :
420 : // Offset indicates the offset or position of a record within a WAL.
421 : type Offset struct {
422 : // PhysicalFile is the path to the physical file containing a particular
423 : // record.
424 : PhysicalFile string
425 : // Physical indicates the file offset at which a record begins within
426 : // the physical file named by PhysicalFile.
427 : Physical int64
428 : // PreviousFilesBytes is the bytes read from all the previous physical
429 : // segment files that have been read up to the current log segment. If WAL
430 : // failover is not in use, PreviousFileBytes will always be zero. Otherwise,
431 : // it may be non-zero when replaying records from multiple segment files
432 : // that make up a single logical WAL.
433 : PreviousFilesBytes int64
434 : }
435 :
436 : // String implements fmt.Stringer, returning a string representation of the
437 : // offset.
438 1 : func (o Offset) String() string {
439 1 : return redact.StringWithoutMarkers(o)
440 1 : }
441 :
442 : // SafeFormat implements redact.SafeFormatter.
443 1 : func (o Offset) SafeFormat(w redact.SafePrinter, _ rune) {
444 1 : if o.PreviousFilesBytes > 0 {
445 1 : w.Printf("(%s: %d), %d from previous files", o.PhysicalFile, o.Physical, o.PreviousFilesBytes)
446 1 : return
447 1 : }
448 1 : w.Printf("(%s: %d)", o.PhysicalFile, o.Physical)
449 :
450 : }
|