Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "strconv"
11 : "strings"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // TODO(sumeer): write a high-level comment describing the approach.
22 :
// Dir is used for storing log files. It pairs a filesystem with the name of a
// directory.
type Dir struct {
	// FS is the filesystem; presumably Dirname is resolved against it.
	FS vfs.FS
	// Dirname is the name of the directory.
	Dirname string
}
28 :
29 : // NumWAL is the number of the virtual WAL. It can map to one or more physical
30 : // log files. In standalone mode, it will map to exactly one log file. In
31 : // failover mode, it can map to many log files, which are totally ordered
32 : // (using a dense logNameIndex).
33 : //
34 : // In general, WAL refers to the virtual WAL, and file refers to a log file.
35 : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
36 : // them. Additional mapping to one or more files happens in this package. If a
37 : // WAL maps to multiple files, the source of truth regarding that mapping is
38 : // the contents of the directories.
39 : type NumWAL base.DiskFileNum
40 :
41 : // String implements fmt.Stringer.
42 0 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
43 :
// LogNameIndex numbers log files within a WAL.
type LogNameIndex uint32

// String implements fmt.Stringer. The index is rendered in decimal,
// zero-padded on the left to a minimum width of three digits; larger values
// are printed in full.
func (li LogNameIndex) String() string {
	const paddedDecimal = "%03d"
	// Format the raw integer value of the index.
	raw := uint32(li)
	return fmt.Sprintf(paddedDecimal, raw)
}
51 :
52 : // makeLogFilename makes a log filename.
53 1 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
54 1 : if index == 0 {
55 1 : // Use a backward compatible name, for simplicity.
56 1 : return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
57 1 : }
58 1 : return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
59 : }
60 :
61 : // ParseLogFilename takes a base filename and parses it into its constituent
62 : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
63 : // for the final return value.
64 1 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
65 1 : i := strings.IndexByte(name, '.')
66 1 : if i < 0 || name[i:] != ".log" {
67 1 : return 0, 0, false
68 1 : }
69 1 : j := strings.IndexByte(name[:i], '-')
70 1 : if j < 0 {
71 1 : dfn, ok := base.ParseDiskFileNum(name[:i])
72 1 : if !ok {
73 0 : // We've considered returning an error for filenames that end in
74 0 : // '.log' but fail to parse correctly. We decided against it because
75 0 : // the '.log' suffix is used by Cockroach's daignostics log files.
76 0 : // It's conceivable that some of these found their way into a data
77 0 : // directory, and erroring would cause an issue for an existing
78 0 : // Cockroach deployment.
79 0 : return 0, 0, false
80 0 : }
81 1 : return NumWAL(dfn), 0, true
82 : }
83 1 : dfn, ok := base.ParseDiskFileNum(name[:j])
84 1 : if !ok {
85 0 : return 0, 0, false
86 0 : }
87 1 : li, err := strconv.ParseUint(name[j+1:i], 10, 64)
88 1 : if err != nil {
89 0 : return 0, 0, false
90 0 : }
91 1 : return NumWAL(dfn), LogNameIndex(li), true
92 : }
93 :
// Options provides configuration for the Manager.
type Options struct {
	// Primary dir for storing WAL files. It must already be created and synced
	// up to the root.
	Primary Dir
	// Secondary is used for failover. Optional. It must already be created and
	// synced up to the root.
	//
	// Init treats a zero-valued Secondary as "no failover" and constructs a
	// standalone manager in that case.
	Secondary Dir

	// MinUnflushedWALNum is the smallest WAL number corresponding to
	// mutations that have not been flushed to a sstable.
	MinUnflushedWALNum NumWAL

	// Recycling configuration. Only files in the primary dir are recycled.

	// MaxNumRecyclableLogs is the maximum number of log files to maintain for
	// recycling.
	MaxNumRecyclableLogs int

	// Configuration for calling vfs.NewSyncingFile.

	// NoSyncOnClose is documented in SyncingFileOptions.
	NoSyncOnClose bool
	// BytesPerSync is documented in SyncingFileOptions.
	BytesPerSync int
	// PreallocateSize is documented in SyncingFileOptions.
	PreallocateSize func() int

	// MinSyncInterval is documented in Options.WALMinSyncInterval.
	MinSyncInterval func() time.Duration
	// FsyncLatency records fsync latency. This doesn't differentiate between
	// fsyncs on the primary and secondary dir.
	//
	// TODO(sumeer): consider separating out into two histograms.
	FsyncLatency prometheus.Histogram
	// QueueSemChan is the channel to pop from when popping from queued records
	// that have requested a sync. Its original purpose was to function as a
	// semaphore that prevents the record.LogWriter.flusher.syncQueue from
	// overflowing (which will cause a panic). It is still useful in that role
	// when the WALManager is configured in standalone mode. In failover mode
	// there is no syncQueue, so the pushback into the commit pipeline is
	// unnecessary, but possibly harmless.
	QueueSemChan chan struct{}

	// Logger for logging.
	Logger base.Logger

	// EventListener is called on events, like log file creation.
	EventListener EventListener

	// FailoverOptions holds the options that are specific to failover mode.
	FailoverOptions
	// FailoverWriteAndSyncLatency is only populated when WAL failover is
	// configured.
	FailoverWriteAndSyncLatency prometheus.Histogram
}
149 :
150 : // Init constructs and initializes a WAL manager from the provided options and
151 : // the set of initial logs.
152 1 : func Init(o Options, initial Logs) (Manager, error) {
153 1 : var m Manager
154 1 : if o.Secondary == (Dir{}) {
155 1 : m = new(StandaloneManager)
156 1 : } else {
157 1 : m = new(failoverManager)
158 1 : }
159 1 : if err := m.init(o, initial); err != nil {
160 0 : return nil, err
161 0 : }
162 1 : return m, nil
163 : }
164 :
165 : // Dirs returns the primary Dir and the secondary if provided.
166 1 : func (o *Options) Dirs() []Dir {
167 1 : if o.Secondary == (Dir{}) {
168 1 : return []Dir{o.Primary}
169 1 : }
170 1 : return []Dir{o.Primary, o.Secondary}
171 : }
172 :
// FailoverOptions are options that are specific to failover mode.
//
// EnsureDefaults fills in defaults for the exported fields that were left at
// their zero value.
type FailoverOptions struct {
	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
	// the WAL is being written to the secondary, to decide when to fail back.
	PrimaryDirProbeInterval time.Duration
	// HealthyProbeLatencyThreshold is the latency threshold to declare that the
	// primary is healthy again.
	HealthyProbeLatencyThreshold time.Duration
	// HealthyInterval is the time interval over which the probes have to be
	// healthy. That is, we look at probe history of length
	// HealthyInterval/PrimaryDirProbeInterval.
	HealthyInterval time.Duration

	// UnhealthySamplingInterval is the interval for sampling ongoing calls and
	// errors in the latest LogWriter.
	UnhealthySamplingInterval time.Duration
	// UnhealthyOperationLatencyThreshold is the latency threshold that is
	// considered unhealthy, for operations done by a LogWriter. The second return
	// value indicates whether we should consider failover at all. If the second
	// return value is false, failover is disabled.
	UnhealthyOperationLatencyThreshold func() (time.Duration, bool)

	// ElevatedWriteStallThresholdLag is the duration for which an elevated
	// threshold should continue after a switch back to the primary dir. This is
	// because we may have accumulated many unflushed memtables and flushing
	// them can take some time. Maybe set to 60s.
	ElevatedWriteStallThresholdLag time.Duration

	// timeSource is only non-nil for tests.
	timeSource

	// The remaining fields are testing hooks, as their names indicate. Their
	// exact semantics are defined by the code that uses them, which is not in
	// this file.
	monitorIterationForTesting chan<- struct{}
	proberIterationForTesting chan<- struct{}
	monitorStateForTesting func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
	logWriterCreatedForTesting chan<- struct{}
}
209 :
210 : // EnsureDefaults ensures that the default values for all options are set if a
211 : // valid value was not already specified.
212 1 : func (o *FailoverOptions) EnsureDefaults() {
213 1 : if o.PrimaryDirProbeInterval == 0 {
214 0 : o.PrimaryDirProbeInterval = time.Second
215 0 : }
216 1 : if o.HealthyProbeLatencyThreshold == 0 {
217 0 : o.HealthyProbeLatencyThreshold = 25 * time.Millisecond
218 0 : }
219 1 : if o.HealthyInterval == 0 {
220 0 : o.HealthyInterval = 15 * time.Second
221 0 : }
222 1 : if o.UnhealthySamplingInterval == 0 {
223 0 : o.UnhealthySamplingInterval = 100 * time.Millisecond
224 0 : }
225 1 : if o.UnhealthyOperationLatencyThreshold == nil {
226 0 : o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
227 0 : return 100 * time.Millisecond, true
228 0 : }
229 : }
230 1 : if o.ElevatedWriteStallThresholdLag == 0 {
231 0 : o.ElevatedWriteStallThresholdLag = 60 * time.Second
232 0 : }
233 : }
234 :
// EventListener is called on events, like log file creation.
type EventListener interface {
	// LogCreated informs the listener of a log file creation. The CreateInfo
	// argument describes the creation event, including any error.
	LogCreated(CreateInfo)
}
240 :
// CreateInfo contains info about a log file creation event. It is the
// argument passed to EventListener.LogCreated.
type CreateInfo struct {
	// JobID is the ID of the job that caused the WAL to be created.
	//
	// TODO(sumeer): for a file created later due to the need to failover, we
	// need to provide a JobID generator func in Options.
	JobID int
	// Path to the file. This includes the NumWAL, and implicitly or explicitly
	// includes the logNameIndex.
	Path string
	// IsSecondary is true if the file was created on the secondary.
	IsSecondary bool
	// Num is the WAL number.
	Num NumWAL
	// RecycledFileNum is the file number of a previous log file which was
	// recycled to create this one. Zero if recycling did not take place.
	RecycledFileNum base.DiskFileNum
	// Err contains any error.
	Err error
}
261 :
// Stats exposes stats used in Pebble metrics. It is the value returned by
// Manager.Stats.
//
// NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
// package.
//
// TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
// maintained here.
type Stats struct {
	// ObsoleteFileCount is the number of obsolete log files.
	ObsoleteFileCount int
	// ObsoleteFileSize is the total size of obsolete log files.
	ObsoleteFileSize uint64
	// LiveFileCount is the number of live log files.
	LiveFileCount int
	// LiveFileSize is the total size of live log files. This can be higher than
	// LiveSize due to log recycling (a live log file may be larger than the
	// size used in its latest incarnation), or failover (resulting in multiple
	// log files containing the same records).
	//
	// This is updated only when log files are closed, to minimize
	// synchronization.
	LiveFileSize uint64
	// Failover contains failover stats.
	Failover FailoverStats
}
287 :
// FailoverStats contains stats about WAL failover. These are empty if
// failover is not configured.
type FailoverStats struct {
	// DirSwitchCount is the number of times WAL writing has switched to a
	// different directory, either due to failover, when the current dir is
	// unhealthy, or to failback to the primary, when the primary is healthy
	// again.
	DirSwitchCount int64
	// The following durations do not account for continued background writes to
	// a directory that has been switched away from. These background writes can
	// happen because of queued records.

	// PrimaryWriteDuration is the cumulative duration for which WAL writes are
	// using the primary directory.
	PrimaryWriteDuration time.Duration
	// SecondaryWriteDuration is the cumulative duration for which WAL writes
	// are using the secondary directory.
	SecondaryWriteDuration time.Duration

	// FailoverWriteAndSyncLatency measures the latency of writing and syncing a
	// set of writes that were synced together. Each sample represents the
	// highest latency observed across the writes in the set of writes. It gives
	// us a sense of the user-observed latency, which can be much lower than the
	// underlying fsync latency, when WAL failover is working effectively.
	FailoverWriteAndSyncLatency prometheus.Histogram
}
314 :
// Manager handles all WAL work. Instances are constructed via Init.
//
//   - Obsolete can be called concurrently with WAL writing.
//   - WAL writing: Is done via Create, and the various Writer methods. These
//     are required to be serialized via external synchronization (specifically,
//     the caller does it via commitPipeline.mu).
type Manager interface {
	// init initializes the Manager. init is called during DB initialization.
	init(o Options, initial Logs) error

	// List returns the virtual WALs in ascending order.
	List() (Logs, error)
	// Obsolete informs the manager that all virtual WALs less than
	// minUnflushedNum are obsolete. The callee can choose to recycle some
	// underlying log files, if !noRecycle. The log files that are not recycled,
	// and therefore can be deleted, are returned. The deletable files are no
	// longer tracked by the manager.
	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
	// Create creates a new virtual WAL.
	//
	// NumWALs passed to successive Create calls must be monotonically
	// increasing, and be greater than any NumWAL seen earlier. The caller must
	// close the previous Writer before calling Create.
	//
	// jobID is used for the EventListener.
	Create(wn NumWAL, jobID int) (Writer, error)
	// ElevateWriteStallThresholdForFailover returns true if the caller should
	// use a high write stall threshold because the WALs are being written to
	// the secondary dir.
	ElevateWriteStallThresholdForFailover() bool
	// Stats returns the latest Stats.
	Stats() Stats
	// Close the manager.
	// REQUIRES: Writers and Readers have already been closed.
	Close() error

	// RecyclerForTesting exposes the internal LogRecycler.
	RecyclerForTesting() *LogRecycler
}
354 :
// DeletableLog contains information about a log file that can be deleted.
// Values of this type are returned by Manager.Obsolete.
type DeletableLog struct {
	// FS is presumably the filesystem on which Path resides — confirm against
	// the Manager implementations.
	vfs.FS
	// Path to the file.
	Path string
	// NumWAL is presumably the virtual WAL that the file belonged to —
	// confirm against the Manager implementations.
	NumWAL
	// ApproxFileSize is the approximate size of the file.
	// NOTE(review): units are assumed to be bytes — confirm.
	ApproxFileSize uint64
}
363 :
// SyncOptions has non-nil Done and Err when fsync is requested, else both are
// nil.
type SyncOptions struct {
	// Done is the wait group that is notified when durability is guaranteed or
	// an error has occurred (see Writer.WriteRecord).
	Done *sync.WaitGroup
	// Err is where an error that occurred is set (see Writer.WriteRecord).
	Err *error
}
370 :
// Writer writes to a virtual WAL. A Writer in standalone mode maps to a
// single record.LogWriter. In failover mode, it can failover across multiple
// physical log files.
type Writer interface {
	// WriteRecord writes a complete record. The record is asynchronously
	// persisted to the underlying writer. If SyncOptions.Done != nil, the wait
	// group will be notified when durability is guaranteed or an error has
	// occurred (set in SyncOptions.Err). External synchronisation provided by
	// commitPipeline.mu guarantees that WriteRecord calls are serialized.
	//
	// The logicalOffset is the logical size of the WAL after this record is
	// written. If the WAL corresponds to a single log file, this is the offset
	// in that log file.
	//
	// Some Writer implementations may continue to read p after WriteRecord
	// returns. This is an obstacle to reusing p's memory. If the caller would
	// like to reuse p's memory, the caller may pass a non-nil [RefCount]. If
	// the Writer will retain p, it will invoke [RefCount.Ref] before returning.
	// When it's finished, it will invoke [RefCount.Unref] to release its
	// reference.
	WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
	// Close the writer. The returned logicalOffset is presumably the final
	// logical size of the WAL, as for WriteRecord — confirm against the
	// implementations.
	Close() (logicalOffset int64, err error)
	// Metrics must be called after Close. The callee will no longer modify the
	// returned LogWriterMetrics.
	Metrics() record.LogWriterMetrics
}
398 :
// RefCount is a reference count associated with a record passed to
// [Writer.WriteRecord]. See the comment on WriteRecord.
type RefCount interface {
	// Ref increments the reference count.
	Ref()
	// Unref decrements the reference count.
	Unref()
}
407 :
// Reader reads a virtual WAL.
type Reader interface {
	// NextRecord returns a reader for the next record, along with the Offset
	// of that record. It returns io.EOF if there are no more records. The
	// reader returned becomes stale after the next NextRecord call, and should
	// no longer be used.
	NextRecord() (io.Reader, Offset, error)
	// Close the reader.
	Close() error
}
417 :
// Offset indicates the offset or position of a record within a WAL.
type Offset struct {
	// PhysicalFile is the path to the physical file containing a particular
	// record.
	PhysicalFile string
	// Physical indicates the file offset at which a record begins within
	// the physical file named by PhysicalFile.
	Physical int64
}

// String implements fmt.Stringer, returning a string representation of the
// offset in the form "(<PhysicalFile>: <Physical>)", with Physical printed in
// decimal.
func (o Offset) String() string {
	file, pos := o.PhysicalFile, o.Physical
	return "(" + file + ": " + fmt.Sprint(pos) + ")"
}
|