Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "fmt"
11 : "io"
12 : "slices"
13 : "strings"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/batchrepr"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/vfs"
20 : )
21 :
22 : // A LogicalLog identifies a logical WAL and its constituent segment files.
23 : type LogicalLog struct {
24 : Num NumWAL // number of the logical WAL; also an input when deriving each segment's filename
25 : // segments contains the list of the constituent physical segment files that
26 : // make up the single logical WAL file. segments is ordered by increasing
27 : // logNameIndex.
28 : segments []segment
29 : }
30 :
31 : // A segment represents an individual physical file that makes up a contiguous
32 : // segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
33 : // WAL may be composed of multiple segments.
34 : type segment struct {
35 : logNameIndex LogNameIndex // orders this file among the WAL's segments; also an input when deriving its filename
36 : dir Dir // filesystem and directory in which the physical file resides
37 : }
38 :
39 : // String implements fmt.Stringer.
40 0 : func (s segment) String() string {
41 0 : return fmt.Sprintf("(%s,%s)", s.dir.Dirname, s.logNameIndex)
42 0 : }
43 :
44 : // NumSegments returns the number of constituent physical log files that make up
45 : // the log.
46 1 : func (ll LogicalLog) NumSegments() int { return len(ll.segments) }
47 :
48 : // SegmentLocation returns the FS and path for the i-th physical segment file.
49 1 : func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string) {
50 1 : s := ll.segments[i]
51 1 : path := s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.Num, s.logNameIndex))
52 1 : return s.dir.FS, path
53 1 : }
54 :
55 : // PhysicalSize stats each of the log's physical files, summing their sizes.
56 0 : func (ll LogicalLog) PhysicalSize() (uint64, error) {
57 0 : var size uint64
58 0 : for i := range ll.segments {
59 0 : fs, path := ll.SegmentLocation(i)
60 0 : stat, err := fs.Stat(path)
61 0 : if err != nil {
62 0 : return 0, err
63 0 : }
64 0 : size += uint64(stat.Size())
65 : }
66 0 : return size, nil
67 : }
68 :
69 : // OpenForRead a logical WAL for reading.
70 1 : func (ll LogicalLog) OpenForRead() Reader {
71 1 : return newVirtualWALReader(ll)
72 1 : }
73 :
74 : // String implements fmt.Stringer.
75 0 : func (ll LogicalLog) String() string {
76 0 : var sb strings.Builder
77 0 : sb.WriteString(base.DiskFileNum(ll.Num).String())
78 0 : sb.WriteString(": {")
79 0 : for i := range ll.segments {
80 0 : if i > 0 {
81 0 : sb.WriteString(", ")
82 0 : }
83 0 : sb.WriteString(ll.segments[i].String())
84 : }
85 0 : sb.WriteString("}")
86 0 : return sb.String()
87 : }
88 :
89 : // appendDeletableLogs appends all of the LogicalLog's constituent physical
90 : // files as DeletableLogs to dst, returning the modified slice.
91 : // AppendDeletableLogs will Stat physical files to determine physical sizes.
92 : // AppendDeletableLogs does not make any judgmenet on whether a log file is
93 : // obsolete, so callers must take care not to delete logs that are still
94 : // unflushed.
95 1 : func appendDeletableLogs(dst []DeletableLog, ll LogicalLog) ([]DeletableLog, error) {
96 1 : for i := range ll.segments {
97 1 : fs, path := ll.SegmentLocation(i)
98 1 : stat, err := fs.Stat(path)
99 1 : if err != nil {
100 0 : return dst, err
101 0 : }
102 1 : dst = append(dst, DeletableLog{
103 1 : FS: fs,
104 1 : Path: path,
105 1 : NumWAL: ll.Num,
106 1 : ApproxFileSize: uint64(stat.Size()),
107 1 : })
108 : }
109 1 : return dst, nil
110 : }
111 :
112 : // Scan finds all log files in the provided directories. It returns an
113 : // ordered list of WALs in increasing NumWAL order.
114 1 : func Scan(dirs ...Dir) (Logs, error) {
115 1 : var fa FileAccumulator
116 1 : for _, d := range dirs {
117 1 : ls, err := d.FS.List(d.Dirname)
118 1 : if err != nil {
119 0 : return nil, errors.Wrapf(err, "reading %q", d.Dirname)
120 0 : }
121 1 : for _, name := range ls {
122 1 : _, err := fa.maybeAccumulate(d.FS, d.Dirname, name)
123 1 : if err != nil {
124 0 : return nil, err
125 0 : }
126 : }
127 : }
128 1 : return fa.wals, nil
129 : }
130 :
131 : // FileAccumulator parses and accumulates log files. The zero value is ready to use.
132 : type FileAccumulator struct {
133 : wals []LogicalLog // logical WALs seen so far, kept sorted by increasing Num
134 : }
135 :
136 : // MaybeAccumulate parses the provided path's filename. If the filename
137 : // indicates the file is a write-ahead log, MaybeAccumulate updates its internal
138 : // state to remember the file and returns isLogFile=true. An error is returned
139 : // if the file is a duplicate.
140 0 : func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile bool, err error) {
141 0 : filename := fs.PathBase(path)
142 0 : dirname := fs.PathDir(path)
143 0 : return a.maybeAccumulate(fs, dirname, filename)
144 0 : }
145 :
146 : // Finish returns a Logs constructed from the physical files observed through
147 : // MaybeAccumulate.
148 0 : func (a *FileAccumulator) Finish() Logs {
149 0 : wals := a.wals
150 0 : a.wals = nil
151 0 : return wals
152 0 : }
153 :
154 : func (a *FileAccumulator) maybeAccumulate(
155 : fs vfs.FS, dirname, name string,
156 1 : ) (isLogFile bool, err error) {
157 1 : dfn, li, ok := ParseLogFilename(name)
158 1 : if !ok {
159 1 : return false, nil
160 1 : }
161 : // Have we seen this logical log number yet?
162 1 : i, found := slices.BinarySearchFunc(a.wals, dfn, func(lw LogicalLog, n NumWAL) int {
163 1 : return cmp.Compare(lw.Num, n)
164 1 : })
165 1 : if !found {
166 1 : a.wals = slices.Insert(a.wals, i, LogicalLog{Num: dfn, segments: make([]segment, 0, 1)})
167 1 : }
168 : // Ensure we haven't seen this log index yet, and find where it
169 : // slots within this log's segments.
170 1 : j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li LogNameIndex) int {
171 1 : return cmp.Compare(s.logNameIndex, li)
172 1 : })
173 1 : if found {
174 0 : return false, errors.Errorf("wal: duplicate logIndex=%s for WAL %s in %s and %s",
175 0 : li, dfn, dirname, a.wals[i].segments[j].dir.Dirname)
176 0 : }
177 1 : a.wals[i].segments = slices.Insert(a.wals[i].segments, j, segment{logNameIndex: li, dir: Dir{
178 1 : FS: fs,
179 1 : Dirname: dirname,
180 1 : }})
181 1 : return true, nil
182 : }
183 :
184 : // Logs holds a collection of logical WALs sorted by increasing NumWAL; Get depends on this order.
185 : type Logs []LogicalLog
186 :
187 : // Get retrieves the WAL with the given number if present. The second return
188 : // value indicates whether or not the WAL was found.
189 1 : func (l Logs) Get(num NumWAL) (LogicalLog, bool) {
190 1 : i, found := slices.BinarySearchFunc(l, num, func(lw LogicalLog, n NumWAL) int {
191 1 : return cmp.Compare(lw.Num, n)
192 1 : })
193 1 : if !found {
194 0 : return LogicalLog{}, false
195 0 : }
196 1 : return l[i], true
197 : }
198 :
199 1 : func newVirtualWALReader(wal LogicalLog) *virtualWALReader {
200 1 : return &virtualWALReader{
201 1 : LogicalLog: wal,
202 1 : currIndex: -1,
203 1 : }
204 1 : }
205 :
206 : // A virtualWALReader takes an ordered sequence of physical WAL files
207 : // ("segments") and implements the wal.Reader interface, providing a merged view
208 : // of the WAL's logical contents. It's responsible for filtering duplicate
209 : // records which may be shared by the tail of a segment file and the head of its
210 : // successor.
211 : type virtualWALReader struct {
212 : // The logical WAL being read: its number and its ordered segment list.
213 : LogicalLog
214 :
215 : // State pertaining to the current position of the reader within the virtual
216 : // WAL and its constituent physical files.
217 : currIndex int // index into segments of the current file; -1 until the first NextRecord call
218 : currFile vfs.File // open handle for the current segment; nil when no file is open
219 : currReader *record.Reader // record reader layered over currFile
220 : // off describes the current Offset within the WAL.
221 : off Offset
222 : // lastSeqNum is the sequence number of the batch contained within the last
223 : // record returned to the user. A virtual WAL may be split across a sequence
224 : // of several physical WAL files. The tail of one physical WAL may be
225 : // duplicated within the head of the next physical WAL file. We use
226 : // contained batches' sequence numbers to deduplicate. This lastSeqNum field
227 : // should monotonically increase as we iterate over the WAL files. If we
228 : // ever observe a batch encoding a sequence number <= lastSeqNum, we must
229 : // have already returned the batch and should skip it.
230 : lastSeqNum base.SeqNum
231 : // recordBuf is a buffer used to hold the latest record read from a physical
232 : // file, and then returned to the user. A pointer to this buffer is returned
233 : // directly to the caller of NextRecord, and the buffer is reset on each record read.
234 : recordBuf bytes.Buffer
235 : }
236 :
237 : // Compile-time assertion that *virtualWALReader implements wal.Reader.
238 : var _ Reader = (*virtualWALReader)(nil)
239 :
240 : // NextRecord returns a reader for the next record. It returns io.EOF if there
241 : // are no more records. The reader returned becomes stale after the next
242 : // NextRecord call, and should no longer be used.
243 1 : func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
244 1 : // On the first call, we need to open the first file.
245 1 : if r.currIndex < 0 {
246 1 : err := r.nextFile()
247 1 : if err != nil {
248 0 : return nil, Offset{}, err
249 0 : }
250 : }
251 :
252 1 : for {
253 1 : // Update our current physical offset to match the current file offset.
254 1 : r.off.Physical = r.currReader.Offset()
255 1 : // Obtain a Reader for the next record within this log file.
256 1 : rec, err := r.currReader.Next()
257 1 : if errors.Is(err, io.EOF) {
258 1 : // This file is exhausted; continue to the next.
259 1 : err := r.nextFile()
260 1 : if err != nil {
261 1 : return nil, r.off, err
262 1 : }
263 1 : continue
264 : }
265 :
266 : // Copy the record into a buffer. This ensures we read its entirety so
267 : // that NextRecord returns the next record, even if the caller never
268 : // exhausts the previous record's Reader. The record.Reader requires the
269 : // record to be exhausted to read all of the record's chunks before
270 : // attempting to read the next record. Buffering also also allows us to
271 : // easily read the header of the batch down below for deduplication.
272 1 : r.recordBuf.Reset()
273 1 : if err == nil {
274 1 : _, err = io.Copy(&r.recordBuf, rec)
275 1 : }
276 : // The record may be malformed. This is expected during a WAL failover,
277 : // because the tail of a WAL may be only partially written or otherwise
278 : // unclean because of WAL recycling and the inability to write the EOF
279 : // trailer record. If this isn't the last file, we silently ignore the
280 : // invalid record at the tail and proceed to the next file. If it is
281 : // the last file, bubble the error up and let the client decide what to
282 : // do with it. If the virtual WAL is the most recent WAL, Open may also
283 : // decide to ignore it because it's consistent with an incomplete
284 : // in-flight write at the time of process exit/crash. See #453.
285 1 : if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
286 0 : if err := r.nextFile(); err != nil {
287 0 : return nil, r.off, err
288 0 : }
289 0 : continue
290 1 : } else if err != nil {
291 0 : return nil, r.off, err
292 0 : }
293 :
294 : // We may observe repeat records between the physical files that make up
295 : // a virtual WAL because inflight writes to a file on a stalled disk may
296 : // or may not end up completing. WAL records always contain encoded
297 : // batches, and batches that contain data can be uniquely identifed by
298 : // sequence number.
299 : //
300 : // Parse the batch header.
301 1 : h, ok := batchrepr.ReadHeader(r.recordBuf.Bytes())
302 1 : if !ok {
303 0 : // Failed to read the batch header because the record was smaller
304 0 : // than the length of a batch header. This is unexpected. The record
305 0 : // envelope successfully decoded and the checkums of the individual
306 0 : // record fragment(s) validated, so the writer truly wrote an
307 0 : // invalid batch. During Open WAL recovery treats this as
308 0 : // corruption. We could return the record to the caller, allowing
309 0 : // the caller to interpret it as corruption, but it seems safer to
310 0 : // be explicit and surface the corruption error here.
311 0 : return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
312 0 : r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
313 0 : }
314 :
315 : // There's a subtlety necessitated by LogData operations. A LogData
316 : // applied to a batch results in data appended to the WAL in a batch
317 : // format, but the data is never applied to the memtable or LSM. A batch
318 : // only containing LogData will repeat a sequence number. We skip these
319 : // batches because they're not relevant for recovery and we do not want
320 : // to mistakenly deduplicate the batch containing KVs at the same
321 : // sequence number. We can differentiate LogData-only batches through
322 : // their batch headers: they'll encode a count of zero.
323 1 : if h.Count == 0 {
324 1 : continue
325 : }
326 :
327 : // If we've already observed a sequence number >= this batch's sequence
328 : // number, we must've already returned this record to the client. Skip
329 : // it.
330 1 : if h.SeqNum <= r.lastSeqNum {
331 1 : continue
332 : }
333 1 : r.lastSeqNum = h.SeqNum
334 1 : return &r.recordBuf, r.off, nil
335 : }
336 : }
337 :
338 : // Close closes the reader, releasing open resources.
339 1 : func (r *virtualWALReader) Close() error {
340 1 : if r.currFile != nil {
341 1 : if err := r.currFile.Close(); err != nil {
342 0 : return err
343 0 : }
344 : }
345 1 : return nil
346 : }
347 :
348 : // nextFile advances the internal state to the next physical segment file.
349 1 : func (r *virtualWALReader) nextFile() error {
350 1 : if r.currFile != nil {
351 1 : err := r.currFile.Close()
352 1 : r.currFile = nil
353 1 : if err != nil {
354 0 : return err
355 0 : }
356 : }
357 1 : r.currIndex++
358 1 : if r.currIndex >= len(r.segments) {
359 1 : return io.EOF
360 1 : }
361 :
362 1 : fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
363 1 : r.off.PreviousFilesBytes += r.off.Physical
364 1 : r.off.PhysicalFile = path
365 1 : r.off.Physical = 0
366 1 : var err error
367 1 : if r.currFile, err = fs.Open(path); err != nil {
368 0 : return errors.Wrapf(err, "opening WAL file segment %q", path)
369 0 : }
370 1 : r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
371 1 : return nil
372 : }
|