Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "strconv"
11 : "strings"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // TODO(sumeer): write a high-level comment describing the approach.
22 :
// Dir is used for storing log files. It pairs a filesystem with a directory
// path on that filesystem. The zero Dir is treated by Options as "not
// configured" (e.g. no secondary dir).
type Dir struct {
	// FS is the filesystem on which the directory lives.
	FS vfs.FS
	// Dirname is the path of the directory within FS.
	Dirname string
}
28 :
29 : // NumWAL is the number of the virtual WAL. It can map to one or more physical
30 : // log files. In standalone mode, it will map to exactly one log file. In
31 : // failover mode, it can map to many log files, which are totally ordered
32 : // (using a dense logNameIndex).
33 : //
34 : // In general, WAL refers to the virtual WAL, and file refers to a log file.
35 : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
36 : // them. Additional mapping to one or more files happens in this package. If a
37 : // WAL maps to multiple files, the source of truth regarding that mapping is
38 : // the contents of the directories.
39 : type NumWAL base.DiskFileNum
40 :
41 : // String implements fmt.Stringer.
42 1 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
43 :
44 : // LogNameIndex numbers log files within a WAL.
45 : type LogNameIndex uint32
46 :
47 : // String implements fmt.Stringer.
48 2 : func (li LogNameIndex) String() string {
49 2 : return fmt.Sprintf("%03d", li)
50 2 : }
51 :
// makeLogFilename returns the base filename for the log file with the given
// index within WAL wn. Index 0 yields "<num>.log"; any other index yields
// "<num>-<index>.log", where <index> is formatted by LogNameIndex.String.
func makeLogFilename(wn NumWAL, index LogNameIndex) string {
	num := base.DiskFileNum(wn).String()
	if index != 0 {
		return fmt.Sprintf("%s-%s.log", num, index)
	}
	// Index 0 keeps a backward compatible name, for simplicity.
	return fmt.Sprintf("%s.log", num)
}
60 :
// ParseLogFilename takes a base filename and parses it into its constituent
// NumWAL and LogNameIndex. Two forms are accepted: "<num>.log", which has
// index 0, and "<num>-<index>.log". If the filename is not a log file, or
// its index does not fit in a LogNameIndex, it returns false for the final
// return value.
func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
	i := strings.IndexByte(name, '.')
	if i < 0 || name[i:] != ".log" {
		return 0, 0, false
	}
	j := strings.IndexByte(name[:i], '-')
	if j < 0 {
		dfn, ok := base.ParseDiskFileNum(name[:i])
		if !ok {
			// We've considered returning an error for filenames that end in
			// '.log' but fail to parse correctly. We decided against it because
			// the '.log' suffix is used by Cockroach's diagnostics log files.
			// It's conceivable that some of these found their way into a data
			// directory, and erroring would cause an issue for an existing
			// Cockroach deployment.
			return 0, 0, false
		}
		return NumWAL(dfn), 0, true
	}
	dfn, ok := base.ParseDiskFileNum(name[:j])
	if !ok {
		return 0, 0, false
	}
	// LogNameIndex is a uint32. Parse with bitSize 32 so that an out-of-range
	// index is rejected (ParseUint returns ErrRange) instead of being silently
	// truncated by the conversion below, which would alias another log file.
	li, err := strconv.ParseUint(name[j+1:i], 10, 32)
	if err != nil {
		return 0, 0, false
	}
	return NumWAL(dfn), LogNameIndex(li), true
}
93 :
// Options provides configuration for the Manager.
type Options struct {
	// Primary dir for storing WAL files. It must already be created and synced
	// up to the root.
	Primary Dir
	// Secondary is used for failover. Optional. It must already be created and
	// synced up to the root. When Secondary is the zero Dir, Init constructs a
	// standalone manager; otherwise it constructs a failover manager.
	Secondary Dir

	// MinUnflushedWALNum is the smallest WAL number corresponding to
	// mutations that have not been flushed to a sstable.
	MinUnflushedWALNum NumWAL

	// Recycling configuration. Only files in the primary dir are recycled.

	// MaxNumRecyclableLogs is the maximum number of log files to maintain for
	// recycling.
	MaxNumRecyclableLogs int

	// Configuration for calling vfs.NewSyncingFile.

	// NoSyncOnClose is documented in SyncingFileOptions.
	NoSyncOnClose bool
	// BytesPerSync is documented in SyncingFileOptions.
	BytesPerSync int
	// PreallocateSize is documented in SyncingFileOptions.
	PreallocateSize func() int

	// MinSyncInterval is documented in Options.WALMinSyncInterval.
	MinSyncInterval func() time.Duration
	// FsyncLatency records fsync latency. This doesn't differentiate between
	// fsyncs on the primary and secondary dir.
	//
	// TODO(sumeer): consider separating out into two histograms.
	FsyncLatency prometheus.Histogram
	// QueueSemChan is the channel to pop from when popping from queued records
	// that have requested a sync. Its original purpose was to function as a
	// semaphore that prevents the record.LogWriter.flusher.syncQueue from
	// overflowing (which will cause a panic). It is still useful in that role
	// when the Manager is configured in standalone mode. In failover mode
	// there is no syncQueue, so the pushback into the commit pipeline is
	// unnecessary, but possibly harmless.
	QueueSemChan chan struct{}

	// Logger for logging.
	Logger base.Logger

	// EventListener is called on events, like log file creation.
	EventListener EventListener

	// FailoverOptions holds the options that are specific to failover mode,
	// i.e. when Secondary is set.
	FailoverOptions
}
146 :
147 : // Init constructs and initializes a WAL manager from the provided options and
148 : // the set of initial logs.
149 2 : func Init(o Options, initial Logs) (Manager, error) {
150 2 : var m Manager
151 2 : if o.Secondary == (Dir{}) {
152 2 : m = new(StandaloneManager)
153 2 : } else {
154 2 : m = new(failoverManager)
155 2 : }
156 2 : if err := m.init(o, initial); err != nil {
157 1 : return nil, err
158 1 : }
159 2 : return m, nil
160 : }
161 :
162 : // Dirs returns the primary Dir and the secondary if provided.
163 2 : func (o *Options) Dirs() []Dir {
164 2 : if o.Secondary == (Dir{}) {
165 2 : return []Dir{o.Primary}
166 2 : }
167 2 : return []Dir{o.Primary, o.Secondary}
168 : }
169 :
// FailoverOptions are options that are specific to failover mode. Zero-valued
// durations and a nil UnhealthyOperationLatencyThreshold are replaced with
// defaults by EnsureDefaults.
type FailoverOptions struct {
	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
	// the WAL is being written to the secondary, to decide when to fail back.
	// EnsureDefaults uses 1s when unset.
	PrimaryDirProbeInterval time.Duration
	// HealthyProbeLatencyThreshold is the latency threshold to declare that the
	// primary is healthy again. EnsureDefaults uses 25ms when unset.
	HealthyProbeLatencyThreshold time.Duration
	// HealthyInterval is the time interval over which the probes have to be
	// healthy. That is, we look at probe history of length
	// HealthyInterval/PrimaryDirProbeInterval. EnsureDefaults uses 15s when
	// unset.
	HealthyInterval time.Duration

	// UnhealthySamplingInterval is the interval for sampling ongoing calls and
	// errors in the latest LogWriter. EnsureDefaults uses 100ms when unset.
	UnhealthySamplingInterval time.Duration
	// UnhealthyOperationLatencyThreshold is the latency threshold that is
	// considered unhealthy, for operations done by a LogWriter. The second return
	// value indicates whether we should consider failover at all. If the second
	// return value is false, failover is disabled. When nil, EnsureDefaults
	// installs a func returning (100ms, true).
	UnhealthyOperationLatencyThreshold func() (time.Duration, bool)

	// ElevatedWriteStallThresholdLag is the duration for which an elevated
	// threshold should continue after a switch back to the primary dir. This is
	// because we may have accumulated many unflushed memtables and flushing
	// them can take some time. EnsureDefaults uses 60s when unset.
	ElevatedWriteStallThresholdLag time.Duration

	// timeSource is only non-nil for tests.
	timeSource

	// Test-only hooks. NOTE(review): presumably nil outside tests — confirm.
	monitorIterationForTesting chan<- struct{}
	proberIterationForTesting  chan<- struct{}
	monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
	logWriterCreatedForTesting chan<- struct{}
}
206 :
// EnsureDefaults fills in a default for every option that is still at its
// zero value (or nil, for UnhealthyOperationLatencyThreshold). Options that
// were explicitly set are left untouched.
func (o *FailoverOptions) EnsureDefaults() {
	// orDefault assigns def to *d only when *d has not been set.
	orDefault := func(d *time.Duration, def time.Duration) {
		if *d == 0 {
			*d = def
		}
	}
	orDefault(&o.PrimaryDirProbeInterval, time.Second)
	orDefault(&o.HealthyProbeLatencyThreshold, 25*time.Millisecond)
	orDefault(&o.HealthyInterval, 15*time.Second)
	orDefault(&o.UnhealthySamplingInterval, 100*time.Millisecond)
	if o.UnhealthyOperationLatencyThreshold == nil {
		o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
			return 100 * time.Millisecond, true
		}
	}
	orDefault(&o.ElevatedWriteStallThresholdLag, 60*time.Second)
}
231 :
// EventListener is called on events, like log file creation.
type EventListener interface {
	// LogCreated informs the listener of a log file creation. The CreateInfo
	// describes the file, and its Err field carries any error.
	LogCreated(CreateInfo)
}
237 :
// CreateInfo contains info about a log file creation event. It is the
// argument passed to EventListener.LogCreated.
type CreateInfo struct {
	// JobID is the ID of the job that caused the WAL to be created.
	//
	// TODO(sumeer): for a file created later due to the need to failover, we
	// need to provide a JobID generator func in Options.
	JobID int
	// Path to the file. This includes the NumWAL, and implicitly or explicitly
	// includes the LogNameIndex: index 0 uses the plain "<num>.log" name,
	// while later indices appear as "<num>-<index>.log" (see makeLogFilename).
	Path string
	// IsSecondary is true if the file was created on the secondary dir.
	IsSecondary bool
	// Num is the WAL number.
	Num NumWAL
	// RecycledFileNum is the file number of a previous log file which was
	// recycled to create this one. Zero if recycling did not take place.
	RecycledFileNum base.DiskFileNum
	// Err contains any error (presumably one encountered while creating the
	// file — confirm).
	Err error
}
258 :
// Stats exposes stats used in Pebble metrics.
//
// NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
// package.
//
// TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
// maintained here.
type Stats struct {
	// ObsoleteFileCount is the number of obsolete log files.
	ObsoleteFileCount int
	// ObsoleteFileSize is the total size of obsolete log files.
	ObsoleteFileSize uint64
	// LiveFileCount is the number of live log files.
	LiveFileCount int
	// LiveFileSize is the total size of live log files. This can be higher than
	// LiveSize due to log recycling (a live log file may be larger than the
	// size used in its latest incarnation), or failover (resulting in multiple
	// log files containing the same records).
	//
	// NOTE(review): Stats has no LiveSize field; this presumably refers to the
	// logical size of the live WALs (e.g. Metrics.WAL.Size) — confirm.
	//
	// This is updated only when log files are closed, to minimize
	// synchronization.
	LiveFileSize uint64
	// Failover contains failover stats. It is empty if failover is not
	// configured.
	Failover FailoverStats
}
284 :
// FailoverStats contains stats about WAL failover. These are empty if
// failover is not configured.
type FailoverStats struct {
	// DirSwitchCount is the number of times WAL writing has switched to a
	// different directory, either due to failover, when the current dir is
	// unhealthy, or to failback to the primary, when the primary is healthy
	// again. Switches in both directions are counted.
	DirSwitchCount int64
	// The following durations do not account for continued background writes to
	// a directory that has been switched away from. These background writes can
	// happen because of queued records.

	// PrimaryWriteDuration is the cumulative duration for which WAL writes are
	// using the primary directory.
	PrimaryWriteDuration time.Duration
	// SecondaryWriteDuration is the cumulative duration for which WAL writes
	// are using the secondary directory.
	SecondaryWriteDuration time.Duration
}
304 :
// Manager handles all WAL work.
//
//   - Obsolete can be called concurrently with WAL writing.
//   - WAL writing: Is done via Create, and the various Writer methods. These
//     are required to be serialized via external synchronization (specifically,
//     the caller does it via commitPipeline.mu).
type Manager interface {
	// init initializes the Manager. init is called during DB initialization,
	// by Init, with the same Options and initial Logs passed to Init.
	init(o Options, initial Logs) error

	// List returns the virtual WALs in ascending order.
	List() (Logs, error)
	// Obsolete informs the manager that all virtual WALs less than
	// minUnflushedNum are obsolete. The callee can choose to recycle some
	// underlying log files, if !noRecycle. The log files that are not recycled,
	// and therefore can be deleted, are returned. The deletable files are no
	// longer tracked by the manager.
	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
	// Create creates a new virtual WAL.
	//
	// NumWALs passed to successive Create calls must be monotonically
	// increasing, and be greater than any NumWAL seen earlier. The caller must
	// close the previous Writer before calling Create.
	//
	// jobID is used for the EventListener configured in Options (presumably
	// surfaced as CreateInfo.JobID — confirm).
	Create(wn NumWAL, jobID int) (Writer, error)
	// ElevateWriteStallThresholdForFailover returns true if the caller should
	// use a high write stall threshold because the WALs are being written to
	// the secondary dir.
	ElevateWriteStallThresholdForFailover() bool
	// Stats returns the latest Stats.
	Stats() Stats
	// Close the manager.
	// REQUIRES: Writers and Readers have already been closed.
	Close() error

	// RecyclerForTesting exposes the internal LogRecycler.
	RecyclerForTesting() *LogRecycler
}
344 :
// DeletableLog contains information about a log file that can be deleted.
// Manager.Obsolete returns these for the obsolete files it did not recycle.
type DeletableLog struct {
	// FS is the filesystem holding the file (presumably the FS of the Dir in
	// which the file lives — confirm).
	vfs.FS
	// Path to the file.
	Path string
	// NumWAL is the virtual WAL that the file belonged to.
	NumWAL
	// ApproxFileSize is the approximate size of the file (presumably in
	// bytes — confirm).
	ApproxFileSize uint64
}
353 :
// SyncOptions has non-nil Done and Err when fsync is requested, else both are
// nil.
type SyncOptions struct {
	// Done, when non-nil, is notified by the Writer once durability is
	// guaranteed or an error has occurred.
	Done *sync.WaitGroup
	// Err, when non-nil, is where the Writer records an error that prevented
	// durability.
	Err *error
}
360 :
// Writer writes to a virtual WAL. A Writer in standalone mode maps to a
// single record.LogWriter. In failover mode, it can failover across multiple
// physical log files.
type Writer interface {
	// WriteRecord writes a complete record. The record is asynchronously
	// persisted to the underlying writer. If SyncOptions.Done != nil, the wait
	// group will be notified when durability is guaranteed or an error has
	// occurred (set in SyncOptions.Err). External synchronisation provided by
	// commitPipeline.mu guarantees that WriteRecord calls are serialized.
	//
	// The logicalOffset is the logical size of the WAL after this record is
	// written. If the WAL corresponds to a single log file, this is the offset
	// in that log file.
	//
	// Some Writer implementations may continue to read p after WriteRecord
	// returns. This is an obstacle to reusing p's memory. If the caller would
	// like to reuse p's memory, the caller may pass a non-nil [RefFunc].
	// If the Writer will retain p, it will invoke the [RefFunc] before
	// returning. When it's finished, it will invoke the func returned by the
	// [RefFunc] to release its reference.
	WriteRecord(p []byte, opts SyncOptions, ref RefFunc) (logicalOffset int64, err error)
	// Close the writer. NOTE(review): the returned logicalOffset is presumably
	// the final logical size of the WAL, as for WriteRecord — confirm.
	Close() (logicalOffset int64, err error)
	// Metrics must be called after Close. The callee will no longer modify the
	// returned LogWriterMetrics.
	Metrics() record.LogWriterMetrics
}
388 :
// RefFunc holds funcs to increment a reference count associated with a record
// passed to [Writer.WriteRecord]. Calling the RefFunc acquires a reference, and
// calling the returned unref func releases it. See the comment on WriteRecord.
type RefFunc func() (unref func())
392 :
// Reader reads a virtual WAL.
type Reader interface {
	// NextRecord returns a reader for the next record, along with the Offset
	// identifying where that record is located. It returns io.EOF if there
	// are no more records. The reader returned becomes stale after the next
	// NextRecord call, and should no longer be used.
	NextRecord() (io.Reader, Offset, error)
	// Close the reader.
	Close() error
}
402 :
403 : // Offset indicates the offset or position of a record within a WAL.
404 : type Offset struct {
405 : // PhysicalFile is the path to the physical file containing a particular
406 : // record.
407 : PhysicalFile string
408 : // Physical indicates the file offset at which a record begins within
409 : // the physical file named by PhysicalFile.
410 : Physical int64
411 : }
412 :
413 : // String implements fmt.Stringer, returning a string representation of the
414 : // offset.
415 1 : func (o Offset) String() string {
416 1 : return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
417 1 : }
|