LCOV - code coverage report
Current view: top level - pebble/wal - reader.go (source / functions) Hit Total Coverage
Test: 2024-12-25 08:17Z 78d53457 - meta test only.lcov Lines: 122 187 65.2 %
Date: 2024-12-25 08:18:29 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "io"
      11             :         "slices"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/batchrepr"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/cockroachdb/redact"
      19             : )
      20             : 
// A LogicalLog identifies a logical WAL and its constituent segment files.
type LogicalLog struct {
	// Num identifies the logical WAL. Each segment's filename is built from
	// Num together with the segment's logNameIndex (see SegmentLocation).
	Num NumWAL
	// segments contains the list of the constituent physical segment files that
	// make up the single logical WAL file. segments is ordered by increasing
	// logNameIndex.
	segments []segment
}
      29             : 
// A segment represents an individual physical file that makes up a contiguous
// segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
// WAL may be composed of multiple segments.
type segment struct {
	// logNameIndex is combined with the owning LogicalLog's Num to form this
	// segment's filename (see SegmentLocation); segments of one WAL are
	// ordered by it.
	logNameIndex LogNameIndex
	// dir is the filesystem and directory that contain the segment file.
	dir Dir
}
      37             : 
      38             : // String implements fmt.Stringer.
      39           0 : func (s segment) String() string {
      40           0 :         return redact.StringWithoutMarkers(s)
      41           0 : }
      42             : 
      43             : // SafeFormat implements redact.SafeFormatter.
      44           1 : func (s segment) SafeFormat(w redact.SafePrinter, _ rune) {
      45           1 :         w.Printf("(%s,%s)", errors.Safe(s.dir.Dirname), s.logNameIndex)
      46           1 : }
      47             : 
      48             : // NumSegments returns the number of constituent physical log files that make up
      49             : // the log.
      50           1 : func (ll LogicalLog) NumSegments() int { return len(ll.segments) }
      51             : 
      52             : // SegmentLocation returns the FS and path for the i-th physical segment file.
      53           1 : func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string) {
      54           1 :         s := ll.segments[i]
      55           1 :         path := s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.Num, s.logNameIndex))
      56           1 :         return s.dir.FS, path
      57           1 : }
      58             : 
      59             : // PhysicalSize stats each of the log's physical files, summing their sizes.
      60           0 : func (ll LogicalLog) PhysicalSize() (uint64, error) {
      61           0 :         var size uint64
      62           0 :         for i := range ll.segments {
      63           0 :                 fs, path := ll.SegmentLocation(i)
      64           0 :                 stat, err := fs.Stat(path)
      65           0 :                 if err != nil {
      66           0 :                         return 0, err
      67           0 :                 }
      68           0 :                 size += uint64(stat.Size())
      69             :         }
      70           0 :         return size, nil
      71             : }
      72             : 
      73             : // OpenForRead a logical WAL for reading.
      74           1 : func (ll LogicalLog) OpenForRead() Reader {
      75           1 :         return newVirtualWALReader(ll)
      76           1 : }
      77             : 
      78             : // String implements fmt.Stringer.
      79           1 : func (ll LogicalLog) String() string {
      80           1 :         return redact.StringWithoutMarkers(ll)
      81           1 : }
      82             : 
      83             : // SafeFormat implements redact.SafeFormatter.
      84           1 : func (ll LogicalLog) SafeFormat(w redact.SafePrinter, _ rune) {
      85           1 :         w.Printf("%s: {", base.DiskFileNum(ll.Num).String())
      86           1 :         for i := range ll.segments {
      87           1 :                 if i > 0 {
      88           1 :                         w.SafeString(", ")
      89           1 :                 }
      90           1 :                 w.Print(ll.segments[i])
      91             :         }
      92           1 :         w.SafeString("}")
      93             : }
      94             : 
      95             : // appendDeletableLogs appends all of the LogicalLog's constituent physical
      96             : // files as DeletableLogs to dst, returning the modified slice.
      97             : // AppendDeletableLogs will Stat physical files to determine physical sizes.
      98             : // AppendDeletableLogs does not make any judgmenet on whether a log file is
      99             : // obsolete, so callers must take care not to delete logs that are still
     100             : // unflushed.
     101           1 : func appendDeletableLogs(dst []DeletableLog, ll LogicalLog) ([]DeletableLog, error) {
     102           1 :         for i := range ll.segments {
     103           1 :                 fs, path := ll.SegmentLocation(i)
     104           1 :                 stat, err := fs.Stat(path)
     105           1 :                 if err != nil {
     106           0 :                         return dst, err
     107           0 :                 }
     108           1 :                 dst = append(dst, DeletableLog{
     109           1 :                         FS:             fs,
     110           1 :                         Path:           path,
     111           1 :                         NumWAL:         ll.Num,
     112           1 :                         ApproxFileSize: uint64(stat.Size()),
     113           1 :                 })
     114             :         }
     115           1 :         return dst, nil
     116             : }
     117             : 
     118             : // Scan finds all log files in the provided directories. It returns an
     119             : // ordered list of WALs in increasing NumWAL order.
     120           1 : func Scan(dirs ...Dir) (Logs, error) {
     121           1 :         var fa FileAccumulator
     122           1 :         for _, d := range dirs {
     123           1 :                 ls, err := d.FS.List(d.Dirname)
     124           1 :                 if err != nil {
     125           0 :                         return nil, errors.Wrapf(err, "reading %q", d.Dirname)
     126           0 :                 }
     127           1 :                 for _, name := range ls {
     128           1 :                         _, err := fa.maybeAccumulate(d.FS, d.Dirname, name)
     129           1 :                         if err != nil {
     130           0 :                                 return nil, err
     131           0 :                         }
     132             :                 }
     133             :         }
     134           1 :         return fa.wals, nil
     135             : }
     136             : 
