Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "strconv"
11 : "strings"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // TODO(sumeer): write a high-level comment describing the approach.
22 :
23 : // Dir is used for storing log files.
24 : type Dir struct {
25 : FS vfs.FS
26 : Dirname string
27 : }
28 :
29 : // NumWAL is the number of the virtual WAL. It can map to one or more physical
30 : // log files. In standalone mode, it will map to exactly one log file. In
31 : // failover mode, it can map to many log files, which are totally ordered
32 : // (using a dense logNameIndex).
33 : //
34 : // In general, WAL refers to the virtual WAL, and file refers to a log file.
35 : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
36 : // them. Additional mapping to one or more files happens in this package. If a
37 : // WAL maps to multiple files, the source of truth regarding that mapping is
38 : // the contents of the directories.
39 : type NumWAL base.DiskFileNum
40 :
41 : // String implements fmt.Stringer.
42 0 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
43 :
44 : // LogNameIndex numbers log files within a WAL.
45 : type LogNameIndex uint32
46 :
47 : // String implements fmt.Stringer.
48 0 : func (li LogNameIndex) String() string {
49 0 : return fmt.Sprintf("%03d", li)
50 0 : }
51 :
52 : // makeLogFilename makes a log filename.
53 1 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
54 1 : if index == 0 {
55 1 : // Use a backward compatible name, for simplicity.
56 1 : return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
57 1 : }
58 0 : return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
59 : }
60 :
61 : // ParseLogFilename takes a base filename and parses it into its constituent
62 : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
63 : // for the final return value.
64 1 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
65 1 : i := strings.IndexByte(name, '.')
66 1 : if i < 0 || name[i:] != ".log" {
67 1 : return 0, 0, false
68 1 : }
69 1 : j := strings.IndexByte(name[:i], '-')
70 1 : if j < 0 {
71 1 : dfn, ok := base.ParseDiskFileNum(name[:i])
72 1 : if !ok {
73 0 : // We've considered returning an error for filenames that end in
74 0 : // '.log' but fail to parse correctly. We decided against it because
75 0 : // the '.log' suffix is used by Cockroach's daignostics log files.
76 0 : // It's conceivable that some of these found their way into a data
77 0 : // directory, and erroring would cause an issue for an existing
78 0 : // Cockroach deployment.
79 0 : return 0, 0, false
80 0 : }
81 1 : return NumWAL(dfn), 0, true
82 : }
83 0 : dfn, ok := base.ParseDiskFileNum(name[:j])
84 0 : if !ok {
85 0 : return 0, 0, false
86 0 : }
87 0 : li, err := strconv.ParseUint(name[j+1:i], 10, 64)
88 0 : if err != nil {
89 0 : return 0, 0, false
90 0 : }
91 0 : return NumWAL(dfn), LogNameIndex(li), true
92 : }
93 :
94 : // Options provides configuration for the Manager.
95 : type Options struct {
96 : // Primary dir for storing WAL files. It must already be created and synced
97 : // up to the root.
98 : Primary Dir
99 : // Secondary is used for failover. Optional. It must already be created and
100 : // synced up to the root.
101 : Secondary Dir
102 :
103 : // MinUnflushedLogNum is the smallest WAL number corresponding to
104 : // mutations that have not been flushed to a sstable.
105 : MinUnflushedWALNum NumWAL
106 :
107 : // Recycling configuration. Only files in the primary dir are recycled.
108 :
109 : // MaxNumRecyclableLogs is the maximum number of log files to maintain for
110 : // recycling.
111 : MaxNumRecyclableLogs int
112 :
113 : // Configuration for calling vfs.NewSyncingFile.
114 :
115 : // NoSyncOnClose is documented in SyncingFileOptions.
116 : NoSyncOnClose bool
117 : // BytesPerSync is documented in SyncingFileOptions.
118 : BytesPerSync int
119 : // PreallocateSize is documented in SyncingFileOptions.
120 : PreallocateSize func() int
121 :
122 : // MinSyncInterval is documented in Options.WALMinSyncInterval.
123 : MinSyncInterval func() time.Duration
124 : // FsyncLatency records fsync latency. This doesn't differentiate between
125 : // fsyncs on the primary and secondary dir.
126 : //
127 : // TODO(sumeer): consider separating out into two histograms.
128 : FsyncLatency prometheus.Histogram
129 : // QueueSemChan is the channel to pop from when popping from queued records
130 : // that have requested a sync. It's original purpose was to function as a
131 : // semaphore that prevents the record.LogWriter.flusher.syncQueue from
132 : // overflowing (which will cause a panic). It is still useful in that role
133 : // when the WALManager is configured in standalone mode. In failover mode
134 : // there is no syncQueue, so the pushback into the commit pipeline is
135 : // unnecessary, but possibly harmless.
136 : QueueSemChan chan struct{}
137 :
138 : // Logger for logging.
139 : Logger base.Logger
140 :
141 : // EventListener is called on events, like log file creation.
142 : EventListener EventListener
143 :
144 : FailoverOptions
145 : }
146 :
147 : // Dirs returns the primary Dir and the secondary if provided.
148 1 : func (o *Options) Dirs() []Dir {
149 1 : if o.Secondary == (Dir{}) {
150 1 : return []Dir{o.Primary}
151 1 : }
152 0 : return []Dir{o.Primary, o.Secondary}
153 : }
154 :
155 : // FailoverOptions are options that are specific to failover mode.
156 : type FailoverOptions struct {
157 : // PrimaryDirProbeInterval is the interval for probing the primary dir, when
158 : // the WAL is being written to the secondary, to decide when to fail back.
159 : PrimaryDirProbeInterval time.Duration
160 : // HealthyProbeLatencyThreshold is the latency threshold to declare that the
161 : // primary is healthy again.
162 : HealthyProbeLatencyThreshold time.Duration
163 : // HealthyInterval is the time interval over which the probes have to be
164 : // healthy. That is, we look at probe history of length
165 : // HealthyInterval/PrimaryDirProbeInterval.
166 : HealthyInterval time.Duration
167 :
168 : // UnhealthySamplingInterval is the interval for sampling ongoing calls and
169 : // errors in the latest LogWriter.
170 : UnhealthySamplingInterval time.Duration
171 : // UnhealthyOperationLatencyThreshold is the latency threshold that is
172 : // considered unhealthy, for operations done by a LogWriter.
173 : UnhealthyOperationLatencyThreshold func() time.Duration
174 :
175 : // ElevatedWriteStallThresholdLag is the duration for which an elevated
176 : // threshold should continue after a switch back to the primary dir. This is
177 : // because we may have accumulated many unflushed memtables and flushing
178 : // them can take some time. Maybe set to 60s.
179 : ElevatedWriteStallThresholdLag time.Duration
180 :
181 : // timeSource is only non-nil for tests.
182 : timeSource
183 :
184 : monitorIterationForTesting chan<- struct{}
185 : proberIterationForTesting chan<- struct{}
186 : monitorStateForTesting func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
187 : logWriterCreatedForTesting chan<- struct{}
188 : }
189 :
190 : // EventListener is called on events, like log file creation.
191 : type EventListener interface {
192 : // LogCreated informs the listener of a log file creation.
193 : LogCreated(CreateInfo)
194 : }
195 :
196 : // CreateInfo contains info about a log file creation event.
197 : type CreateInfo struct {
198 : // JobID is the ID of the job the caused the WAL to be created.
199 : //
200 : // TODO(sumeer): for a file created later due to the need to failover, we
201 : // need to provide a JobID generator func in Options.
202 : JobID int
203 : // Path to the file. This includes the NumWAL, and implicitly or explicitly
204 : // includes the logNameIndex.
205 : Path string
206 : // IsSecondary is true if the file was created on the secondary.
207 : IsSecondary bool
208 : // Num is the WAL number.
209 : Num NumWAL
210 : // RecycledFileNum is the file number of a previous log file which was
211 : // recycled to create this one. Zero if recycling did not take place.
212 : RecycledFileNum base.DiskFileNum
213 : // Err contains any error.
214 : Err error
215 : }
216 :
217 : // Stats exposes stats used in Pebble metrics.
218 : //
219 : // NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
220 : // package.
221 : //
222 : // TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
223 : // maintained here.
224 : type Stats struct {
225 : // ObsoleteFileCount is the number of obsolete log files.
226 : ObsoleteFileCount int
227 : // ObsoleteFileSize is the total size of obsolete log files.
228 : ObsoleteFileSize uint64
229 : // LiveFileCount is the number of live log files.
230 : LiveFileCount int
231 : // LiveFileSize is the total size of live log files. This can be higher than
232 : // LiveSize due to log recycling (a live log file may be larger than the
233 : // size used in its latest incarnation), or failover (resulting in multiple
234 : // log files containing the same records).
235 : //
236 : // This is updated only when log files are closed, to minimize
237 : // synchronization.
238 : LiveFileSize uint64
239 : }
240 :
241 : // Manager handles all WAL work.
242 : //
243 : // - Init will be called during DB initialization.
244 : // - Obsolete can be called concurrently with WAL writing.
245 : // - WAL writing: Is done via Create, and the various Writer methods. These
246 : // are required to be serialized via external synchronization (specifically,
247 : // the caller does it via commitPipeline.mu).
248 : type Manager interface {
249 : // Init initializes the Manager.
250 : Init(o Options, initial Logs) error
251 : // List returns the virtual WALs in ascending order.
252 : List() (Logs, error)
253 : // Obsolete informs the manager that all virtual WALs less than
254 : // minUnflushedNum are obsolete. The callee can choose to recycle some
255 : // underlying log files, if !noRecycle. The log files that are not recycled,
256 : // and therefore can be deleted, are returned. The deletable files are no
257 : // longer tracked by the manager.
258 : Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
259 : // Create creates a new virtual WAL.
260 : //
261 : // NumWALs passed to successive Create calls must be monotonically
262 : // increasing, and be greater than any NumWAL seen earlier. The caller must
263 : // close the previous Writer before calling Create.
264 : //
265 : // jobID is used for the WALEventListener.
266 : Create(wn NumWAL, jobID int) (Writer, error)
267 : // ElevateWriteStallThresholdForFailover returns true if the caller should
268 : // use a high write stall threshold because the WALs are being written to
269 : // the secondary dir.
270 : ElevateWriteStallThresholdForFailover() bool
271 : // Stats returns the latest Stats.
272 : Stats() Stats
273 : // Close the manager.
274 : // REQUIRES: Writers and Readers have already been closed.
275 : Close() error
276 :
277 : // RecyclerForTesting exposes the internal LogRecycler.
278 : RecyclerForTesting() *LogRecycler
279 : }
280 :
281 : // DeletableLog contains information about a log file that can be deleted.
282 : type DeletableLog struct {
283 : vfs.FS
284 : // Path to the file.
285 : Path string
286 : NumWAL
287 : ApproxFileSize uint64
288 : }
289 :
290 : // SyncOptions has non-nil Done and Err when fsync is requested, else both are
291 : // nil.
292 : type SyncOptions struct {
293 : Done *sync.WaitGroup
294 : Err *error
295 : }
296 :
297 : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
298 : // single record.LogWriter. In failover mode, it can failover across multiple
299 : // physical log files.
300 : type Writer interface {
301 : // WriteRecord writes a complete record. The record is asynchronously
302 : // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
303 : // group will be notified when durability is guaranteed or an error has
304 : // occurred (set in SyncOptions.Err). External synchronisation provided by
305 : // commitPipeline.mu guarantees that WriteRecord calls are serialized.
306 : //
307 : // The logicalOffset is the logical size of the WAL after this record is
308 : // written. If the WAL corresponds to a single log file, this is the offset
309 : // in that log file.
310 : WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error)
311 : // Close the writer.
312 : Close() (logicalOffset int64, err error)
313 : // Metrics must be called after Close. The callee will no longer modify the
314 : // returned LogWriterMetrics.
315 : Metrics() record.LogWriterMetrics
316 : }
317 :
318 : // Reader reads a virtual WAL.
319 : type Reader interface {
320 : // NextRecord returns a reader for the next record. It returns io.EOF if there
321 : // are no more records. The reader returned becomes stale after the next Next
322 : // call, and should no longer be used.
323 : NextRecord() (io.Reader, Offset, error)
324 : // Close the reader.
325 : Close() error
326 : }
327 :
328 : // Offset indicates the offset or position of a record within a WAL.
329 : type Offset struct {
330 : // PhysicalFile is the path to the physical file containing a particular
331 : // record.
332 : PhysicalFile string
333 : // Physical indicates the file offset at which a record begins within
334 : // the physical file named by PhysicalFile.
335 : Physical int64
336 : }
337 :
338 : // String implements fmt.Stringer, returning a string representation of the
339 : // offset.
340 0 : func (o Offset) String() string {
341 0 : return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
342 0 : }
|