Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "strconv"
11 : "strings"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // TODO(sumeer): write a high-level comment describing the approach.
22 :
// Dir is used for storing log files. It pairs a filesystem with the name of
// a directory.
type Dir struct {
	// FS is the filesystem through which the directory is accessed.
	FS vfs.FS
	// Dirname is the name of the directory (presumably a path interpreted by
	// FS -- confirm against callers).
	Dirname string
}
28 :
29 : // NumWAL is the number of the virtual WAL. It can map to one or more physical
30 : // log files. In standalone mode, it will map to exactly one log file. In
31 : // failover mode, it can map to many log files, which are totally ordered
32 : // (using a dense logNameIndex).
33 : //
34 : // In general, WAL refers to the virtual WAL, and file refers to a log file.
35 : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
36 : // them. Additional mapping to one or more files happens in this package. If a
37 : // WAL maps to multiple files, the source of truth regarding that mapping is
38 : // the contents of the directories.
39 : type NumWAL base.DiskFileNum
40 :
41 : // String implements fmt.Stringer.
42 1 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
43 :
// LogNameIndex numbers the log files that belong to a single WAL.
type LogNameIndex uint32

// String implements fmt.Stringer. The index is rendered in decimal,
// zero-padded to at least three digits (e.g. 7 -> "007", 1000 -> "1000").
func (li LogNameIndex) String() string {
	// Convert to the underlying integer type so the formatting verb operates
	// on a plain uint32.
	n := uint32(li)
	return fmt.Sprintf("%03d", n)
}
51 :
52 : // makeLogFilename makes a log filename.
53 2 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
54 2 : if index == 0 {
55 2 : // Use a backward compatible name, for simplicity.
56 2 : return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
57 2 : }
58 1 : return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
59 : }
60 :
61 : // ParseLogFilename takes a base filename and parses it into its constituent
62 : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
63 : // for the final return value.
64 2 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
65 2 : i := strings.IndexByte(name, '.')
66 2 : if i < 0 || name[i:] != ".log" {
67 2 : return 0, 0, false
68 2 : }
69 2 : j := strings.IndexByte(name[:i], '-')
70 2 : if j < 0 {
71 2 : dfn, ok := base.ParseDiskFileNum(name[:i])
72 2 : if !ok {
73 1 : // We've considered returning an error for filenames that end in
74 1 : // '.log' but fail to parse correctly. We decided against it because
75 1 : // the '.log' suffix is used by Cockroach's daignostics log files.
76 1 : // It's conceivable that some of these found their way into a data
77 1 : // directory, and erroring would cause an issue for an existing
78 1 : // Cockroach deployment.
79 1 : return 0, 0, false
80 1 : }
81 2 : return NumWAL(dfn), 0, true
82 : }
83 1 : dfn, ok := base.ParseDiskFileNum(name[:j])
84 1 : if !ok {
85 1 : return 0, 0, false
86 1 : }
87 1 : li, err := strconv.ParseUint(name[j+1:i], 10, 64)
88 1 : if err != nil {
89 1 : return 0, 0, false
90 1 : }
91 1 : return NumWAL(dfn), LogNameIndex(li), true
92 : }
93 :
// Options provides configuration for the Manager.
type Options struct {
	// Primary dir for storing WAL files. It must already be created and synced
	// up to the root.
	Primary Dir
	// Secondary is used for failover. Optional. It must already be created and
	// synced up to the root. Leaving it as the zero Dir selects standalone
	// mode in Init.
	Secondary Dir

	// MinUnflushedWALNum is the smallest WAL number corresponding to
	// mutations that have not been flushed to a sstable.
	MinUnflushedWALNum NumWAL

	// Recycling configuration. Only files in the primary dir are recycled.

	// MaxNumRecyclableLogs is the maximum number of log files to maintain for
	// recycling.
	MaxNumRecyclableLogs int

	// Configuration for calling vfs.NewSyncingFile.

	// NoSyncOnClose is documented in SyncingFileOptions.
	NoSyncOnClose bool
	// BytesPerSync is documented in SyncingFileOptions.
	BytesPerSync int
	// PreallocateSize is documented in SyncingFileOptions.
	PreallocateSize func() int

	// MinSyncInterval is documented in Options.WALMinSyncInterval.
	MinSyncInterval func() time.Duration
	// FsyncLatency records fsync latency. This doesn't differentiate between
	// fsyncs on the primary and secondary dir.
	//
	// TODO(sumeer): consider separating out into two histograms.
	FsyncLatency prometheus.Histogram
	// QueueSemChan is the channel to pop from when popping from queued records
	// that have requested a sync. Its original purpose was to function as a
	// semaphore that prevents the record.LogWriter.flusher.syncQueue from
	// overflowing (which will cause a panic). It is still useful in that role
	// when the WALManager is configured in standalone mode. In failover mode
	// there is no syncQueue, so the pushback into the commit pipeline is
	// unnecessary, but possibly harmless.
	QueueSemChan chan struct{}

	// Logger for logging.
	Logger base.Logger

	// EventListener is called on events, like log file creation.
	EventListener EventListener

	// FailoverOptions holds the options that are specific to failover mode.
	FailoverOptions
}
146 :
147 : // Init constructs and initializes a WAL manager from the provided options and
148 : // the set of initial logs.
149 2 : func Init(o Options, initial Logs) (Manager, error) {
150 2 : var m Manager
151 2 : if o.Secondary == (Dir{}) {
152 2 : m = new(StandaloneManager)
153 2 : } else {
154 1 : m = new(failoverManager)
155 1 : }
156 2 : if err := m.init(o, initial); err != nil {
157 1 : return nil, err
158 1 : }
159 2 : return m, nil
160 : }
161 :
162 : // Dirs returns the primary Dir and the secondary if provided.
163 2 : func (o *Options) Dirs() []Dir {
164 2 : if o.Secondary == (Dir{}) {
165 2 : return []Dir{o.Primary}
166 2 : }
167 1 : return []Dir{o.Primary, o.Secondary}
168 : }
169 :
// FailoverOptions are options that are specific to failover mode.
type FailoverOptions struct {
	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
	// the WAL is being written to the secondary, to decide when to fail back.
	// EnsureDefaults sets it to 1s when zero.
	PrimaryDirProbeInterval time.Duration
	// HealthyProbeLatencyThreshold is the latency threshold to declare that the
	// primary is healthy again. EnsureDefaults sets it to 100ms when zero.
	HealthyProbeLatencyThreshold time.Duration
	// HealthyInterval is the time interval over which the probes have to be
	// healthy. That is, we look at probe history of length
	// HealthyInterval/PrimaryDirProbeInterval. EnsureDefaults sets it to 2m
	// when zero.
	HealthyInterval time.Duration

	// UnhealthySamplingInterval is the interval for sampling ongoing calls and
	// errors in the latest LogWriter. EnsureDefaults sets it to 100ms when
	// zero.
	UnhealthySamplingInterval time.Duration
	// UnhealthyOperationLatencyThreshold is the latency threshold that is
	// considered unhealthy, for operations done by a LogWriter. When nil,
	// EnsureDefaults installs a func that returns 200ms.
	UnhealthyOperationLatencyThreshold func() time.Duration

	// ElevatedWriteStallThresholdLag is the duration for which an elevated
	// threshold should continue after a switch back to the primary dir. This is
	// because we may have accumulated many unflushed memtables and flushing
	// them can take some time. Maybe set to 60s. EnsureDefaults does not
	// assign a default to this field.
	ElevatedWriteStallThresholdLag time.Duration

	// timeSource is only non-nil for tests.
	timeSource

	// The remaining fields are hooks for tests, going by their names;
	// presumably they are left nil in production -- confirm against the
	// failover manager implementation.
	monitorIterationForTesting chan<- struct{}
	proberIterationForTesting  chan<- struct{}
	monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
	logWriterCreatedForTesting chan<- struct{}
}
204 :
205 : // EnsureDefaults ensures that the default values for all options are set if a
206 : // valid value was not already specified.
207 1 : func (o *FailoverOptions) EnsureDefaults() {
208 1 : if o.PrimaryDirProbeInterval == 0 {
209 1 : o.PrimaryDirProbeInterval = time.Second
210 1 : }
211 1 : if o.HealthyProbeLatencyThreshold == 0 {
212 1 : o.HealthyProbeLatencyThreshold = 100 * time.Millisecond
213 1 : }
214 1 : if o.HealthyInterval == 0 {
215 1 : o.HealthyInterval = 2 * time.Minute
216 1 : }
217 1 : if o.UnhealthySamplingInterval == 0 {
218 1 : o.UnhealthySamplingInterval = 100 * time.Millisecond
219 1 : }
220 1 : if o.UnhealthyOperationLatencyThreshold == nil {
221 1 : o.UnhealthyOperationLatencyThreshold = func() time.Duration {
222 1 : return 200 * time.Millisecond
223 1 : }
224 : }
225 : }
226 :
// EventListener is called on events, like log file creation. It is supplied
// to the Manager via Options.EventListener.
type EventListener interface {
	// LogCreated informs the listener of a log file creation. The CreateInfo
	// describes the created file and carries any error in its Err field.
	LogCreated(CreateInfo)
}
232 :
// CreateInfo contains info about a log file creation event. It is the
// argument passed to EventListener.LogCreated.
type CreateInfo struct {
	// JobID is the ID of the job that caused the WAL to be created.
	//
	// TODO(sumeer): for a file created later due to the need to failover, we
	// need to provide a JobID generator func in Options.
	JobID int
	// Path to the file. This includes the NumWAL, and implicitly or explicitly
	// includes the logNameIndex.
	Path string
	// IsSecondary is true if the file was created on the secondary.
	IsSecondary bool
	// Num is the WAL number.
	Num NumWAL
	// RecycledFileNum is the file number of a previous log file which was
	// recycled to create this one. Zero if recycling did not take place.
	RecycledFileNum base.DiskFileNum
	// Err contains any error.
	Err error
}
253 :
// Stats exposes stats used in Pebble metrics. It is the value returned by
// Manager.Stats.
//
// NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
// package.
//
// TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
// maintained here.
type Stats struct {
	// ObsoleteFileCount is the number of obsolete log files.
	ObsoleteFileCount int
	// ObsoleteFileSize is the total size of obsolete log files.
	ObsoleteFileSize uint64
	// LiveFileCount is the number of live log files.
	LiveFileCount int
	// LiveFileSize is the total size of live log files. This can be higher than
	// LiveSize due to log recycling (a live log file may be larger than the
	// size used in its latest incarnation), or failover (resulting in multiple
	// log files containing the same records).
	//
	// This is updated only when log files are closed, to minimize
	// synchronization.
	LiveFileSize uint64
	// Failover contains failover stats. Per the FailoverStats documentation,
	// these are empty if failover is not configured.
	Failover FailoverStats
}
279 :
// FailoverStats contains stats about WAL failover. These are empty if
// failover is not configured.
type FailoverStats struct {
	// DirSwitchCount is the number of times WAL writing has switched to a
	// different directory, either due to failover, when the current dir is
	// unhealthy, or to failback to the primary, when the primary is healthy
	// again.
	DirSwitchCount int64

	// The following durations do not account for continued background writes to
	// a directory that has been switched away from. These background writes can
	// happen because of queued records.

	// PrimaryWriteDuration is the cumulative duration for which WAL writes are
	// using the primary directory.
	PrimaryWriteDuration time.Duration
	// SecondaryWriteDuration is the cumulative duration for which WAL writes
	// are using the secondary directory.
	SecondaryWriteDuration time.Duration
}
299 :
// Manager handles all WAL work. A Manager is obtained by calling Init.
//
//   - Obsolete can be called concurrently with WAL writing.
//   - WAL writing: Is done via Create, and the various Writer methods. These
//     are required to be serialized via external synchronization (specifically,
//     the caller does it via commitPipeline.mu).
type Manager interface {
	// init initializes the Manager. init is called during DB initialization.
	init(o Options, initial Logs) error

	// List returns the virtual WALs in ascending order.
	List() (Logs, error)
	// Obsolete informs the manager that all virtual WALs less than
	// minUnflushedNum are obsolete. The callee can choose to recycle some
	// underlying log files, if !noRecycle. The log files that are not recycled,
	// and therefore can be deleted, are returned. The deletable files are no
	// longer tracked by the manager.
	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
	// Create creates a new virtual WAL.
	//
	// NumWALs passed to successive Create calls must be monotonically
	// increasing, and be greater than any NumWAL seen earlier. The caller must
	// close the previous Writer before calling Create.
	//
	// jobID is used for the EventListener (presumably surfaced as
	// CreateInfo.JobID -- confirm against the implementations).
	Create(wn NumWAL, jobID int) (Writer, error)
	// ElevateWriteStallThresholdForFailover returns true if the caller should
	// use a high write stall threshold because the WALs are being written to
	// the secondary dir.
	ElevateWriteStallThresholdForFailover() bool
	// Stats returns the latest Stats.
	Stats() Stats
	// Close the manager.
	// REQUIRES: Writers and Readers have already been closed.
	Close() error

	// RecyclerForTesting exposes the internal LogRecycler.
	RecyclerForTesting() *LogRecycler
}
339 :
// DeletableLog contains information about a log file that can be deleted.
// Values of this type are returned by Manager.Obsolete.
type DeletableLog struct {
	// FS is embedded; presumably it is the filesystem on which Path resides
	// -- confirm against the Manager implementations.
	vfs.FS
	// Path to the file.
	Path string
	// NumWAL is embedded; presumably it is the number of the virtual WAL the
	// file belonged to -- confirm against the Manager implementations.
	NumWAL
	// ApproxFileSize is, going by its name, an approximate size of the file;
	// the unit is presumably bytes -- confirm.
	ApproxFileSize uint64
}
348 :
// SyncOptions has non-nil Done and Err when fsync is requested, else both are
// nil. It is passed to Writer.WriteRecord.
type SyncOptions struct {
	// Done is the wait group that is notified when durability is guaranteed or
	// an error has occurred (see Writer.WriteRecord).
	Done *sync.WaitGroup
	// Err is where an error that occurred is set (see Writer.WriteRecord).
	Err *error
}
355 :
// Writer writes to a virtual WAL. A Writer in standalone mode maps to a
// single record.LogWriter. In failover mode, it can failover across multiple
// physical log files. Writers are returned by Manager.Create.
type Writer interface {
	// WriteRecord writes a complete record. The record is asynchronously
	// persisted to the underlying writer. If SyncOptions.Done != nil, the wait
	// group will be notified when durability is guaranteed or an error has
	// occurred (set in SyncOptions.Err). External synchronisation provided by
	// commitPipeline.mu guarantees that WriteRecord calls are serialized.
	//
	// The logicalOffset is the logical size of the WAL after this record is
	// written. If the WAL corresponds to a single log file, this is the offset
	// in that log file.
	WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error)
	// Close the writer. It returns a logicalOffset and any error; the offset
	// is presumably the final logical size of the WAL, as for WriteRecord --
	// confirm against the implementations.
	Close() (logicalOffset int64, err error)
	// Metrics must be called after Close. The callee will no longer modify the
	// returned LogWriterMetrics.
	Metrics() record.LogWriterMetrics
}
376 :
// Reader reads a virtual WAL.
type Reader interface {
	// NextRecord returns a reader for the next record, along with the Offset
	// of that record. It returns io.EOF if there are no more records. The
	// reader returned becomes stale after the next NextRecord call, and should
	// no longer be used.
	NextRecord() (io.Reader, Offset, error)
	// Close the reader.
	Close() error
}
386 :
// Offset indicates the offset or position of a record within a WAL.
type Offset struct {
	// PhysicalFile is the path to the physical file containing a particular
	// record.
	PhysicalFile string
	// Physical indicates the file offset at which a record begins within
	// the physical file named by PhysicalFile.
	Physical int64
}

// String implements fmt.Stringer. The offset is rendered as
// "(<PhysicalFile>: <Physical>)".
func (o Offset) String() string {
	file, pos := o.PhysicalFile, o.Physical
	return fmt.Sprintf("(%s: %d)", file, pos)
}
|