// FileAccumulator parses and accumulates log files.
//
// The zero value is ready to use: feed paths to MaybeAccumulate, then call
// Finish to obtain the accumulated Logs.
type FileAccumulator struct {
	// wals holds one LogicalLog per observed WAL number, kept sorted by
	// increasing Num so that it can be binary searched.
	wals []LogicalLog
}
     141             : 
     142             : // MaybeAccumulate parses the provided path's filename. If the filename
     143             : // indicates the file is a write-ahead log, MaybeAccumulate updates its internal
     144             : // state to remember the file and returns isLogFile=true. An error is returned
     145             : // if the file is a duplicate.
     146           0 : func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile bool, err error) {
     147           0 :         filename := fs.PathBase(path)
     148           0 :         dirname := fs.PathDir(path)
     149           0 :         return a.maybeAccumulate(fs, dirname, filename)
     150           0 : }
     151             : 
     152             : // Finish returns a Logs constructed from the physical files observed through
     153             : // MaybeAccumulate.
     154           0 : func (a *FileAccumulator) Finish() Logs {
     155           0 :         wals := a.wals
     156           0 :         a.wals = nil
     157           0 :         return wals
     158           0 : }
     159             : 
     160             : func (a *FileAccumulator) maybeAccumulate(
     161             :         fs vfs.FS, dirname, name string,
     162           1 : ) (isLogFile bool, err error) {
     163           1 :         dfn, li, ok := ParseLogFilename(name)
     164           1 :         if !ok {
     165           1 :                 return false, nil
     166           1 :         }
     167             :         // Have we seen this logical log number yet?
     168           1 :         i, found := slices.BinarySearchFunc(a.wals, dfn, func(lw LogicalLog, n NumWAL) int {
     169           1 :                 return cmp.Compare(lw.Num, n)
     170           1 :         })
     171           1 :         if !found {
     172           1 :                 a.wals = slices.Insert(a.wals, i, LogicalLog{Num: dfn, segments: make([]segment, 0, 1)})
     173           1 :         }
     174             :         // Ensure we haven't seen this log index yet, and find where it
     175             :         // slots within this log's segments.
     176           1 :         j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li LogNameIndex) int {
     177           1 :                 return cmp.Compare(s.logNameIndex, li)
     178           1 :         })
     179           1 :         if found {
     180           0 :                 return false, errors.Errorf("wal: duplicate logIndex=%s for WAL %s in %s and %s",
     181           0 :                         li, dfn, dirname, a.wals[i].segments[j].dir.Dirname)
     182           0 :         }
     183           1 :         a.wals[i].segments = slices.Insert(a.wals[i].segments, j, segment{logNameIndex: li, dir: Dir{
     184           1 :                 FS:      fs,
     185           1 :                 Dirname: dirname,
     186           1 :         }})
     187           1 :         return true, nil
     188             : }
     189             : 
