LCOV - code coverage report
Current view: top level - pebble/tool - wal.go (source / functions) Hit Total Coverage
Test: 2024-10-08 08:17Z 3f7527ff - tests + meta.lcov Lines: 108 226 47.8 %
Date: 2024-10-08 08:18:11 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package tool
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "io"
      12             :         "os"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble"
      16             :         "github.com/cockroachdb/pebble/batchrepr"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/rangekey"
      19             :         "github.com/cockroachdb/pebble/record"
      20             :         "github.com/cockroachdb/pebble/sstable"
      21             :         "github.com/cockroachdb/pebble/wal"
      22             :         "github.com/spf13/cobra"
      23             : )
      24             : 
      25             : // walT implements WAL-level tools, including both configuration state and the
      26             : // commands themselves.
      27             : type walT struct {
      28             :         Root       *cobra.Command
      29             :         Dump       *cobra.Command
      30             :         DumpMerged *cobra.Command
      31             : 
      32             :         opts     *pebble.Options
      33             :         fmtKey   keyFormatter
      34             :         fmtValue valueFormatter
      35             : 
      36             :         defaultComparer string
      37             :         comparers       sstable.Comparers
      38             :         verbose         bool
      39             : }
      40             : 
      41           1 : func newWAL(opts *pebble.Options, comparers sstable.Comparers, defaultComparer string) *walT {
      42           1 :         w := &walT{
      43           1 :                 opts: opts,
      44           1 :         }
      45           1 :         w.fmtKey.mustSet("quoted")
      46           1 :         w.fmtValue.mustSet("size")
      47           1 :         w.comparers = comparers
      48           1 :         w.defaultComparer = defaultComparer
      49           1 : 
      50           1 :         w.Root = &cobra.Command{
      51           1 :                 Use:   "wal",
      52           1 :                 Short: "WAL introspection tools",
      53           1 :         }
      54           1 :         w.Dump = &cobra.Command{
      55           1 :                 Use:   "dump <wal-files>",
      56           1 :                 Short: "print WAL contents",
      57           1 :                 Long: `
      58           1 : Print the contents of the WAL files.
      59           1 : `,
      60           1 :                 Args: cobra.MinimumNArgs(1),
      61           1 :                 Run:  w.runDump,
      62           1 :         }
      63           1 :         w.DumpMerged = &cobra.Command{
      64           1 :                 Use:   "dump-merged <wal-files>",
      65           1 :                 Short: "print WAL contents",
      66           1 :                 Long: `
      67           1 : Print the merged contents of multiple WAL segment files that
      68           1 : together form a single logical WAL.
      69           1 : `,
      70           1 :                 Args: cobra.MinimumNArgs(1),
      71           1 :                 Run:  w.runDumpMerged,
      72           1 :         }
      73           1 : 
      74           1 :         w.Root.AddCommand(w.Dump)
      75           1 :         w.Root.AddCommand(w.DumpMerged)
      76           1 :         w.Root.PersistentFlags().BoolVarP(&w.verbose, "verbose", "v", false, "verbose output")
      77           1 : 
      78           1 :         w.Dump.Flags().Var(
      79           1 :                 &w.fmtKey, "key", "key formatter")
      80           1 :         w.Dump.Flags().Var(
      81           1 :                 &w.fmtValue, "value", "value formatter")
      82           1 :         return w
      83           1 : }
      84             : 
      85             : type errAndArg struct {
      86             :         err error
      87             :         arg string
      88             : }
      89             : 
      90           1 : func (w *walT) runDump(cmd *cobra.Command, args []string) {
      91           1 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
      92           1 :         w.fmtKey.setForComparer(w.defaultComparer, w.comparers)
      93           1 :         w.fmtValue.setForComparer(w.defaultComparer, w.comparers)
      94           1 :         var errs []errAndArg
      95           1 :         logErr := func(arg string, offset int64, err error) {
      96           0 :                 err = errors.Wrapf(err, "%s: offset %d: ", arg, offset)
      97           0 :                 fmt.Fprintf(stderr, "%s\n", err)
      98           0 :                 errs = append(errs, errAndArg{
      99           0 :                         arg: arg,
     100           0 :                         err: err,
     101           0 :                 })
     102           0 :         }
     103             : 
     104           1 :         for _, arg := range args {
     105           1 :                 func() {
     106           1 :                         // Parse the filename in order to extract the file number. This is
     107           1 :                         // necessary in case WAL recycling was used (which it is usually is). If
     108           1 :                         // we can't parse the filename or it isn't a log file, we'll plow ahead
     109           1 :                         // anyways (which will likely fail when we try to read the file).
     110           1 :                         fileNum, _, ok := wal.ParseLogFilename(arg)
     111           1 :                         if !ok {
     112           0 :                                 fileNum = 0
     113           0 :                         }
     114             : 
     115           1 :                         f, err := w.opts.FS.Open(arg)
     116           1 :                         if err != nil {
     117           0 :                                 fmt.Fprintf(stderr, "%s\n", err)
     118           0 :                                 return
     119           0 :                         }
     120           1 :                         defer f.Close()
     121           1 : 
     122           1 :                         fmt.Fprintf(stdout, "%s\n", arg)
     123           1 : 
     124           1 :                         var b pebble.Batch
     125           1 :                         var buf bytes.Buffer
     126           1 :                         rr := record.NewReader(f, base.DiskFileNum(fileNum))
     127           1 :                         for {
     128           1 :                                 offset := rr.Offset()
     129           1 :                                 r, err := rr.Next()
     130           1 :                                 if err == nil {
     131           1 :                                         buf.Reset()
     132           1 :                                         _, err = io.Copy(&buf, r)
     133           1 :                                 }
     134           1 :                                 if err != nil {
     135           1 :                                         // It is common to encounter a zeroed or invalid chunk due to WAL
     136           1 :                                         // preallocation and WAL recycling. We need to distinguish these
     137           1 :                                         // errors from EOF in order to recognize that the record was
     138           1 :                                         // truncated, but want to otherwise treat them like EOF.
     139           1 :                                         switch {
     140           0 :                                         case errors.Is(err, record.ErrZeroedChunk):
     141           0 :                                                 fmt.Fprintf(stdout, "EOF [%s] (may be due to WAL preallocation)\n", err)
     142           0 :                                         case errors.Is(err, record.ErrInvalidChunk):
     143           0 :                                                 fmt.Fprintf(stdout, "EOF [%s] (may be due to WAL recycling)\n", err)
     144           1 :                                         default:
     145           1 :                                                 fmt.Fprintf(stdout, "%s\n", err)
     146             :                                         }
     147           1 :                                         return
     148             :                                 }
     149             : 
     150           1 :                                 b = pebble.Batch{}
     151           1 :                                 if err := b.SetRepr(buf.Bytes()); err != nil {
     152           0 :                                         fmt.Fprintf(stdout, "corrupt batch within log file %q: %v", arg, err)
     153           0 :                                         return
     154           0 :                                 }
     155           1 :                                 fmt.Fprintf(stdout, "%d(%d) seq=%d count=%d, len=%d\n",
     156           1 :                                         offset, len(b.Repr()), b.SeqNum(), b.Count(), buf.Len())
     157           1 :                                 w.dumpBatch(stdout, &b, b.Reader(), func(err error) {
     158           0 :                                         logErr(arg, offset, err)
     159           0 :                                 })
     160             :                         }
     161             :                 }()
     162             :         }
     163           1 :         if len(errs) > 0 {
     164           0 :                 fmt.Fprintln(stderr, "Errors: ")
     165           0 :                 for _, ea := range errs {
     166           0 :                         fmt.Fprintf(stderr, "%s: %s\n", ea.arg, ea.err)
     167           0 :                 }
     168             :         }
     169             : }
     170             : 
     171           0 : func (w *walT) runDumpMerged(cmd *cobra.Command, args []string) {
     172           0 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     173           0 :         w.fmtKey.setForComparer(w.defaultComparer, w.comparers)
     174           0 :         w.fmtValue.setForComparer(w.defaultComparer, w.comparers)
     175           0 :         var a wal.FileAccumulator
     176           0 :         for _, arg := range args {
     177           0 :                 isLog, err := a.MaybeAccumulate(w.opts.FS, arg)
     178           0 :                 if !isLog {
     179           0 :                         fmt.Fprintf(stderr, "%q does not parse as a log file\n", arg)
     180           0 :                         os.Exit(1)
     181           0 :                 } else if err != nil {
     182           0 :                         fmt.Fprintf(stderr, "%s: %s\n", arg, err)
     183           0 :                         os.Exit(1)
     184           0 :                 }
     185             :         }
     186           0 :         logs := a.Finish()
     187           0 :         var errs []error
     188           0 :         for _, log := range logs {
     189           0 :                 fmt.Fprintf(stdout, "log file %s contains %d segment files:\n", log.Num, log.NumSegments())
     190           0 :                 errs = append(errs, w.runDumpMergedOne(cmd, log)...)
     191           0 :         }
     192           0 :         if len(errs) > 0 {
     193           0 :                 fmt.Fprintln(stderr, "Errors: ")
     194           0 :                 for _, err := range errs {
     195           0 :                         fmt.Fprintf(stderr, "%s\n", err)
     196           0 :                 }
     197             :         }
     198             : }
     199             : 
     200           0 : func (w *walT) runDumpMergedOne(cmd *cobra.Command, ll wal.LogicalLog) []error {
     201           0 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     202           0 :         var buf bytes.Buffer
     203           0 :         var errs []error
     204           0 :         var b pebble.Batch
     205           0 : 
     206           0 :         logErr := func(offset wal.Offset, err error) {
     207           0 :                 err = errors.Wrapf(err, "offset %s: ", offset)
     208           0 :                 fmt.Fprintln(stderr, err)
     209           0 :                 errs = append(errs, err)
     210           0 :         }
     211             : 
     212           0 :         rr := ll.OpenForRead()
     213           0 :         for {
     214           0 :                 buf.Reset()
     215           0 :                 r, offset, err := rr.NextRecord()
     216           0 :                 if err == nil {
     217           0 :                         _, err = io.Copy(&buf, r)
     218           0 :                 }
     219           0 :                 if err != nil {
     220           0 :                         // It is common to encounter a zeroed or invalid chunk due to WAL
     221           0 :                         // preallocation and WAL recycling. We need to distinguish these
     222           0 :                         // errors from EOF in order to recognize that the record was
     223           0 :                         // truncated and to avoid replaying subsequent WALs, but want
     224           0 :                         // to otherwise treat them like EOF.
     225           0 :                         if err == io.EOF {
     226           0 :                                 break
     227           0 :                         } else if record.IsInvalidRecord(err) {
     228           0 :                                 break
     229             :                         }
     230           0 :                         return append(errs, err)
     231             :                 }
     232           0 :                 if buf.Len() < batchrepr.HeaderLen {
     233           0 :                         logErr(offset, errors.Newf("%d-byte batch too short", buf.Len()))
     234           0 :                         continue
     235             :                 }
     236           0 :                 b = pebble.Batch{}
     237           0 :                 if err := b.SetRepr(buf.Bytes()); err != nil {
     238           0 :                         logErr(offset, errors.Newf("unable to parse batch: %x", buf.Bytes()))
     239           0 :                         continue
     240             :                 }
     241           0 :                 fmt.Fprintf(stdout, "%s(%d) seq=%d count=%d, len=%d\n",
     242           0 :                         offset, len(b.Repr()), b.SeqNum(), b.Count(), buf.Len())
     243           0 :                 w.dumpBatch(stdout, &b, b.Reader(), func(err error) {
     244           0 :                         logErr(offset, err)
     245           0 :                 })
     246             :         }
     247           0 :         return nil
     248             : }
     249             : 
     250             : func (w *walT) dumpBatch(
     251             :         stdout io.Writer, b *pebble.Batch, r batchrepr.Reader, logErr func(error),
     252           1 : ) {
     253           1 :         for idx := 0; ; idx++ {
     254           1 :                 kind, ukey, value, ok, err := r.Next()
     255           1 :                 if !ok {
     256           1 :                         if err != nil {
     257           0 :                                 logErr(errors.Newf("unable to decode %d'th key in batch; %s", idx, err))
     258           0 :                         }
     259           1 :                         break
     260             :                 }
     261           1 :                 fmt.Fprintf(stdout, "    %s(", kind)
     262           1 :                 switch kind {
     263           1 :                 case base.InternalKeyKindDelete:
     264           1 :                         fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
     265           1 :                 case base.InternalKeyKindSet:
     266           1 :                         fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtValue.fn(ukey, value))
     267           0 :                 case base.InternalKeyKindMerge:
     268           0 :                         fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtValue.fn(ukey, value))
     269           0 :                 case base.InternalKeyKindLogData:
     270           0 :                         fmt.Fprintf(stdout, "<%d>", len(value))
     271           0 :                 case base.InternalKeyKindIngestSST:
     272           0 :                         fileNum, _ := binary.Uvarint(ukey)
     273           0 :                         fmt.Fprintf(stdout, "%s", base.FileNum(fileNum))
     274           0 :                 case base.InternalKeyKindExcise:
     275           0 :                         fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value))
     276           0 :                 case base.InternalKeyKindSingleDelete:
     277           0 :                         fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
     278           0 :                 case base.InternalKeyKindSetWithDelete:
     279           0 :                         fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
     280           0 :                 case base.InternalKeyKindRangeDelete:
     281           0 :                         fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value))
     282           1 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     283           1 :                         ik := base.MakeInternalKey(ukey, b.SeqNum()+base.SeqNum(idx), kind)
     284           1 :                         s, err := rangekey.Decode(ik, value, nil)
     285           1 :                         if err != nil {
     286           0 :                                 logErr(errors.Newf("%s: error decoding %s", w.fmtKey.fn(ukey), err))
     287           1 :                         } else {
     288           1 :                                 fmt.Fprintf(stdout, "%s", s.Pretty(w.fmtKey.fn))
     289           1 :                         }
     290           0 :                 case base.InternalKeyKindDeleteSized:
     291           0 :                         v, _ := binary.Uvarint(value)
     292           0 :                         fmt.Fprintf(stdout, "%s,%d", w.fmtKey.fn(ukey), v)
     293           0 :                 default:
     294           0 :                         err := errors.Newf("invalid key kind %d in key at index %d/%d of batch with seqnum %d at offset %d",
     295           0 :                                 kind, idx, b.Count(), b.SeqNum())
     296           0 :                         fmt.Fprintf(stdout, "<error: %s>", err)
     297           0 :                         logErr(err)
     298             :                 }
     299           1 :                 fmt.Fprintf(stdout, ")\n")
     300             :         }
     301             : }

Generated by: LCOV version 1.14