LCOV - code coverage report
Current view: top level - pebble/tool - find.go (source / functions) Hit Total Coverage
Test: 2024-04-24 08:17Z 28ba8053 - tests only.lcov Lines: 402 472 85.2 %
Date: 2024-04-24 08:18:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package tool
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "fmt"
      11             :         "io"
      12             :         "path/filepath"
      13             :         "slices"
      14             :         "sort"
      15             : 
      16             :         "github.com/cockroachdb/pebble"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/keyspan"
      19             :         "github.com/cockroachdb/pebble/internal/manifest"
      20             :         "github.com/cockroachdb/pebble/internal/private"
      21             :         "github.com/cockroachdb/pebble/internal/rangedel"
      22             :         "github.com/cockroachdb/pebble/record"
      23             :         "github.com/cockroachdb/pebble/sstable"
      24             :         "github.com/cockroachdb/pebble/wal"
      25             :         "github.com/spf13/cobra"
      26             : )
      27             : 
      28             : type findRef struct {
      29             :         key      base.InternalKey
      30             :         value    []byte
      31             :         fileNum  base.FileNum
      32             :         filename string
      33             : }
      34             : 
      35             : // findT implements the find tool.
      36             : //
      37             : // TODO(bananabrick): Add support for virtual sstables in this tool. Currently,
      38             : // the tool will work because we're parsing files from disk, so virtual sstables
      39             : // will never be added to findT.tables. The manifest could contain information
      40             : // about virtual sstables. This is fine because the manifest is only used to
      41             : // compute the findT.editRefs, and editRefs is only used if a file in
      42             : // findT.tables contains a key. Of course, the tool won't be completely
      43             : // accurate without dealing with virtual sstable case.
      44             : type findT struct {
      45             :         Root *cobra.Command
      46             : 
      47             :         // Configuration.
      48             :         opts      *pebble.Options
      49             :         comparers sstable.Comparers
      50             :         mergers   sstable.Mergers
      51             : 
      52             :         // Flags.
      53             :         comparerName string
      54             :         fmtKey       keyFormatter
      55             :         fmtValue     valueFormatter
      56             :         verbose      bool
      57             : 
      58             :         // Map from file num to version edit index which references the file num.
      59             :         editRefs map[base.DiskFileNum][]int
      60             :         // List of version edits.
      61             :         edits []manifest.VersionEdit
      62             :         // List of WAL files sorted by disk file num.
      63             :         logs []wal.LogicalLog
      64             :         // List of manifest files sorted by disk file num.
      65             :         manifests []fileLoc
      66             :         // List of table files sorted by disk file num.
      67             :         tables []fileLoc
      68             :         // Set of tables that contains references to the search key.
      69             :         tableRefs map[base.FileNum]bool
      70             :         // Map from file num to table metadata.
      71             :         tableMeta map[base.FileNum]*manifest.FileMetadata
      72             :         // List of error messages for SSTables that could not be decoded.
      73             :         errors []string
      74             : }
      75             : 
      76             : type fileLoc struct {
      77             :         base.DiskFileNum
      78             :         path string
      79             : }
      80             : 
      81           1 : func cmpFileLoc(a, b fileLoc) int {
      82           1 :         return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
      83           1 : }
      84             : 
      85             : func newFind(
      86             :         opts *pebble.Options,
      87             :         comparers sstable.Comparers,
      88             :         defaultComparer string,
      89             :         mergers sstable.Mergers,
      90           1 : ) *findT {
      91           1 :         f := &findT{
      92           1 :                 opts:      opts,
      93           1 :                 comparers: comparers,
      94           1 :                 mergers:   mergers,
      95           1 :         }
      96           1 :         f.fmtKey.mustSet("quoted")
      97           1 :         f.fmtValue.mustSet("[%x]")
      98           1 : 
      99           1 :         f.Root = &cobra.Command{
     100           1 :                 Use:   "find <dir> <key>",
     101           1 :                 Short: "find references to the specified key",
     102           1 :                 Long: `
     103           1 : Find references to the specified key and any range tombstones that contain the
     104           1 : key. This includes references to the key in WAL files and sstables, and the
     105           1 : provenance of the sstables (flushed, ingested, compacted).
     106           1 : `,
     107           1 :                 Args: cobra.ExactArgs(2),
     108           1 :                 Run:  f.run,
     109           1 :         }
     110           1 : 
     111           1 :         f.Root.Flags().BoolVarP(
     112           1 :                 &f.verbose, "verbose", "v", false, "verbose output")
     113           1 :         f.Root.Flags().StringVar(
     114           1 :                 &f.comparerName, "comparer", defaultComparer, "comparer name")
     115           1 :         f.Root.Flags().Var(
     116           1 :                 &f.fmtKey, "key", "key formatter")
     117           1 :         f.Root.Flags().Var(
     118           1 :                 &f.fmtValue, "value", "value formatter")
     119           1 :         return f
     120           1 : }
     121             : 
     122           1 : func (f *findT) run(cmd *cobra.Command, args []string) {
     123           1 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     124           1 :         var key key
     125           1 :         if err := key.Set(args[1]); err != nil {
     126           0 :                 fmt.Fprintf(stdout, "%s\n", err)
     127           0 :                 return
     128           0 :         }
     129             : 
     130           1 :         if err := f.findFiles(stdout, stderr, args[0]); err != nil {
     131           1 :                 fmt.Fprintf(stdout, "%s\n", err)
     132           1 :                 return
     133           1 :         }
     134           1 :         f.readManifests(stdout)
     135           1 : 
     136           1 :         f.opts.Comparer = f.comparers[f.comparerName]
     137           1 :         if f.opts.Comparer == nil {
     138           0 :                 fmt.Fprintf(stderr, "unknown comparer %q", f.comparerName)
     139           0 :                 return
     140           0 :         }
     141           1 :         f.fmtKey.setForComparer(f.opts.Comparer.Name, f.comparers)
     142           1 :         f.fmtValue.setForComparer(f.opts.Comparer.Name, f.comparers)
     143           1 : 
     144           1 :         refs := f.search(stdout, key)
     145           1 :         var lastFilename string
     146           1 :         for i := range refs {
     147           1 :                 r := &refs[i]
     148           1 :                 if lastFilename != r.filename {
     149           1 :                         lastFilename = r.filename
     150           1 :                         fmt.Fprintf(stdout, "%s", r.filename)
     151           1 :                         if m := f.tableMeta[r.fileNum]; m != nil {
     152           1 :                                 fmt.Fprintf(stdout, " ")
     153           1 :                                 formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest)
     154           1 :                         }
     155           1 :                         fmt.Fprintf(stdout, "\n")
     156           1 :                         if p := f.tableProvenance(r.fileNum); p != "" {
     157           1 :                                 fmt.Fprintf(stdout, "    (%s)\n", p)
     158           1 :                         }
     159             :                 }
     160           1 :                 fmt.Fprintf(stdout, "    ")
     161           1 :                 formatKeyValue(stdout, f.fmtKey, f.fmtValue, &r.key, r.value)
     162             :         }
     163             : 
     164           1 :         for _, errorMsg := range f.errors {
     165           1 :                 fmt.Fprint(stdout, errorMsg)
     166           1 :         }
     167             : }
     168             : 
     169             : // Find all of the manifests, logs, and tables in the specified directory.
     170           1 : func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
     171           1 :         f.editRefs = make(map[base.DiskFileNum][]int)
     172           1 :         f.logs = nil
     173           1 :         f.manifests = nil
     174           1 :         f.tables = nil
     175           1 :         f.tableMeta = make(map[base.FileNum]*manifest.FileMetadata)
     176           1 : 
     177           1 :         if _, err := f.opts.FS.Stat(dir); err != nil {
     178           1 :                 return err
     179           1 :         }
     180             : 
     181           1 :         var walAccumulator wal.FileAccumulator
     182           1 :         walk(stderr, f.opts.FS, dir, func(path string) {
     183           1 :                 if isLogFile, err := walAccumulator.MaybeAccumulate(f.opts.FS, path); err != nil {
     184           0 :                         fmt.Fprintf(stderr, "%s: %v\n", path, err)
     185           0 :                         return
     186           1 :                 } else if isLogFile {
     187           1 :                         return
     188           1 :                 }
     189             : 
     190           1 :                 ft, fileNum, ok := base.ParseFilename(f.opts.FS, path)
     191           1 :                 if !ok {
     192           1 :                         return
     193           1 :                 }
     194           1 :                 fl := fileLoc{DiskFileNum: fileNum, path: path}
     195           1 :                 switch ft {
     196           1 :                 case base.FileTypeManifest:
     197           1 :                         f.manifests = append(f.manifests, fl)
     198           1 :                 case base.FileTypeTable:
     199           1 :                         f.tables = append(f.tables, fl)
     200           1 :                 default:
     201           1 :                         return
     202             :                 }
     203             :         })
     204             : 
     205             :         // TODO(jackson): Provide a means of scanning the secondary WAL directory
     206             :         // too.
     207             : 
     208           1 :         f.logs = walAccumulator.Finish()
     209           1 :         slices.SortFunc(f.manifests, cmpFileLoc)
     210           1 :         slices.SortFunc(f.tables, cmpFileLoc)
     211           1 : 
     212           1 :         if f.verbose {
     213           1 :                 fmt.Fprintf(stdout, "%s\n", dir)
     214           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.manifests), makePlural("manifest", int64(len(f.manifests))))
     215           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.logs), makePlural("log", int64(len(f.logs))))
     216           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.tables), makePlural("sstable", int64(len(f.tables))))
     217           1 :         }
     218           1 :         return nil
     219             : }
     220             : 
     221             : // Read the manifests and populate the editRefs map which is used to determine
     222             : // the provenance and metadata of tables.
     223           1 : func (f *findT) readManifests(stdout io.Writer) {
     224           1 :         for _, fl := range f.manifests {
     225           1 :                 func() {
     226           1 :                         mf, err := f.opts.FS.Open(fl.path)
     227           1 :                         if err != nil {
     228           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     229           0 :                                 return
     230           0 :                         }
     231           1 :                         defer mf.Close()
     232           1 : 
     233           1 :                         if f.verbose {
     234           1 :                                 fmt.Fprintf(stdout, "%s\n", fl.path)
     235           1 :                         }
     236             : 
     237           1 :                         rr := record.NewReader(mf, 0 /* logNum */)
     238           1 :                         for {
     239           1 :                                 r, err := rr.Next()
     240           1 :                                 if err != nil {
     241           1 :                                         if err != io.EOF {
     242           0 :                                                 fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
     243           0 :                                         }
     244           1 :                                         break
     245             :                                 }
     246             : 
     247           1 :                                 var ve manifest.VersionEdit
     248           1 :                                 if err := ve.Decode(r); err != nil {
     249           0 :                                         fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
     250           0 :                                         break
     251             :                                 }
     252           1 :                                 i := len(f.edits)
     253           1 :                                 f.edits = append(f.edits, ve)
     254           1 : 
     255           1 :                                 if ve.ComparerName != "" {
     256           1 :                                         f.comparerName = ve.ComparerName
     257           1 :                                 }
     258           1 :                                 if num := ve.MinUnflushedLogNum; num != 0 {
     259           1 :                                         f.editRefs[num] = append(f.editRefs[num], i)
     260           1 :                                 }
     261           1 :                                 for df := range ve.DeletedFiles {
     262           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(df.FileNum)
     263           1 :                                         f.editRefs[diskFileNum] = append(f.editRefs[diskFileNum], i)
     264           1 :                                 }
     265           1 :                                 for _, nf := range ve.NewFiles {
     266           1 :                                         // The same file can be deleted and added in a single version edit
     267           1 :                                         // which indicates a "move" compaction. Only add the edit to the list
     268           1 :                                         // once.
     269           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(nf.Meta.FileNum)
     270           1 :                                         refs := f.editRefs[diskFileNum]
     271           1 :                                         if n := len(refs); n == 0 || refs[n-1] != i {
     272           1 :                                                 f.editRefs[diskFileNum] = append(refs, i)
     273           1 :                                         }
     274           1 :                                         if _, ok := f.tableMeta[nf.Meta.FileNum]; !ok {
     275           1 :                                                 f.tableMeta[nf.Meta.FileNum] = nf.Meta
     276           1 :                                         }
     277             :                                 }
     278             :                         }
     279             :                 }()
     280             :         }
     281             : 
     282           1 :         if f.verbose {
     283           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.edits), makePlural("edit", int64(len(f.edits))))
     284           1 :         }
     285             : }
     286             : 
     287             : // Search the logs and sstables for references to the specified key.
     288           1 : func (f *findT) search(stdout io.Writer, key []byte) []findRef {
     289           1 :         refs := f.searchLogs(stdout, key, nil)
     290           1 :         refs = f.searchTables(stdout, key, refs)
     291           1 : 
     292           1 :         // For a given file (log or table) the references are already in the correct
     293           1 :         // order. We simply want to order the references by fileNum using a stable
     294           1 :         // sort.
     295           1 :         //
     296           1 :         // TODO(peter): I'm not sure if this is perfectly correct with regards to log
     297           1 :         // files and ingested sstables, but it is close enough and doing something
     298           1 :         // better is onerous. Revisit if this ever becomes problematic (e.g. if we
     299           1 :         // allow finding more than one key at a time).
     300           1 :         //
     301           1 :         // An example of the problem with logs and ingestion (which can only occur
     302           1 :         // with distinct keys). If I write key "a" to a log, I can then ingested key
     303           1 :         // "b" without causing "a" to be flushed. Then I can write key "c" to the
     304           1 :         // log. Ideally, we'd show the key "a" from the log, then the key "b" from
     305           1 :         // the ingested sstable, then key "c" from the log.
     306           1 :         slices.SortStableFunc(refs, func(a, b findRef) int {
     307           1 :                 if v := cmp.Compare(a.fileNum, b.fileNum); v != 0 {
     308           1 :                         return v
     309           1 :                 }
     310           1 :                 return cmp.Compare(a.filename, b.filename)
     311             :         })
     312           1 :         return refs
     313             : }
     314             : 
     315             : // Search the logs for references to the specified key.
     316           1 : func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     317           1 :         cmp := f.opts.Comparer.Compare
     318           1 :         for _, ll := range f.logs {
     319           1 :                 _ = func() (err error) {
     320           1 :                         rr := ll.OpenForRead()
     321           1 :                         if f.verbose {
     322           1 :                                 fmt.Fprintf(stdout, "%s", ll)
     323           1 :                                 defer fmt.Fprintf(stdout, "\n")
     324           1 :                         }
     325           1 :                         defer func() {
     326           1 :                                 switch err {
     327           0 :                                 case record.ErrZeroedChunk:
     328           0 :                                         if f.verbose {
     329           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL preallocation)", err)
     330           0 :                                         }
     331           0 :                                 case record.ErrInvalidChunk:
     332           0 :                                         if f.verbose {
     333           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL recycling)", err)
     334           0 :                                         }
     335           1 :                                 default:
     336           1 :                                         if err != io.EOF {
     337           0 :                                                 if f.verbose {
     338           0 :                                                         fmt.Fprintf(stdout, ": %s", err)
     339           0 :                                                 } else {
     340           0 :                                                         fmt.Fprintf(stdout, "%s: %s\n", ll, err)
     341           0 :                                                 }
     342             :                                         }
     343             :                                 }
     344             :                         }()
     345             : 
     346           1 :                         var b pebble.Batch
     347           1 :                         var buf bytes.Buffer
     348           1 :                         for {
     349           1 :                                 r, off, err := rr.NextRecord()
     350           1 :                                 if err == nil {
     351           1 :                                         buf.Reset()
     352           1 :                                         _, err = io.Copy(&buf, r)
     353           1 :                                 }
     354           1 :                                 if err != nil {
     355           1 :                                         return err
     356           1 :                                 }
     357             : 
     358           1 :                                 b = pebble.Batch{}
     359           1 :                                 if err := b.SetRepr(buf.Bytes()); err != nil {
     360           0 :                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
     361           0 :                                         continue
     362             :                                 }
     363           1 :                                 seqNum := b.SeqNum()
     364           1 :                                 for r := b.Reader(); ; seqNum++ {
     365           1 :                                         kind, ukey, value, ok, err := r.Next()
     366           1 :                                         if !ok {
     367           1 :                                                 if err != nil {
     368           0 :                                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
     369           0 :                                                         break
     370             :                                                 }
     371           1 :                                                 break
     372             :                                         }
     373           1 :                                         ikey := base.MakeInternalKey(ukey, seqNum, kind)
     374           1 :                                         switch kind {
     375             :                                         case base.InternalKeyKindDelete,
     376             :                                                 base.InternalKeyKindDeleteSized,
     377             :                                                 base.InternalKeyKindSet,
     378             :                                                 base.InternalKeyKindMerge,
     379             :                                                 base.InternalKeyKindSingleDelete,
     380           1 :                                                 base.InternalKeyKindSetWithDelete:
     381           1 :                                                 if cmp(searchKey, ikey.UserKey) != 0 {
     382           1 :                                                         continue
     383             :                                                 }
     384           1 :                                         case base.InternalKeyKindRangeDelete:
     385           1 :                                                 // Output tombstones that contain or end with the search key.
     386           1 :                                                 t := rangedel.Decode(ikey, value, nil)
     387           1 :                                                 if !t.Contains(cmp, searchKey) && cmp(t.End, searchKey) != 0 {
     388           1 :                                                         continue
     389             :                                                 }
     390           0 :                                         default:
     391           0 :                                                 continue
     392             :                                         }
     393             : 
     394           1 :                                         refs = append(refs, findRef{
     395           1 :                                                 key:      ikey.Clone(),
     396           1 :                                                 value:    append([]byte(nil), value...),
     397           1 :                                                 fileNum:  base.FileNum(ll.Num),
     398           1 :                                                 filename: filepath.Base(off.PhysicalFile),
     399           1 :                                         })
     400             :                                 }
     401             :                         }
     402             :                 }()
     403             :         }
     404           1 :         return refs
     405             : }
     406             : 
     407             : // Search the tables for references to the specified key.
     408           1 : func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     409           1 :         cache := pebble.NewCache(128 << 20 /* 128 MB */)
     410           1 :         defer cache.Unref()
     411           1 : 
     412           1 :         f.tableRefs = make(map[base.FileNum]bool)
     413           1 :         for _, fl := range f.tables {
     414           1 :                 _ = func() (err error) {
     415           1 :                         tf, err := f.opts.FS.Open(fl.path)
     416           1 :                         if err != nil {
     417           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     418           0 :                                 return
     419           0 :                         }
     420             : 
     421           1 :                         m := f.tableMeta[base.PhysicalTableFileNum(fl.DiskFileNum)]
     422           1 :                         if f.verbose {
     423           1 :                                 fmt.Fprintf(stdout, "%s", fl.path)
     424           1 :                                 if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     425           1 :                                         fmt.Fprintf(stdout, ": global seqnum: %d", m.LargestSeqNum)
     426           1 :                                 }
     427           1 :                                 defer fmt.Fprintf(stdout, "\n")
     428             :                         }
     429           1 :                         defer func() {
     430           1 :                                 switch {
     431           0 :                                 case err != nil:
     432           0 :                                         if f.verbose {
     433           0 :                                                 fmt.Fprintf(stdout, ": %v", err)
     434           0 :                                         } else {
     435           0 :                                                 fmt.Fprintf(stdout, "%s: %v\n", fl.path, err)
     436           0 :                                         }
     437             :                                 }
     438             :                         }()
     439             : 
     440           1 :                         opts := sstable.ReaderOptions{
     441           1 :                                 Cache:    cache,
     442           1 :                                 Comparer: f.opts.Comparer,
     443           1 :                                 Filters:  f.opts.Filters,
     444           1 :                         }
     445           1 :                         readable, err := sstable.NewSimpleReadable(tf)
     446           1 :                         if err != nil {
     447           0 :                                 return err
     448           0 :                         }
     449           1 :                         r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers,
     450           1 :                                 private.SSTableRawTombstonesOpt.(sstable.ReaderOption))
     451           1 :                         if err != nil {
     452           1 :                                 f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", fl.path, err.Error()))
     453           1 :                                 // Ensure the error only gets printed once.
     454           1 :                                 err = nil
     455           1 :                                 return
     456           1 :                         }
     457           1 :                         defer r.Close()
     458           1 : 
     459           1 :                         var transforms sstable.IterTransforms
     460           1 :                         if m != nil {
     461           1 :                                 transforms = m.IterTransforms()
     462           1 :                         }
     463             : 
     464           1 :                         iter, err := r.NewIter(transforms, nil, nil)
     465           1 :                         if err != nil {
     466           0 :                                 return err
     467           0 :                         }
     468           1 :                         defer iter.Close()
     469           1 :                         kv := iter.SeekGE(searchKey, base.SeekGEFlagsNone)
     470           1 : 
     471           1 :                         // We configured sstable.Reader to return raw tombstones which requires a
     472           1 :                         // bit more work here to put them in a form that can be iterated in
     473           1 :                         // parallel with the point records.
     474           1 :                         rangeDelIter, err := func() (keyspan.FragmentIterator, error) {
     475           1 :                                 iter, err := r.NewRawRangeDelIter(transforms)
     476           1 :                                 if err != nil {
     477           0 :                                         return nil, err
     478           0 :                                 }
     479           1 :                                 if iter == nil {
     480           1 :                                         return keyspan.NewIter(r.Compare, nil), nil
     481           1 :                                 }
     482           1 :                                 defer iter.Close()
     483           1 : 
     484           1 :                                 var tombstones []keyspan.Span
     485           1 :                                 t, err := iter.First()
     486           1 :                                 for ; t != nil; t, err = iter.Next() {
     487           1 :                                         if !t.Contains(r.Compare, searchKey) {
     488           1 :                                                 continue
     489             :                                         }
     490           1 :                                         tombstones = append(tombstones, t.ShallowClone())
     491             :                                 }
     492           1 :                                 if err != nil {
     493           0 :                                         return nil, err
     494           0 :                                 }
     495             : 
     496           1 :                                 slices.SortFunc(tombstones, func(a, b keyspan.Span) int {
     497           0 :                                         return r.Compare(a.Start, b.Start)
     498           0 :                                 })
     499           1 :                                 return keyspan.NewIter(r.Compare, tombstones), nil
     500             :                         }()
     501           1 :                         if err != nil {
     502           0 :                                 return err
     503           0 :                         }
     504             : 
     505           1 :                         defer rangeDelIter.Close()
     506           1 :                         rangeDel, err := rangeDelIter.First()
     507           1 :                         if err != nil {
     508           0 :                                 return err
     509           0 :                         }
     510             : 
     511           1 :                         foundRef := false
     512           1 :                         for kv != nil || rangeDel != nil {
     513           1 :                                 if kv != nil &&
     514           1 :                                         (rangeDel == nil || r.Compare(kv.K.UserKey, rangeDel.Start) < 0) {
     515           1 :                                         if r.Compare(searchKey, kv.K.UserKey) != 0 {
     516           1 :                                                 kv = nil
     517           1 :                                                 continue
     518             :                                         }
     519           1 :                                         v, _, err := kv.Value(nil)
     520           1 :                                         if err != nil {
     521           0 :                                                 return err
     522           0 :                                         }
     523           1 :                                         refs = append(refs, findRef{
     524           1 :                                                 key:      kv.K.Clone(),
     525           1 :                                                 value:    slices.Clone(v),
     526           1 :                                                 fileNum:  base.PhysicalTableFileNum(fl.DiskFileNum),
     527           1 :                                                 filename: filepath.Base(fl.path),
     528           1 :                                         })
     529           1 :                                         kv = iter.Next()
     530           1 :                                 } else {
     531           1 :                                         // Use rangedel.Encode to add a reference for each key
     532           1 :                                         // within the span.
     533           1 :                                         err := rangedel.Encode(rangeDel, func(k base.InternalKey, v []byte) error {
     534           1 :                                                 refs = append(refs, findRef{
     535           1 :                                                         key:      k.Clone(),
     536           1 :                                                         value:    slices.Clone(v),
     537           1 :                                                         fileNum:  base.PhysicalTableFileNum(fl.DiskFileNum),
     538           1 :                                                         filename: filepath.Base(fl.path),
     539           1 :                                                 })
     540           1 :                                                 return nil
     541           1 :                                         })
     542           1 :                                         if err != nil {
     543           0 :                                                 return err
     544           0 :                                         }
     545           1 :                                         rangeDel, err = rangeDelIter.Next()
     546           1 :                                         if err != nil {
     547           0 :                                                 return err
     548           0 :                                         }
     549             :                                 }
     550           1 :                                 foundRef = true
     551             :                         }
     552             : 
     553           1 :                         if foundRef {
     554           1 :                                 f.tableRefs[base.PhysicalTableFileNum(fl.DiskFileNum)] = true
     555           1 :                         }
     556           1 :                         return nil
     557             :                 }()
     558             :         }
     559           1 :         return refs
     560             : }
     561             : 
     562             : // tableProvenance describes the origin of the specified table. It searches the
     563             : // version edits for the first edit which created the table and reports whether
     564             : // it was a compaction, flush, or ingestion, followed by any later moves.
     565             : // Returns an empty string if no edit creating the table is found.
     566           1 : func (f *findT) tableProvenance(fileNum base.FileNum) string {
     567           1 :         editRefs := f.editRefs[base.PhysicalTableDiskFileNum(fileNum)]
     568           1 :         for len(editRefs) > 0 {
     569           1 :                 ve := f.edits[editRefs[0]]
     570           1 :                 editRefs = editRefs[1:] // consume this edit; the move scan below resumes from the next one
     571           1 :                 for _, nf := range ve.NewFiles {
     572           1 :                         if fileNum != nf.Meta.FileNum {
     573           1 :                                 continue
     574             :                         }
     575             : 
     576           1 :                         var buf bytes.Buffer
     577           1 :                         switch {
     578           1 :                         case len(ve.DeletedFiles) > 0:
     579           1 :                                 // A version edit with deleted files is a compaction. The deleted
     580           1 :                                 // files are the inputs to the compaction. We're going to
     581           1 :                                 // reconstruct the input files and display those inputs that
     582           1 :                                 // contain the search key (i.e. are listed in refs) and use an
     583           1 :                                 // ellipsis to indicate when there were other inputs that have
     584           1 :                                 // been elided.
     585           1 :                                 var sourceLevels []int
     586           1 :                                 levels := make(map[int][]base.FileNum)
     587           1 :                                 for df := range ve.DeletedFiles {
     588           1 :                                         files := levels[df.Level]
     589           1 :                                         if len(files) == 0 {
     590           1 :                                                 sourceLevels = append(sourceLevels, df.Level)
     591           1 :                                         }
     592           1 :                                         levels[df.Level] = append(files, df.FileNum)
     593             :                                 }
     594             : 
     595           1 :                                 sort.Ints(sourceLevels)
     596           1 :                                 if sourceLevels[len(sourceLevels)-1] != nf.Level {
     597           0 :                                         sourceLevels = append(sourceLevels, nf.Level)
     598           0 :                                 }
     599             : 
     600           1 :                                 sep := " "
     601           1 :                                 fmt.Fprintf(&buf, "compacted")
     602           1 :                                 for _, level := range sourceLevels {
     603           1 :                                         files := levels[level]
     604           1 :                                         slices.Sort(files)
     605           1 : 
     606           1 :                                         fmt.Fprintf(&buf, "%sL%d [", sep, level)
     607           1 :                                         sep = ""
     608           1 :                                         elided := false
     609           1 :                                         for _, fileNum := range files {
     610           1 :                                                 if f.tableRefs[fileNum] {
     611           1 :                                                         fmt.Fprintf(&buf, "%s%s", sep, fileNum)
     612           1 :                                                         sep = " "
     613           1 :                                                 } else {
     614           1 :                                                         elided = true
     615           1 :                                                 }
     616             :                                         }
     617           1 :                                         if elided {
     618           1 :                                                 fmt.Fprintf(&buf, "%s...", sep)
     619           1 :                                         }
     620           1 :                                         fmt.Fprintf(&buf, "]")
     621           1 :                                         sep = " + "
     622             :                                 }
     623             : 
     624           1 :                         case ve.MinUnflushedLogNum != 0:
     625           1 :                                 // A version edit with a min-unflushed log indicates a flush
     626           1 :                                 // operation.
     627           1 :                                 fmt.Fprintf(&buf, "flushed to L%d", nf.Level)
     628             : 
     629           1 :                         case nf.Meta.SmallestSeqNum == nf.Meta.LargestSeqNum:
     630           1 :                                 // If the smallest and largest seqnum are the same, the file was
     631           1 :                                 // ingested. Note that this can also occur for a flushed sstable
     632           1 :                                 // that contains only a single key, though that would have
     633           1 :                                 // already been captured above.
     634           1 :                                 fmt.Fprintf(&buf, "ingested to L%d", nf.Level)
     635             : 
     636           0 :                         default:
     637           0 :                                 // The provenance of the table is unclear. This is usually due to
     638           0 :                                 // the MANIFEST rolling over and taking a snapshot of the LSM
     639           0 :                                 // state.
     640           0 :                                 fmt.Fprintf(&buf, "added to L%d", nf.Level)
     641             :                         }
     642             : 
     643             :                         // After a table is created, it can be moved to a different level via a
     644             :                         // move compaction. This is indicated by a version edit that deletes the
     645             :                         // table from one level and adds the same table to a different
     646             :                         // level. Loop over the remaining version edits for the table looking for
     647             :                         // such moves.
     648           1 :                         for len(editRefs) > 0 {
     649           1 :                                 ve := f.edits[editRefs[0]]
     650           1 :                                 editRefs = editRefs[1:]
     651           1 :                                 for _, nf := range ve.NewFiles {
     652           1 :                                         if fileNum == nf.Meta.FileNum {
     653           1 :                                                 for df := range ve.DeletedFiles {
     654           1 :                                                         if fileNum == df.FileNum {
     655           1 :                                                                 fmt.Fprintf(&buf, ", moved to L%d", nf.Level)
     656           1 :                                                                 break
     657             :                                                         }
     658             :                                                 }
     659           1 :                                                 break // stop scanning this edit's new files once the table's entry is handled
     660             :                                         }
     661             :                                 }
     662             :                         }
     663             : 
     664           1 :                         return buf.String()
     665             :                 }
     666             :         }
     667           1 :         return ""
     668             : }

Generated by: LCOV version 1.14