// Logs holds a collection of WAL files, in increasing order of NumWAL.
// Get binary searches the collection, so this ordering must be maintained.
type Logs []LogicalLog
     192             : 
     193             : // Get retrieves the WAL with the given number if present. The second return
     194             : // value indicates whether or not the WAL was found.
     195           0 : func (l Logs) Get(num NumWAL) (LogicalLog, bool) {
     196           0 :         i, found := slices.BinarySearchFunc(l, num, func(lw LogicalLog, n NumWAL) int {
     197           0 :                 return cmp.Compare(lw.Num, n)
     198           0 :         })
     199           0 :         if !found {
     200           0 :                 return LogicalLog{}, false
     201           0 :         }
     202           0 :         return l[i], true
     203             : }
     204             : 
     205           1 : func newVirtualWALReader(wal LogicalLog) *virtualWALReader {
     206           1 :         return &virtualWALReader{
     207           1 :                 LogicalLog: wal,
     208           1 :                 currIndex:  -1,
     209           1 :         }
     210           1 : }
     211             : 
// A virtualWALReader takes an ordered sequence of physical WAL files
// ("segments") and implements the wal.Reader interface, providing a merged view
// of the WAL's logical contents. It's responsible for filtering duplicate
// records which may be shared by the tail of a segment file and the head of its
// successor.
type virtualWALReader struct {
	// VirtualWAL metadata: the WAL number and its ordered segments.
	LogicalLog

	// State pertaining to the current position of the reader within the virtual
	// WAL and its constituent physical files.
	//
	// currIndex indexes segments: it is -1 until the first segment is opened
	// and is incremented each time nextFile advances. currFile is the open
	// file for segments[currIndex] (nil when none is open), and currReader
	// decodes records from it.
	currIndex  int
	currFile   vfs.File
	currReader *record.Reader
	// off describes the current Offset within the WAL.
	off Offset
	// lastSeqNum is the sequence number of the batch contained within the last
	// record returned to the user. A virtual WAL may be split across a sequence
	// of several physical WAL files. The tail of one physical WAL may be
	// duplicated within the head of the next physical WAL file. We use
	// contained batches' sequence numbers to deduplicate. This lastSeqNum field
	// should monotonically increase as we iterate over the WAL files. If we
	// ever observe a batch encoding a sequence number <= lastSeqNum, we must
	// have already returned the batch and should skip it.
	lastSeqNum base.SeqNum
	// recordBuf is a buffer used to hold the latest record read from a physical
	// file, and then returned to the user. A pointer to this buffer is returned
	// directly to the caller of NextRecord.
	recordBuf bytes.Buffer
}

