Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "io"
11 : "slices"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/batchrepr"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/redact"
19 : )
20 :
21 : // A LogicalLog identifies a logical WAL and its consituent segment files.
22 : type LogicalLog struct {
23 : Num NumWAL
24 : // segments contains the list of the consistuent physical segment files that
25 : // make up the single logical WAL file. segments is ordered by increasing
26 : // logIndex.
27 : segments []segment
28 : }
29 :
30 : // A segment represents an individual physical file that makes up a contiguous
31 : // segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
32 : // WAL may be composed of multiple segments.
33 : type segment struct {
34 : logNameIndex LogNameIndex
35 : dir Dir
36 : }
37 :
38 : // String implements fmt.Stringer.
39 0 : func (s segment) String() string {
40 0 : return redact.StringWithoutMarkers(s)
41 0 : }
42 :
43 : // SafeFormat implements redact.SafeFormatter.
44 1 : func (s segment) SafeFormat(w redact.SafePrinter, _ rune) {
45 1 : w.Printf("(%s,%s)", errors.Safe(s.dir.Dirname), s.logNameIndex)
46 1 : }
47 :
48 : // NumSegments returns the number of constituent physical log files that make up
49 : // the log.
50 1 : func (ll LogicalLog) NumSegments() int { return len(ll.segments) }
51 :
52 : // SegmentLocation returns the FS and path for the i-th physical segment file.
53 1 : func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string) {
54 1 : s := ll.segments[i]
55 1 : path := s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.Num, s.logNameIndex))
56 1 : return s.dir.FS, path
57 1 : }
58 :
59 : // PhysicalSize stats each of the log's physical files, summing their sizes.
60 0 : func (ll LogicalLog) PhysicalSize() (uint64, error) {
61 0 : var size uint64
62 0 : for i := range ll.segments {
63 0 : fs, path := ll.SegmentLocation(i)
64 0 : stat, err := fs.Stat(path)
65 0 : if err != nil {
66 0 : return 0, err
67 0 : }
68 0 : size += uint64(stat.Size())
69 : }
70 0 : return size, nil
71 : }
72 :
73 : // OpenForRead a logical WAL for reading.
74 1 : func (ll LogicalLog) OpenForRead() Reader {
75 1 : return newVirtualWALReader(ll)
76 1 : }
77 :
78 : // String implements fmt.Stringer.
79 1 : func (ll LogicalLog) String() string {
80 1 : return redact.StringWithoutMarkers(ll)
81 1 : }
82 :
83 : // SafeFormat implements redact.SafeFormatter.
84 1 : func (ll LogicalLog) SafeFormat(w redact.SafePrinter, _ rune) {
85 1 : w.Printf("%s: {", base.DiskFileNum(ll.Num).String())
86 1 : for i := range ll.segments {
87 1 : if i > 0 {
88 1 : w.SafeString(", ")
89 1 : }
90 1 : w.Print(ll.segments[i])
91 : }
92 1 : w.SafeString("}")
93 : }
94 :
95 : // appendDeletableLogs appends all of the LogicalLog's constituent physical
96 : // files as DeletableLogs to dst, returning the modified slice.
97 : // AppendDeletableLogs will Stat physical files to determine physical sizes.
98 : // AppendDeletableLogs does not make any judgmenet on whether a log file is
99 : // obsolete, so callers must take care not to delete logs that are still
100 : // unflushed.
101 1 : func appendDeletableLogs(dst []DeletableLog, ll LogicalLog) ([]DeletableLog, error) {
102 1 : for i := range ll.segments {
103 1 : fs, path := ll.SegmentLocation(i)
104 1 : stat, err := fs.Stat(path)
105 1 : if err != nil {
106 0 : return dst, err
107 0 : }
108 1 : dst = append(dst, DeletableLog{
109 1 : FS: fs,
110 1 : Path: path,
111 1 : NumWAL: ll.Num,
112 1 : ApproxFileSize: uint64(stat.Size()),
113 1 : })
114 : }
115 1 : return dst, nil
116 : }
117 :
118 : // Scan finds all log files in the provided directories. It returns an
119 : // ordered list of WALs in increasing NumWAL order.
120 1 : func Scan(dirs ...Dir) (Logs, error) {
121 1 : var fa FileAccumulator
122 1 : for _, d := range dirs {
123 1 : ls, err := d.FS.List(d.Dirname)
124 1 : if err != nil {
125 0 : return nil, errors.Wrapf(err, "reading %q", d.Dirname)
126 0 : }
127 1 : for _, name := range ls {
128 1 : _, err := fa.maybeAccumulate(d.FS, d.Dirname, name)
129 1 : if err != nil {
130 0 : return nil, err
131 0 : }
132 : }
133 : }
134 1 : return fa.wals, nil
135 : }
136 :
137 : // FileAccumulator parses and accumulates log files.
138 : type FileAccumulator struct {
139 : wals []LogicalLog
140 : }
141 :
142 : // MaybeAccumulate parses the provided path's filename. If the filename
143 : // indicates the file is a write-ahead log, MaybeAccumulate updates its internal
144 : // state to remember the file and returns isLogFile=true. An error is returned
145 : // if the file is a duplicate.
146 0 : func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile bool, err error) {
147 0 : filename := fs.PathBase(path)
148 0 : dirname := fs.PathDir(path)
149 0 : return a.maybeAccumulate(fs, dirname, filename)
150 0 : }
151 :
152 : // Finish returns a Logs constructed from the physical files observed through
153 : // MaybeAccumulate.
154 0 : func (a *FileAccumulator) Finish() Logs {
155 0 : wals := a.wals
156 0 : a.wals = nil
157 0 : return wals
158 0 : }
159 :
160 : func (a *FileAccumulator) maybeAccumulate(
161 : fs vfs.FS, dirname, name string,
162 1 : ) (isLogFile bool, err error) {
163 1 : dfn, li, ok := ParseLogFilename(name)
164 1 : if !ok {
165 1 : return false, nil
166 1 : }
167 : // Have we seen this logical log number yet?
168 1 : i, found := slices.BinarySearchFunc(a.wals, dfn, func(lw LogicalLog, n NumWAL) int {
169 1 : return cmp.Compare(lw.Num, n)
170 1 : })
171 1 : if !found {
172 1 : a.wals = slices.Insert(a.wals, i, LogicalLog{Num: dfn, segments: make([]segment, 0, 1)})
173 1 : }
174 : // Ensure we haven't seen this log index yet, and find where it
175 : // slots within this log's segments.
176 1 : j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li LogNameIndex) int {
177 1 : return cmp.Compare(s.logNameIndex, li)
178 1 : })
179 1 : if found {
180 0 : return false, errors.Errorf("wal: duplicate logIndex=%s for WAL %s in %s and %s",
181 0 : li, dfn, dirname, a.wals[i].segments[j].dir.Dirname)
182 0 : }
183 1 : a.wals[i].segments = slices.Insert(a.wals[i].segments, j, segment{logNameIndex: li, dir: Dir{
184 1 : FS: fs,
185 1 : Dirname: dirname,
186 1 : }})
187 1 : return true, nil
188 : }
189 :
190 : // Logs holds a collection of WAL files, in increasing order of NumWAL.
191 : type Logs []LogicalLog
192 :
193 : // Get retrieves the WAL with the given number if present. The second return
194 : // value indicates whether or not the WAL was found.
195 0 : func (l Logs) Get(num NumWAL) (LogicalLog, bool) {
196 0 : i, found := slices.BinarySearchFunc(l, num, func(lw LogicalLog, n NumWAL) int {
197 0 : return cmp.Compare(lw.Num, n)
198 0 : })
199 0 : if !found {
200 0 : return LogicalLog{}, false
201 0 : }
202 0 : return l[i], true
203 : }
204 :
205 1 : func newVirtualWALReader(wal LogicalLog) *virtualWALReader {
206 1 : return &virtualWALReader{
207 1 : LogicalLog: wal,
208 1 : currIndex: -1,
209 1 : }
210 1 : }
211 :
212 : // A virtualWALReader takes an ordered sequence of physical WAL files
213 : // ("segments") and implements the wal.Reader interface, providing a merged view
214 : // of the WAL's logical contents. It's responsible for filtering duplicate
215 : // records which may be shared by the tail of a segment file and the head of its
216 : // successor.
217 : type virtualWALReader struct {
218 : // VirtualWAL metadata.
219 : LogicalLog
220 :
221 : // State pertaining to the current position of the reader within the virtual
222 : // WAL and its constituent physical files.
223 : currIndex int
224 : currFile vfs.File
225 : currReader *record.Reader
226 : // off describes the current Offset within the WAL.
227 : off Offset
228 : // lastSeqNum is the sequence number of the batch contained within the last
229 : // record returned to the user. A virtual WAL may be split across a sequence
230 : // of several physical WAL files. The tail of one physical WAL may be
231 : // duplicated within the head of the next physical WAL file. We use
232 : // contained batches' sequence numbers to deduplicate. This lastSeqNum field
233 : // should monotonically increase as we iterate over the WAL files. If we
234 : // ever observe a batch encoding a sequence number <= lastSeqNum, we must
235 : // have already returned the batch and should skip it.
236 : lastSeqNum base.SeqNum
237 : // recordBuf is a buffer used to hold the latest record read from a physical
238 : // file, and then returned to the user. A pointer to this buffer is returned
239 : // directly to the caller of NextRecord.
240 : recordBuf bytes.Buffer
241 : }
242 :
243 : // *virtualWALReader implements wal.Reader.
244 : var _ Reader = (*virtualWALReader)(nil)
245 :
246 : // NextRecord returns a reader for the next record. It returns io.EOF if there
247 : // are no more records. The reader returned becomes stale after the next
248 : // NextRecord call, and should no longer be used.
249 1 : func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
250 1 : // On the first call, we need to open the first file.
251 1 : if r.currIndex < 0 {
252 1 : err := r.nextFile()
253 1 : if err != nil {
254 0 : return nil, Offset{}, err
255 0 : }
256 : }
257 :
258 1 : for {
259 1 : // Update our current physical offset to match the current file offset.
260 1 : r.off.Physical = r.currReader.Offset()
261 1 : // Obtain a Reader for the next record within this log file.
262 1 : rec, err := r.currReader.Next()
263 1 : if errors.Is(err, io.EOF) {
264 1 : // This file is exhausted; continue to the next.
265 1 : err := r.nextFile()
266 1 : if err != nil {
267 1 : return nil, r.off, err
268 1 : }
269 1 : continue
270 : }
271 :
272 : // Copy the record into a buffer. This ensures we read its entirety so
273 : // that NextRecord returns the next record, even if the caller never
274 : // exhausts the previous record's Reader. The record.Reader requires the
275 : // record to be exhausted to read all of the record's chunks before
276 : // attempting to read the next record. Buffering also also allows us to
277 : // easily read the header of the batch down below for deduplication.
278 1 : r.recordBuf.Reset()
279 1 : if err == nil {
280 1 : _, err = io.Copy(&r.recordBuf, rec)
281 1 : }
282 : // The record may be malformed. This is expected during a WAL failover,
283 : // because the tail of a WAL may be only partially written or otherwise
284 : // unclean because of WAL recycling and the inability to write the EOF
285 : // trailer record. If this isn't the last file, we silently ignore the
286 : // invalid record at the tail and proceed to the next file. If it is
287 : // the last file, bubble the error up and let the client decide what to
288 : // do with it. If the virtual WAL is the most recent WAL, Open may also
289 : // decide to ignore it because it's consistent with an incomplete
290 : // in-flight write at the time of process exit/crash. See #453.
291 1 : if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
292 0 : if err := r.nextFile(); err != nil {
293 0 : return nil, r.off, err
294 0 : }
295 0 : continue
296 1 : } else if err != nil {
297 0 : return nil, r.off, err
298 0 : }
299 :
300 : // We may observe repeat records between the physical files that make up
301 : // a virtual WAL because inflight writes to a file on a stalled disk may
302 : // or may not end up completing. WAL records always contain encoded
303 : // batches, and batches that contain data can be uniquely identifed by
304 : // sequence number.
305 : //
306 : // Parse the batch header.
307 1 : h, ok := batchrepr.ReadHeader(r.recordBuf.Bytes())
308 1 : if !ok {
309 0 : // Failed to read the batch header because the record was smaller
310 0 : // than the length of a batch header. This is unexpected. The record
311 0 : // envelope successfully decoded and the checkums of the individual
312 0 : // record fragment(s) validated, so the writer truly wrote an
313 0 : // invalid batch. During Open WAL recovery treats this as
314 0 : // corruption. We could return the record to the caller, allowing
315 0 : // the caller to interpret it as corruption, but it seems safer to
316 0 : // be explicit and surface the corruption error here.
317 0 : return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
318 0 : r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
319 0 : }
320 :
321 : // There's a subtlety necessitated by LogData operations. A LogData
322 : // applied to a batch results in data appended to the WAL in a batch
323 : // format, but the data is never applied to the memtable or LSM. A batch
324 : // only containing LogData will repeat a sequence number. We skip these
325 : // batches because they're not relevant for recovery and we do not want
326 : // to mistakenly deduplicate the batch containing KVs at the same
327 : // sequence number. We can differentiate LogData-only batches through
328 : // their batch headers: they'll encode a count of zero.
329 1 : if h.Count == 0 {
330 1 : continue
331 : }
332 :
333 : // If we've already observed a sequence number >= this batch's sequence
334 : // number, we must've already returned this record to the client. Skip
335 : // it.
336 1 : if h.SeqNum <= r.lastSeqNum {
337 1 : continue
338 : }
339 1 : r.lastSeqNum = h.SeqNum
340 1 : return &r.recordBuf, r.off, nil
341 : }
342 : }
343 :
344 : // Close closes the reader, releasing open resources.
345 1 : func (r *virtualWALReader) Close() error {
346 1 : if r.currFile != nil {
347 1 : if err := r.currFile.Close(); err != nil {
348 0 : return err
349 0 : }
350 : }
351 1 : return nil
352 : }
353 :
354 : // nextFile advances the internal state to the next physical segment file.
355 1 : func (r *virtualWALReader) nextFile() error {
356 1 : if r.currFile != nil {
357 1 : err := r.currFile.Close()
358 1 : r.currFile = nil
359 1 : if err != nil {
360 0 : return err
361 0 : }
362 : }
363 1 : r.currIndex++
364 1 : if r.currIndex >= len(r.segments) {
365 1 : return io.EOF
366 1 : }
367 :
368 1 : fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
369 1 : r.off.PreviousFilesBytes += r.off.Physical
370 1 : r.off.PhysicalFile = path
371 1 : r.off.Physical = 0
372 1 : var err error
373 1 : if r.currFile, err = fs.Open(path); err != nil {
374 0 : return errors.Wrapf(err, "opening WAL file segment %q", path)
375 0 : }
376 1 : r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
377 1 : return nil
378 : }
|