LCOV - code coverage report
Current view: top level - pebble/wal - reader.go (source / functions) Hit Total Coverage
Test: 2024-04-21 08:15Z e7c4ec2e - tests only.lcov Lines: 176 186 94.6 %
Date: 2024-04-21 08:15:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "fmt"
      11             :         "io"
      12             :         "slices"
      13             :         "strings"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/batchrepr"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/record"
      19             :         "github.com/cockroachdb/pebble/vfs"
      20             : )
      21             : 
      22             : // A LogicalLog identifies a logical WAL and its constituent segment files.
      23             : type LogicalLog struct {
      24             :         Num NumWAL
      25             :         // segments contains the list of the constituent physical segment files that
      26             :         // make up the single logical WAL file. segments is ordered by increasing
      27             :         // logIndex.
      28             :         segments []segment
      29             : }
      30             : 
      31             : // A segment represents an individual physical file that makes up a contiguous
      32             : // segment of a logical WAL. If a failover occurred during a WAL's lifetime, a
      33             : // WAL may be composed of multiple segments.
      34             : type segment struct {
      35             :         logNameIndex LogNameIndex
      36             :         dir          Dir
      37             : }
      38             : 
      39             : // String implements fmt.Stringer.
      40           1 : func (s segment) String() string {
      41           1 :         return fmt.Sprintf("(%s,%s)", s.dir.Dirname, s.logNameIndex)
      42           1 : }
      43             : 
      44             : // NumSegments returns the number of constituent physical log files that make up
      45             : // the log.
      46           1 : func (ll LogicalLog) NumSegments() int { return len(ll.segments) }
      47             : 
      48             : // SegmentLocation returns the FS and path for the i-th physical segment file.
      49           1 : func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string) {
      50           1 :         s := ll.segments[i]
      51           1 :         path := s.dir.FS.PathJoin(s.dir.Dirname, makeLogFilename(ll.Num, s.logNameIndex))
      52           1 :         return s.dir.FS, path
      53           1 : }
      54             : 
      55             : // PhysicalSize stats each of the log's physical files, summing their sizes.
      56           1 : func (ll LogicalLog) PhysicalSize() (uint64, error) {
      57           1 :         var size uint64
      58           1 :         for i := range ll.segments {
      59           1 :                 fs, path := ll.SegmentLocation(i)
      60           1 :                 stat, err := fs.Stat(path)
      61           1 :                 if err != nil {
      62           0 :                         return 0, err
      63           0 :                 }
      64           1 :                 size += uint64(stat.Size())
      65             :         }
      66           1 :         return size, nil
      67             : }
      68             : 
      69             : // OpenForRead opens a logical WAL for reading.
      70           1 : func (ll LogicalLog) OpenForRead() Reader {
      71           1 :         return newVirtualWALReader(ll)
      72           1 : }
      73             : 
      74             : // String implements fmt.Stringer.
      75           1 : func (ll LogicalLog) String() string {
      76           1 :         var sb strings.Builder
      77           1 :         sb.WriteString(base.DiskFileNum(ll.Num).String())
      78           1 :         sb.WriteString(": {")
      79           1 :         for i := range ll.segments {
      80           1 :                 if i > 0 {
      81           1 :                         sb.WriteString(", ")
      82           1 :                 }
      83           1 :                 sb.WriteString(ll.segments[i].String())
      84             :         }
      85           1 :         sb.WriteString("}")
      86           1 :         return sb.String()
      87             : }
      88             : 
      89             : // appendDeletableLogs appends all of the LogicalLog's constituent physical
      90             : // files as DeletableLogs to dst, returning the modified slice.
      91             : // appendDeletableLogs will Stat physical files to determine physical sizes.
      92             : // appendDeletableLogs does not make any judgment on whether a log file is
      93             : // obsolete, so callers must take care not to delete logs that are still
      94             : // unflushed.
      95           1 : func appendDeletableLogs(dst []DeletableLog, ll LogicalLog) ([]DeletableLog, error) {
      96           1 :         for i := range ll.segments {
      97           1 :                 fs, path := ll.SegmentLocation(i)
      98           1 :                 stat, err := fs.Stat(path)
      99           1 :                 if err != nil {
     100           0 :                         return dst, err
     101           0 :                 }
     102           1 :                 dst = append(dst, DeletableLog{
     103           1 :                         FS:             fs,
     104           1 :                         Path:           path,
     105           1 :                         NumWAL:         ll.Num,
     106           1 :                         ApproxFileSize: uint64(stat.Size()),
     107           1 :                 })
     108             :         }
     109           1 :         return dst, nil
     110             : }
     111             : 
     112             : // Scan finds all log files in the provided directories. It returns an
     113             : // ordered list of WALs in increasing NumWAL order.
     114           1 : func Scan(dirs ...Dir) (Logs, error) {
     115           1 :         var fa FileAccumulator
     116           1 :         for _, d := range dirs {
     117           1 :                 ls, err := d.FS.List(d.Dirname)
     118           1 :                 if err != nil {
     119           1 :                         return nil, errors.Wrapf(err, "reading %q", d.Dirname)
     120           1 :                 }
     121           1 :                 for _, name := range ls {
     122           1 :                         _, err := fa.maybeAccumulate(d.FS, d.Dirname, name)
     123           1 :                         if err != nil {
     124           1 :                                 return nil, err
     125           1 :                         }
     126             :                 }
     127             :         }
     128           1 :         return fa.wals, nil
     129             : }
     130             : 
     131             : // FileAccumulator parses and accumulates log files.
     132             : type FileAccumulator struct {
     133             :         wals []LogicalLog
     134             : }
     135             : 
     136             : // MaybeAccumulate parses the provided path's filename. If the filename
     137             : // indicates the file is a write-ahead log, MaybeAccumulate updates its internal
     138             : // state to remember the file and returns isLogFile=true. An error is returned
     139             : // if the file is a duplicate.
     140           1 : func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile bool, err error) {
     141           1 :         filename := fs.PathBase(path)
     142           1 :         dirname := fs.PathDir(path)
     143           1 :         return a.maybeAccumulate(fs, dirname, filename)
     144           1 : }
     145             : 
     146             : // Finish returns a Logs constructed from the physical files observed through
     147             : // MaybeAccumulate.
     148           1 : func (a *FileAccumulator) Finish() Logs {
     149           1 :         wals := a.wals
     150           1 :         a.wals = nil
     151           1 :         return wals
     152           1 : }
     153             : 
     154             : func (a *FileAccumulator) maybeAccumulate(
     155             :         fs vfs.FS, dirname, name string,
     156           1 : ) (isLogFile bool, err error) {
     157           1 :         dfn, li, ok := ParseLogFilename(name)
     158           1 :         if !ok {
     159           1 :                 return false, nil
     160           1 :         }
     161             :         // Have we seen this logical log number yet?
     162           1 :         i, found := slices.BinarySearchFunc(a.wals, dfn, func(lw LogicalLog, n NumWAL) int {
     163           1 :                 return cmp.Compare(lw.Num, n)
     164           1 :         })
     165           1 :         if !found {
     166           1 :                 a.wals = slices.Insert(a.wals, i, LogicalLog{Num: dfn, segments: make([]segment, 0, 1)})
     167           1 :         }
     168             :         // Ensure we haven't seen this log index yet, and find where it
     169             :         // slots within this log's segments.
     170           1 :         j, found := slices.BinarySearchFunc(a.wals[i].segments, li, func(s segment, li LogNameIndex) int {
     171           1 :                 return cmp.Compare(s.logNameIndex, li)
     172           1 :         })
     173           1 :         if found {
     174           1 :                 return false, errors.Errorf("wal: duplicate logIndex=%s for WAL %s in %s and %s",
     175           1 :                         li, dfn, dirname, a.wals[i].segments[j].dir.Dirname)
     176           1 :         }
     177           1 :         a.wals[i].segments = slices.Insert(a.wals[i].segments, j, segment{logNameIndex: li, dir: Dir{
     178           1 :                 FS:      fs,
     179           1 :                 Dirname: dirname,
     180           1 :         }})
     181           1 :         return true, nil
     182             : }
     183             : 
     184             : // Logs holds a collection of WAL files, in increasing order of NumWAL.
     185             : type Logs []LogicalLog
     186             : 
     187             : // Get retrieves the WAL with the given number if present. The second return
     188             : // value indicates whether or not the WAL was found.
     189           1 : func (l Logs) Get(num NumWAL) (LogicalLog, bool) {
     190           1 :         i, found := slices.BinarySearchFunc(l, num, func(lw LogicalLog, n NumWAL) int {
     191           1 :                 return cmp.Compare(lw.Num, n)
     192           1 :         })
     193           1 :         if !found {
     194           1 :                 return LogicalLog{}, false
     195           1 :         }
     196           1 :         return l[i], true
     197             : }
     198             : 
     199           1 : func newVirtualWALReader(wal LogicalLog) *virtualWALReader {
     200           1 :         return &virtualWALReader{
     201           1 :                 LogicalLog: wal,
     202           1 :                 currIndex:  -1,
     203           1 :         }
     204           1 : }
     205             : 
     206             : // A virtualWALReader takes an ordered sequence of physical WAL files
     207             : // ("segments") and implements the wal.Reader interface, providing a merged view
     208             : // of the WAL's logical contents. It's responsible for filtering duplicate
     209             : // records which may be shared by the tail of a segment file and the head of its
     210             : // successor.
     211             : type virtualWALReader struct {
     212             :         // VirtualWAL metadata.
     213             :         LogicalLog
     214             : 
     215             :         // State pertaining to the current position of the reader within the virtual
     216             :         // WAL and its constituent physical files.
     217             :         currIndex  int
     218             :         currFile   vfs.File
     219             :         currReader *record.Reader
     220             :         // off describes the current Offset within the WAL.
     221             :         off Offset
     222             :         // lastSeqNum is the sequence number of the batch contained within the last
     223             :         // record returned to the user. A virtual WAL may be split across a sequence
     224             :         // of several physical WAL files. The tail of one physical WAL may be
     225             :         // duplicated within the head of the next physical WAL file. We use
     226             :         // contained batches' sequence numbers to deduplicate. This lastSeqNum field
     227             :         // should monotonically increase as we iterate over the WAL files. If we
     228             :         // ever observe a batch encoding a sequence number <= lastSeqNum, we must
     229             :         // have already returned the batch and should skip it.
     230             :         lastSeqNum uint64
     231             :         // recordBuf is a buffer used to hold the latest record read from a physical
     232             :         // file, and then returned to the user. A pointer to this buffer is returned
     233             :         // directly to the caller of NextRecord.
     234             :         recordBuf bytes.Buffer
     235             : }
     236             : 
     237             : // *virtualWALReader implements wal.Reader.
     238             : var _ Reader = (*virtualWALReader)(nil)
     239             : 
     240             : // NextRecord returns a reader for the next record. It returns io.EOF if there
     241             : // are no more records. The reader returned becomes stale after the next
     242             : // NextRecord call, and should no longer be used.
     243           1 : func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
     244           1 :         r.recordBuf.Reset()
     245           1 : 
     246           1 :         // On the first call, we need to open the first file.
     247           1 :         if r.currIndex < 0 {
     248           1 :                 err := r.nextFile()
     249           1 :                 if err != nil {
     250           0 :                         return nil, Offset{}, err
     251           0 :                 }
     252             :         }
     253             : 
     254           1 :         for {
     255           1 :                 // Update our current physical offset to match the current file offset.
     256           1 :                 r.off.Physical = r.currReader.Offset()
     257           1 :                 // Obtain a Reader for the next record within this log file.
     258           1 :                 rec, err := r.currReader.Next()
     259           1 :                 if errors.Is(err, io.EOF) {
     260           1 :                         // This file is exhausted; continue to the next.
     261           1 :                         err := r.nextFile()
     262           1 :                         if err != nil {
     263           1 :                                 return nil, r.off, err
     264           1 :                         }
     265           1 :                         continue
     266             :                 }
     267             : 
     268             :                 // Copy the record into a buffer. This ensures we read its entirety so
     269             :                 // that NextRecord returns the next record, even if the caller never
     270             :                 // exhausts the previous record's Reader. The record.Reader requires the
     271             :                 // record to be exhausted to read all of the record's chunks before
     272             :                 // attempting to read the next record. Buffering also allows us to
     273             :                 // easily read the header of the batch down below for deduplication.
     274           1 :                 if err == nil {
     275           1 :                         _, err = io.Copy(&r.recordBuf, rec)
     276           1 :                 }
     277             :                 // The record may be malformed. This is expected during a WAL failover,
     278             :                 // because the tail of a WAL may be only partially written or otherwise
     279             :                 // unclean because of WAL recycling and the inability to write the EOF
     280             :                 // trailer record. If this isn't the last file, we silently ignore the
     281             :                 // invalid record at the tail and proceed to the next file. If it is
     282             :                 // the last file, bubble the error up and let the client decide what to
     283             :                 // do with it. If the virtual WAL is the most recent WAL, Open may also
     284             :                 // decide to ignore it because it's consistent with an incomplete
     285             :                 // in-flight write at the time of process exit/crash. See #453.
     286           1 :                 if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
     287           1 :                         if err := r.nextFile(); err != nil {
     288           1 :                                 return nil, r.off, err
     289           1 :                         }
     290           1 :                         continue
     291           1 :                 } else if err != nil {
     292           1 :                         return nil, r.off, err
     293           1 :                 }
     294             : 
     295             :                 // We may observe repeat records between the physical files that make up
     296             :                 // a virtual WAL because inflight writes to a file on a stalled disk may
     297             :                 // or may not end up completing. WAL records always contain encoded
     298             :                 // batches, and batches that contain data can be uniquely identified by
     299             :                 // sequence number.
     300             :                 //
     301             :                 // Parse the batch header.
     302           1 :                 h, ok := batchrepr.ReadHeader(r.recordBuf.Bytes())
     303           1 :                 if !ok {
     304           1 :                         // Failed to read the batch header because the record was smaller
     305           1 :                         // than the length of a batch header. This is unexpected. The record
     306           1 :                         // envelope successfully decoded and the checksums of the individual
     307           1 :                         // record fragment(s) validated, so the writer truly wrote an
     308           1 :                         // invalid batch. During Open, WAL recovery treats this as
     309           1 :                         // corruption. We could return the record to the caller, allowing
     310           1 :                         // the caller to interpret it as corruption, but it seems safer to
     311           1 :                         // be explicit and surface the corruption error here.
     312           1 :                         return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
     313           1 :                                 r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
     314           1 :                 }
     315             : 
     316             :                 // There's a subtlety necessitated by LogData operations. A LogData
     317             :                 // applied to a batch results in data appended to the WAL in a batch
     318             :                 // format, but the data is never applied to the memtable or LSM. A batch
     319             :                 // only containing LogData will repeat a sequence number. We skip these
     320             :                 // batches because they're not relevant for recovery and we do not want
     321             :                 // to mistakenly deduplicate the batch containing KVs at the same
     322             :                 // sequence number. We can differentiate LogData-only batches through
     323             :                 // their batch headers: they'll encode a count of zero.
     324           1 :                 if h.Count == 0 {
     325           1 :                         r.recordBuf.Reset()
     326           1 :                         continue
     327             :                 }
     328             : 
     329             :                 // If we've already observed a sequence number >= this batch's sequence
     330             :                 // number, we must've already returned this record to the client. Skip
     331             :                 // it.
     332           1 :                 if h.SeqNum <= r.lastSeqNum {
     333           1 :                         r.recordBuf.Reset()
     334           1 :                         continue
     335             :                 }
     336           1 :                 r.lastSeqNum = h.SeqNum
     337           1 :                 return &r.recordBuf, r.off, nil
     338             :         }
     339             : }
     340             : 
     341             : // Close closes the reader, releasing open resources.
     342           1 : func (r *virtualWALReader) Close() error {
     343           1 :         if r.currFile != nil {
     344           1 :                 if err := r.currFile.Close(); err != nil {
     345           0 :                         return err
     346           0 :                 }
     347             :         }
     348           1 :         return nil
     349             : }
     350             : 
     351             : // nextFile advances the internal state to the next physical segment file.
     352           1 : func (r *virtualWALReader) nextFile() error {
     353           1 :         if r.currFile != nil {
     354           1 :                 err := r.currFile.Close()
     355           1 :                 r.currFile = nil
     356           1 :                 if err != nil {
     357           0 :                         return err
     358           0 :                 }
     359             :         }
     360           1 :         r.currIndex++
     361           1 :         if r.currIndex >= len(r.segments) {
     362           1 :                 return io.EOF
     363           1 :         }
     364             : 
     365           1 :         fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
     366           1 :         r.off.PhysicalFile = path
     367           1 :         r.off.Physical = 0
     368           1 :         var err error
     369           1 :         if r.currFile, err = fs.Open(path); err != nil {
     370           1 :                 return errors.Wrapf(err, "opening WAL file segment %q", path)
     371           1 :         }
     372           1 :         r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
     373           1 :         return nil
     374             : }

Generated by: LCOV version 1.14