// Compile-time check that *virtualWALReader implements wal.Reader.
var _ Reader = (*virtualWALReader)(nil)
     245             : 
     246             : // NextRecord returns a reader for the next record. It returns io.EOF if there
     247             : // are no more records. The reader returned becomes stale after the next
     248             : // NextRecord call, and should no longer be used.
     249           1 : func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
     250           1 :         // On the first call, we need to open the first file.
     251           1 :         if r.currIndex < 0 {
     252           1 :                 err := r.nextFile()
     253           1 :                 if err != nil {
     254           0 :                         return nil, Offset{}, err
     255           0 :                 }
     256             :         }
     257             : 
     258           1 :         for {
     259           1 :                 // Update our current physical offset to match the current file offset.
     260           1 :                 r.off.Physical = r.currReader.Offset()
     261           1 :                 // Obtain a Reader for the next record within this log file.
     262           1 :                 rec, err := r.currReader.Next()
     263           1 :                 if errors.Is(err, io.EOF) {
     264           1 :                         // This file is exhausted; continue to the next.
     265           1 :                         err := r.nextFile()
     266           1 :                         if err != nil {
     267           1 :                                 return nil, r.off, err
     268           1 :                         }
     269           1 :                         continue
     270             :                 }
     271             : 
     272             :                 // Copy the record into a buffer. This ensures we read its entirety so
     273             :                 // that NextRecord returns the next record, even if the caller never
     274             :                 // exhausts the previous record's Reader. The record.Reader requires the
     275             :                 // record to be exhausted to read all of the record's chunks before
     276             :                 // attempting to read the next record. Buffering also also allows us to
     277             :                 // easily read the header of the batch down below for deduplication.
     278           1 :                 r.recordBuf.Reset()
     279           1 :                 if err == nil {
     280           1 :                         _, err = io.Copy(&r.recordBuf, rec)
     281           1 :                 }
     282             :                 // The record may be malformed. This is expected during a WAL failover,
     283             :                 // because the tail of a WAL may be only partially written or otherwise
     284             :                 // unclean because of WAL recycling and the inability to write the EOF
     285             :                 // trailer record. If this isn't the last file, we silently ignore the
     286             :                 // invalid record at the tail and proceed to the next file. If it is
     287             :                 // the last file, bubble the error up and let the client decide what to
     288             :                 // do with it. If the virtual WAL is the most recent WAL, Open may also
     289             :                 // decide to ignore it because it's consistent with an incomplete
     290             :                 // in-flight write at the time of process exit/crash. See #453.
     291           1 :                 if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
     292           0 :                         if err := r.nextFile(); err != nil {
     293           0 :                                 return nil, r.off, err
     294           0 :                         }
     295           0 :                         continue
     296           1 :                 } else if err != nil {
     297           0 :                         return nil, r.off, err
     298           0 :                 }
     299             : 
     300             :                 // We may observe repeat records between the physical files that make up
     301             :                 // a virtual WAL because inflight writes to a file on a stalled disk may
     302             :                 // or may not end up completing. WAL records always contain encoded
     303             :                 // batches, and batches that contain data can be uniquely identifed by
     304             :                 // sequence number.
     305             :                 //
     306             :                 // Parse the batch header.
     307           1 :                 h, ok := batchrepr.ReadHeader(r.recordBuf.Bytes())
     308           1 :                 if !ok {
     309           0 :                         // Failed to read the batch header because the record was smaller
     310           0 :                         // than the length of a batch header. This is unexpected. The record
     311           0 :                         // envelope successfully decoded and the checkums of the individual
     312           0 :                         // record fragment(s) validated, so the writer truly wrote an
     313           0 :                         // invalid batch. During Open WAL recovery treats this as
     314           0 :                         // corruption. We could return the record to the caller, allowing
     315           0 :                         // the caller to interpret it as corruption, but it seems safer to
     316           0 :                         // be explicit and surface the corruption error here.
     317           0 :                         return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
     318           0 :                                 r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
     319           0 :                 }
     320             : 
     321             :                 // There's a subtlety necessitated by LogData operations. A LogData
     322             :                 // applied to a batch results in data appended to the WAL in a batch
     323             :                 // format, but the data is never applied to the memtable or LSM. A batch
     324             :                 // only containing LogData will repeat a sequence number. We skip these
     325             :                 // batches because they're not relevant for recovery and we do not want
     326             :                 // to mistakenly deduplicate the batch containing KVs at the same
     327             :                 // sequence number. We can differentiate LogData-only batches through
     328             :                 // their batch headers: they'll encode a count of zero.
     329           1 :                 if h.Count == 0 {
     330           1 :                         continue
     331             :                 }
     332             : 
     333             :                 // If we've already observed a sequence number >= this batch's sequence
     334             :                 // number, we must've already returned this record to the client. Skip
     335             :                 // it.
     336           1 :                 if h.SeqNum <= r.lastSeqNum {
     337           1 :                         continue
     338             :                 }
     339           1 :                 r.lastSeqNum = h.SeqNum
     340           1 :                 return &r.recordBuf, r.off, nil
     341             :         }
     342             : }
     343             : 
     344             : // Close closes the reader, releasing open resources.
     345           1 : func (r *virtualWALReader) Close() error {
     346           1 :         if r.currFile != nil {
     347           1 :                 if err := r.currFile.Close(); err != nil {
     348           0 :                         return err
     349           0 :                 }
     350             :         }
     351           1 :         return nil
     352             : }
     353             : 
     354             : // nextFile advances the internal state to the next physical segment file.
     355           1 : func (r *virtualWALReader) nextFile() error {
     356           1 :         if r.currFile != nil {
     357           1 :                 err := r.currFile.Close()
     358           1 :                 r.currFile = nil
     359           1 :                 if err != nil {
     360           0 :                         return err
     361           0 :                 }
     362             :         }
     363           1 :         r.currIndex++
     364           1 :         if r.currIndex >= len(r.segments) {
     365           1 :                 return io.EOF
     366           1 :         }
     367             : 
     368           1 :         fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
     369           1 :         r.off.PreviousFilesBytes += r.off.Physical
     370           1 :         r.off.PhysicalFile = path
     371           1 :         r.off.Physical = 0
     372           1 :         var err error
     373           1 :         if r.currFile, err = fs.Open(path); err != nil {
     374           0 :                 return errors.Wrapf(err, "opening WAL file segment %q", path)
     375           0 :         }
     376           1 :         r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
     377           1 :         return nil
     378             : }

Generated by: LCOV version 1.14