LCOV - code coverage report
Current view: top level - pebble/tool - find.go (source / functions) Hit Total Coverage
Test: 2024-09-06 08:16Z 729b6f72 - tests only.lcov Lines: 410 480 85.4 %
Date: 2024-09-06 08:16:45 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package tool
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "slices"
      15             :         "sort"
      16             : 
      17             :         "github.com/cockroachdb/pebble"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/keyspan"
      20             :         "github.com/cockroachdb/pebble/internal/manifest"
      21             :         "github.com/cockroachdb/pebble/internal/rangedel"
      22             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      23             :         "github.com/cockroachdb/pebble/record"
      24             :         "github.com/cockroachdb/pebble/sstable"
      25             :         "github.com/cockroachdb/pebble/wal"
      26             :         "github.com/spf13/cobra"
      27             : )
      28             : 
// findRef is a single reference to the search key found in a WAL or an
// sstable.
type findRef struct {
	// key is the internal key (user key, sequence number, kind) of the entry.
	key base.InternalKey
	// value is the value stored with the key.
	value []byte
	// fileNum is the number of the file that contains the entry. References
	// are ordered by this number before being printed, and it is used to look
	// up table metadata and provenance.
	fileNum base.FileNum
	// filename is the name printed as the header for the group of references
	// that come from the same file.
	filename string
}
      35             : 
// findT implements the find tool.
//
// TODO(bananabrick): Add support for virtual sstables in this tool. Currently,
// the tool will work because we're parsing files from disk, so virtual sstables
// will never be added to findT.tables. The manifest could contain information
// about virtual sstables. This is fine because the manifest is only used to
// compute the findT.editRefs, and editRefs is only used if a file in
// findT.tables contains a key. Of course, the tool won't be completely
// accurate without dealing with virtual sstable case.
type findT struct {
	// Root is the cobra command ("find <dir> <key>") that drives the tool.
	Root *cobra.Command

	// Configuration.
	opts      *pebble.Options
	comparers sstable.Comparers
	mergers   sstable.Mergers

	// Flags.

	// comparerName is initialized from the --comparer flag and overwritten by
	// any comparer name recorded in the manifests that are read.
	comparerName string
	fmtKey       keyFormatter
	fmtValue     valueFormatter
	verbose      bool

	// Map from file num to the indices (into edits) of the version edits
	// which reference the file num.
	editRefs map[base.DiskFileNum][]int
	// List of version edits, in the order they were decoded from the
	// manifests.
	edits []manifest.VersionEdit
	// List of WAL files sorted by disk file num.
	logs []wal.LogicalLog
	// List of manifest files sorted by disk file num.
	manifests []fileLoc
	// List of table files sorted by disk file num.
	tables []fileLoc
	// Set of tables that contains references to the search key.
	tableRefs map[base.FileNum]bool
	// Map from file num to table metadata. Only the first metadata seen for a
	// given file num is retained.
	tableMeta map[base.FileNum]*manifest.FileMetadata
	// List of error messages for SSTables that could not be decoded. The
	// messages are printed after all references have been printed.
	errors []string
}
      76             : 
// fileLoc pairs the disk file number parsed from a file's name with the path
// at which the file was found.
type fileLoc struct {
	// DiskFileNum is the file number parsed from the filename.
	base.DiskFileNum
	// path is the location of the file as reported by the directory walk.
	path string
}
      81             : 
      82           1 : func cmpFileLoc(a, b fileLoc) int {
      83           1 :         return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
      84           1 : }
      85             : 
      86             : func newFind(
      87             :         opts *pebble.Options,
      88             :         comparers sstable.Comparers,
      89             :         defaultComparer string,
      90             :         mergers sstable.Mergers,
      91           1 : ) *findT {
      92           1 :         f := &findT{
      93           1 :                 opts:      opts,
      94           1 :                 comparers: comparers,
      95           1 :                 mergers:   mergers,
      96           1 :         }
      97           1 :         f.fmtKey.mustSet("quoted")
      98           1 :         f.fmtValue.mustSet("[%x]")
      99           1 : 
     100           1 :         f.Root = &cobra.Command{
     101           1 :                 Use:   "find <dir> <key>",
     102           1 :                 Short: "find references to the specified key",
     103           1 :                 Long: `
     104           1 : Find references to the specified key and any range tombstones that contain the
     105           1 : key. This includes references to the key in WAL files and sstables, and the
     106           1 : provenance of the sstables (flushed, ingested, compacted).
     107           1 : `,
     108           1 :                 Args: cobra.ExactArgs(2),
     109           1 :                 Run:  f.run,
     110           1 :         }
     111           1 : 
     112           1 :         f.Root.Flags().BoolVarP(
     113           1 :                 &f.verbose, "verbose", "v", false, "verbose output")
     114           1 :         f.Root.Flags().StringVar(
     115           1 :                 &f.comparerName, "comparer", defaultComparer, "comparer name")
     116           1 :         f.Root.Flags().Var(
     117           1 :                 &f.fmtKey, "key", "key formatter")
     118           1 :         f.Root.Flags().Var(
     119           1 :                 &f.fmtValue, "value", "value formatter")
     120           1 :         return f
     121           1 : }
     122             : 
     123           1 : func (f *findT) run(cmd *cobra.Command, args []string) {
     124           1 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     125           1 :         var key key
     126           1 :         if err := key.Set(args[1]); err != nil {
     127           0 :                 fmt.Fprintf(stdout, "%s\n", err)
     128           0 :                 return
     129           0 :         }
     130             : 
     131           1 :         if err := f.findFiles(stdout, stderr, args[0]); err != nil {
     132           1 :                 fmt.Fprintf(stdout, "%s\n", err)
     133           1 :                 return
     134           1 :         }
     135           1 :         f.readManifests(stdout)
     136           1 : 
     137           1 :         f.opts.Comparer = f.comparers[f.comparerName]
     138           1 :         if f.opts.Comparer == nil {
     139           0 :                 fmt.Fprintf(stderr, "unknown comparer %q", f.comparerName)
     140           0 :                 return
     141           0 :         }
     142           1 :         f.fmtKey.setForComparer(f.opts.Comparer.Name, f.comparers)
     143           1 :         f.fmtValue.setForComparer(f.opts.Comparer.Name, f.comparers)
     144           1 : 
     145           1 :         refs := f.search(stdout, key)
     146           1 :         var lastFilename string
     147           1 :         for i := range refs {
     148           1 :                 r := &refs[i]
     149           1 :                 if lastFilename != r.filename {
     150           1 :                         lastFilename = r.filename
     151           1 :                         fmt.Fprintf(stdout, "%s", r.filename)
     152           1 :                         if m := f.tableMeta[r.fileNum]; m != nil {
     153           1 :                                 fmt.Fprintf(stdout, " ")
     154           1 :                                 formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest)
     155           1 :                         }
     156           1 :                         fmt.Fprintf(stdout, "\n")
     157           1 :                         if p := f.tableProvenance(r.fileNum); p != "" {
     158           1 :                                 fmt.Fprintf(stdout, "    (%s)\n", p)
     159           1 :                         }
     160             :                 }
     161           1 :                 fmt.Fprintf(stdout, "    ")
     162           1 :                 formatKeyValue(stdout, f.fmtKey, f.fmtValue, &r.key, r.value)
     163             :         }
     164             : 
     165           1 :         for _, errorMsg := range f.errors {
     166           1 :                 fmt.Fprint(stdout, errorMsg)
     167           1 :         }
     168             : }
     169             : 
     170             : // Find all of the manifests, logs, and tables in the specified directory.
     171           1 : func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
     172           1 :         f.editRefs = make(map[base.DiskFileNum][]int)
     173           1 :         f.logs = nil
     174           1 :         f.manifests = nil
     175           1 :         f.tables = nil
     176           1 :         f.tableMeta = make(map[base.FileNum]*manifest.FileMetadata)
     177           1 : 
     178           1 :         if _, err := f.opts.FS.Stat(dir); err != nil {
     179           1 :                 return err
     180           1 :         }
     181             : 
     182           1 :         var walAccumulator wal.FileAccumulator
     183           1 :         walk(stderr, f.opts.FS, dir, func(path string) {
     184           1 :                 if isLogFile, err := walAccumulator.MaybeAccumulate(f.opts.FS, path); err != nil {
     185           0 :                         fmt.Fprintf(stderr, "%s: %v\n", path, err)
     186           0 :                         return
     187           1 :                 } else if isLogFile {
     188           1 :                         return
     189           1 :                 }
     190             : 
     191           1 :                 ft, fileNum, ok := base.ParseFilename(f.opts.FS, path)
     192           1 :                 if !ok {
     193           1 :                         return
     194           1 :                 }
     195           1 :                 fl := fileLoc{DiskFileNum: fileNum, path: path}
     196           1 :                 switch ft {
     197           1 :                 case base.FileTypeManifest:
     198           1 :                         f.manifests = append(f.manifests, fl)
     199           1 :                 case base.FileTypeTable:
     200           1 :                         f.tables = append(f.tables, fl)
     201           1 :                 default:
     202           1 :                         return
     203             :                 }
     204             :         })
     205             : 
     206             :         // TODO(jackson): Provide a means of scanning the secondary WAL directory
     207             :         // too.
     208             : 
     209           1 :         f.logs = walAccumulator.Finish()
     210           1 :         slices.SortFunc(f.manifests, cmpFileLoc)
     211           1 :         slices.SortFunc(f.tables, cmpFileLoc)
     212           1 : 
     213           1 :         if f.verbose {
     214           1 :                 fmt.Fprintf(stdout, "%s\n", dir)
     215           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.manifests), makePlural("manifest", int64(len(f.manifests))))
     216           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.logs), makePlural("log", int64(len(f.logs))))
     217           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.tables), makePlural("sstable", int64(len(f.tables))))
     218           1 :         }
     219           1 :         return nil
     220             : }
     221             : 
     222             : // Read the manifests and populate the editRefs map which is used to determine
     223             : // the provenance and metadata of tables.
     224           1 : func (f *findT) readManifests(stdout io.Writer) {
     225           1 :         for _, fl := range f.manifests {
     226           1 :                 func() {
     227           1 :                         mf, err := f.opts.FS.Open(fl.path)
     228           1 :                         if err != nil {
     229           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     230           0 :                                 return
     231           0 :                         }
     232           1 :                         defer mf.Close()
     233           1 : 
     234           1 :                         if f.verbose {
     235           1 :                                 fmt.Fprintf(stdout, "%s\n", fl.path)
     236           1 :                         }
     237             : 
     238           1 :                         rr := record.NewReader(mf, 0 /* logNum */)
     239           1 :                         for {
     240           1 :                                 r, err := rr.Next()
     241           1 :                                 if err != nil {
     242           1 :                                         if err != io.EOF {
     243           0 :                                                 fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
     244           0 :                                         }
     245           1 :                                         break
     246             :                                 }
     247             : 
     248           1 :                                 var ve manifest.VersionEdit
     249           1 :                                 if err := ve.Decode(r); err != nil {
     250           0 :                                         fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
     251           0 :                                         break
     252             :                                 }
     253           1 :                                 i := len(f.edits)
     254           1 :                                 f.edits = append(f.edits, ve)
     255           1 : 
     256           1 :                                 if ve.ComparerName != "" {
     257           1 :                                         f.comparerName = ve.ComparerName
     258           1 :                                 }
     259           1 :                                 if num := ve.MinUnflushedLogNum; num != 0 {
     260           1 :                                         f.editRefs[num] = append(f.editRefs[num], i)
     261           1 :                                 }
     262           1 :                                 for df := range ve.DeletedFiles {
     263           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(df.FileNum)
     264           1 :                                         f.editRefs[diskFileNum] = append(f.editRefs[diskFileNum], i)
     265           1 :                                 }
     266           1 :                                 for _, nf := range ve.NewFiles {
     267           1 :                                         // The same file can be deleted and added in a single version edit
     268           1 :                                         // which indicates a "move" compaction. Only add the edit to the list
     269           1 :                                         // once.
     270           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(nf.Meta.FileNum)
     271           1 :                                         refs := f.editRefs[diskFileNum]
     272           1 :                                         if n := len(refs); n == 0 || refs[n-1] != i {
     273           1 :                                                 f.editRefs[diskFileNum] = append(refs, i)
     274           1 :                                         }
     275           1 :                                         if _, ok := f.tableMeta[nf.Meta.FileNum]; !ok {
     276           1 :                                                 f.tableMeta[nf.Meta.FileNum] = nf.Meta
     277           1 :                                         }
     278             :                                 }
     279             :                         }
     280             :                 }()
     281             :         }
     282             : 
     283           1 :         if f.verbose {
     284           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.edits), makePlural("edit", int64(len(f.edits))))
     285           1 :         }
     286             : }
     287             : 
     288             : // Search the logs and sstables for references to the specified key.
     289           1 : func (f *findT) search(stdout io.Writer, key []byte) []findRef {
     290           1 :         refs := f.searchLogs(stdout, key, nil)
     291           1 :         refs = f.searchTables(stdout, key, refs)
     292           1 : 
     293           1 :         // For a given file (log or table) the references are already in the correct
     294           1 :         // order. We simply want to order the references by fileNum using a stable
     295           1 :         // sort.
     296           1 :         //
     297           1 :         // TODO(peter): I'm not sure if this is perfectly correct with regards to log
     298           1 :         // files and ingested sstables, but it is close enough and doing something
     299           1 :         // better is onerous. Revisit if this ever becomes problematic (e.g. if we
     300           1 :         // allow finding more than one key at a time).
     301           1 :         //
     302           1 :         // An example of the problem with logs and ingestion (which can only occur
     303           1 :         // with distinct keys). If I write key "a" to a log, I can then ingested key
     304           1 :         // "b" without causing "a" to be flushed. Then I can write key "c" to the
     305           1 :         // log. Ideally, we'd show the key "a" from the log, then the key "b" from
     306           1 :         // the ingested sstable, then key "c" from the log.
     307           1 :         slices.SortStableFunc(refs, func(a, b findRef) int {
     308           1 :                 if v := cmp.Compare(a.fileNum, b.fileNum); v != 0 {
     309           1 :                         return v
     310           1 :                 }
     311           1 :                 return cmp.Compare(a.filename, b.filename)
     312             :         })
     313           1 :         return refs
     314             : }
     315             : 
     316             : // Search the logs for references to the specified key.
     317           1 : func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     318           1 :         cmp := f.opts.Comparer.Compare
     319           1 :         for _, ll := range f.logs {
     320           1 :                 _ = func() (err error) {
     321           1 :                         rr := ll.OpenForRead()
     322           1 :                         if f.verbose {
     323           1 :                                 fmt.Fprintf(stdout, "%s", ll)
     324           1 :                                 defer fmt.Fprintf(stdout, "\n")
     325           1 :                         }
     326           1 :                         defer func() {
     327           1 :                                 switch err {
     328           0 :                                 case record.ErrZeroedChunk:
     329           0 :                                         if f.verbose {
     330           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL preallocation)", err)
     331           0 :                                         }
     332           0 :                                 case record.ErrInvalidChunk:
     333           0 :                                         if f.verbose {
     334           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL recycling)", err)
     335           0 :                                         }
     336           1 :                                 default:
     337           1 :                                         if err != io.EOF {
     338           0 :                                                 if f.verbose {
     339           0 :                                                         fmt.Fprintf(stdout, ": %s", err)
     340           0 :                                                 } else {
     341           0 :                                                         fmt.Fprintf(stdout, "%s: %s\n", ll, err)
     342           0 :                                                 }
     343             :                                         }
     344             :                                 }
     345             :                         }()
     346             : 
     347           1 :                         var b pebble.Batch
     348           1 :                         var buf bytes.Buffer
     349           1 :                         for {
     350           1 :                                 r, off, err := rr.NextRecord()
     351           1 :                                 if err == nil {
     352           1 :                                         buf.Reset()
     353           1 :                                         _, err = io.Copy(&buf, r)
     354           1 :                                 }
     355           1 :                                 if err != nil {
     356           1 :                                         return err
     357           1 :                                 }
     358             : 
     359           1 :                                 b = pebble.Batch{}
     360           1 :                                 if err := b.SetRepr(buf.Bytes()); err != nil {
     361           0 :                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
     362           0 :                                         continue
     363             :                                 }
     364           1 :                                 seqNum := b.SeqNum()
     365           1 :                                 for r := b.Reader(); ; seqNum++ {
     366           1 :                                         kind, ukey, value, ok, err := r.Next()
     367           1 :                                         if !ok {
     368           1 :                                                 if err != nil {
     369           0 :                                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
     370           0 :                                                         break
     371             :                                                 }
     372           1 :                                                 break
     373             :                                         }
     374           1 :                                         ikey := base.MakeInternalKey(ukey, seqNum, kind)
     375           1 :                                         switch kind {
     376             :                                         case base.InternalKeyKindDelete,
     377             :                                                 base.InternalKeyKindDeleteSized,
     378             :                                                 base.InternalKeyKindSet,
     379             :                                                 base.InternalKeyKindMerge,
     380             :                                                 base.InternalKeyKindSingleDelete,
     381           1 :                                                 base.InternalKeyKindSetWithDelete:
     382           1 :                                                 if cmp(searchKey, ikey.UserKey) != 0 {
     383           1 :                                                         continue
     384             :                                                 }
     385           1 :                                         case base.InternalKeyKindRangeDelete:
     386           1 :                                                 // Output tombstones that contain or end with the search key.
     387           1 :                                                 t := rangedel.Decode(ikey, value, nil)
     388           1 :                                                 if !t.Contains(cmp, searchKey) && cmp(t.End, searchKey) != 0 {
     389           1 :                                                         continue
     390             :                                                 }
     391           0 :                                         default:
     392           0 :                                                 continue
     393             :                                         }
     394             : 
     395           1 :                                         refs = append(refs, findRef{
     396           1 :                                                 key:      ikey.Clone(),
     397           1 :                                                 value:    append([]byte(nil), value...),
     398           1 :                                                 fileNum:  base.FileNum(ll.Num),
     399           1 :                                                 filename: filepath.Base(off.PhysicalFile),
     400           1 :                                         })
     401             :                                 }
     402             :                         }
     403             :                 }()
     404             :         }
     405           1 :         return refs
     406             : }
     407             : 
     408             : // Search the tables for references to the specified key.
     409           1 : func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     410           1 :         cache := pebble.NewCache(128 << 20 /* 128 MB */)
     411           1 :         defer cache.Unref()
     412           1 : 
     413           1 :         f.tableRefs = make(map[base.FileNum]bool)
     414           1 :         for _, fl := range f.tables {
     415           1 :                 _ = func() (err error) {
     416           1 :                         tf, err := f.opts.FS.Open(fl.path)
     417           1 :                         if err != nil {
     418           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     419           0 :                                 return
     420           0 :                         }
     421             : 
     422           1 :                         m := f.tableMeta[base.PhysicalTableFileNum(fl.DiskFileNum)]
     423           1 :                         if f.verbose {
     424           1 :                                 fmt.Fprintf(stdout, "%s", fl.path)
     425           1 :                                 if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     426           1 :                                         fmt.Fprintf(stdout, ": global seqnum: %d", m.LargestSeqNum)
     427           1 :                                 }
     428           1 :                                 defer fmt.Fprintf(stdout, "\n")
     429             :                         }
     430           1 :                         defer func() {
     431           1 :                                 switch {
     432           0 :                                 case err != nil:
     433           0 :                                         if f.verbose {
     434           0 :                                                 fmt.Fprintf(stdout, ": %v", err)
     435           0 :                                         } else {
     436           0 :                                                 fmt.Fprintf(stdout, "%s: %v\n", fl.path, err)
     437           0 :                                         }
     438             :                                 }
     439             :                         }()
     440             : 
     441           1 :                         opts := sstable.ReaderOptions{
     442           1 :                                 Comparer:  f.opts.Comparer,
     443           1 :                                 Filters:   f.opts.Filters,
     444           1 :                                 Comparers: f.comparers,
     445           1 :                                 Mergers:   f.mergers,
     446           1 :                         }
     447           1 :                         opts.SetInternal(sstableinternal.ReaderOptions{
     448           1 :                                 CacheOpts: sstableinternal.CacheOptions{
     449           1 :                                         Cache:   cache,
     450           1 :                                         FileNum: fl.DiskFileNum,
     451           1 :                                 },
     452           1 :                         })
     453           1 :                         readable, err := sstable.NewSimpleReadable(tf)
     454           1 :                         if err != nil {
     455           0 :                                 return err
     456           0 :                         }
     457           1 :                         r, err := sstable.NewReader(context.Background(), readable, opts)
     458           1 :                         if err != nil {
     459           1 :                                 f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", fl.path, err.Error()))
     460           1 :                                 // Ensure the error only gets printed once.
     461           1 :                                 err = nil
     462           1 :                                 return
     463           1 :                         }
     464           1 :                         defer r.Close()
     465           1 : 
     466           1 :                         var transforms sstable.IterTransforms
     467           1 :                         var fragTransforms sstable.FragmentIterTransforms
     468           1 :                         if m != nil {
     469           1 :                                 transforms = m.IterTransforms()
     470           1 :                                 fragTransforms = m.FragmentIterTransforms()
     471           1 :                         }
     472             : 
     473           1 :                         iter, err := r.NewIter(transforms, nil, nil)
     474           1 :                         if err != nil {
     475           0 :                                 return err
     476           0 :                         }
     477           1 :                         defer iter.Close()
     478           1 :                         kv := iter.SeekGE(searchKey, base.SeekGEFlagsNone)
     479           1 : 
     480           1 :                         // We configured sstable.Reader to return raw tombstones which requires a
     481           1 :                         // bit more work here to put them in a form that can be iterated in
     482           1 :                         // parallel with the point records.
     483           1 :                         rangeDelIter, err := func() (keyspan.FragmentIterator, error) {
     484           1 :                                 iter, err := r.NewRawRangeDelIter(context.Background(), fragTransforms)
     485           1 :                                 if err != nil {
     486           0 :                                         return nil, err
     487           0 :                                 }
     488           1 :                                 if iter == nil {
     489           1 :                                         return keyspan.NewIter(r.Compare, nil), nil
     490           1 :                                 }
     491           1 :                                 defer iter.Close()
     492           1 : 
     493           1 :                                 var tombstones []keyspan.Span
     494           1 :                                 t, err := iter.First()
     495           1 :                                 for ; t != nil; t, err = iter.Next() {
     496           1 :                                         if !t.Contains(r.Compare, searchKey) {
     497           1 :                                                 continue
     498             :                                         }
     499           1 :                                         tombstones = append(tombstones, t.Clone())
     500             :                                 }
     501           1 :                                 if err != nil {
     502           0 :                                         return nil, err
     503           0 :                                 }
     504             : 
     505           1 :                                 slices.SortFunc(tombstones, func(a, b keyspan.Span) int {
     506           0 :                                         return r.Compare(a.Start, b.Start)
     507           0 :                                 })
     508           1 :                                 return keyspan.NewIter(r.Compare, tombstones), nil
     509             :                         }()
     510           1 :                         if err != nil {
     511           0 :                                 return err
     512           0 :                         }
     513             : 
     514           1 :                         defer rangeDelIter.Close()
     515           1 :                         rangeDel, err := rangeDelIter.First()
     516           1 :                         if err != nil {
     517           0 :                                 return err
     518           0 :                         }
     519             : 
     520           1 :                         foundRef := false
     521           1 :                         for kv != nil || rangeDel != nil {
     522           1 :                                 if kv != nil &&
     523           1 :                                         (rangeDel == nil || r.Compare(kv.K.UserKey, rangeDel.Start) < 0) {
     524           1 :                                         if r.Compare(searchKey, kv.K.UserKey) != 0 {
     525           1 :                                                 kv = nil
     526           1 :                                                 continue
     527             :                                         }
     528           1 :                                         v, _, err := kv.Value(nil)
     529           1 :                                         if err != nil {
     530           0 :                                                 return err
     531           0 :                                         }
     532           1 :                                         refs = append(refs, findRef{
     533           1 :                                                 key:      kv.K.Clone(),
     534           1 :                                                 value:    slices.Clone(v),
     535           1 :                                                 fileNum:  base.PhysicalTableFileNum(fl.DiskFileNum),
     536           1 :                                                 filename: filepath.Base(fl.path),
     537           1 :                                         })
     538           1 :                                         kv = iter.Next()
     539           1 :                                 } else {
     540           1 :                                         // Use rangedel.Encode to add a reference for each key
     541           1 :                                         // within the span.
     542           1 :                                         err := rangedel.Encode(*rangeDel, func(k base.InternalKey, v []byte) error {
     543           1 :                                                 refs = append(refs, findRef{
     544           1 :                                                         key:      k.Clone(),
     545           1 :                                                         value:    slices.Clone(v),
     546           1 :                                                         fileNum:  base.PhysicalTableFileNum(fl.DiskFileNum),
     547           1 :                                                         filename: filepath.Base(fl.path),
     548           1 :                                                 })
     549           1 :                                                 return nil
     550           1 :                                         })
     551           1 :                                         if err != nil {
     552           0 :                                                 return err
     553           0 :                                         }
     554           1 :                                         rangeDel, err = rangeDelIter.Next()
     555           1 :                                         if err != nil {
     556           0 :                                                 return err
     557           0 :                                         }
     558             :                                 }
     559           1 :                                 foundRef = true
     560             :                         }
     561             : 
     562           1 :                         if foundRef {
     563           1 :                                 f.tableRefs[base.PhysicalTableFileNum(fl.DiskFileNum)] = true
     564           1 :                         }
     565           1 :                         return nil
     566             :                 }()
     567             :         }
     568           1 :         return refs
     569             : }
     570             : 
     571             : // tableProvenance describes how the specified table came to exist. It walks the
     572             : // version edits recorded for the table until one adds it as a new file, labels
     573             : // that edit as a compaction, flush, ingestion, or plain addition, and appends any
     574             : // later moves between levels. Returns an empty string if no such edit is found.
     575           1 : func (f *findT) tableProvenance(fileNum base.FileNum) string {
     576           1 :         editRefs := f.editRefs[base.PhysicalTableDiskFileNum(fileNum)]
     577           1 :         for len(editRefs) > 0 {
     578           1 :                 ve := f.edits[editRefs[0]]
     579           1 :                 editRefs = editRefs[1:]
     580           1 :                 for _, nf := range ve.NewFiles {
     581           1 :                         if fileNum != nf.Meta.FileNum {
     582           1 :                                 continue
     583             :                         }
     584             : 
     585           1 :                         var buf bytes.Buffer
     586           1 :                         switch {
     587           1 :                         case len(ve.DeletedFiles) > 0:
     588           1 :                                 // A version edit with deleted files is a compaction. The deleted
     589           1 :                                 // files are the inputs to the compaction. We're going to
     590           1 :                                 // reconstruct the input files and display those inputs that
     591           1 :                                 // contain the search key (i.e. are listed in f.tableRefs) and use
     592           1 :                                 // an ellipsis to indicate when there were other inputs that have
     593           1 :                                 // been elided.
     594           1 :                                 var sourceLevels []int
     595           1 :                                 levels := make(map[int][]base.FileNum)
     596           1 :                                 for df := range ve.DeletedFiles {
     597           1 :                                         files := levels[df.Level]
     598           1 :                                         if len(files) == 0 {
     599           1 :                                                 sourceLevels = append(sourceLevels, df.Level)
     600           1 :                                         }
     601           1 :                                         levels[df.Level] = append(files, df.FileNum)
     602             :                                 }
     603             : 
     604           1 :                                 sort.Ints(sourceLevels)
     605           1 :                                 if sourceLevels[len(sourceLevels)-1] != nf.Level {
     606           0 :                                         sourceLevels = append(sourceLevels, nf.Level)
     607           0 :                                 }
     608             : 
     609           1 :                                 sep := " "
     610           1 :                                 fmt.Fprintf(&buf, "compacted")
     611           1 :                                 for _, level := range sourceLevels {
     612           1 :                                         files := levels[level]
     613           1 :                                         slices.Sort(files)
     614           1 : 
     615           1 :                                         fmt.Fprintf(&buf, "%sL%d [", sep, level)
     616           1 :                                         sep = ""
     617           1 :                                         elided := false
     618           1 :                                         for _, fileNum := range files {
     619           1 :                                                 if f.tableRefs[fileNum] {
     620           1 :                                                         fmt.Fprintf(&buf, "%s%s", sep, fileNum)
     621           1 :                                                         sep = " "
     622           1 :                                                 } else {
     623           1 :                                                         elided = true
     624           1 :                                                 }
     625             :                                         }
     626           1 :                                         if elided {
     627           1 :                                                 fmt.Fprintf(&buf, "%s...", sep)
     628           1 :                                         }
     629           1 :                                         fmt.Fprintf(&buf, "]")
     630           1 :                                         sep = " + "
     631             :                                 }
     632             : 
     633           1 :                         case ve.MinUnflushedLogNum != 0:
     634           1 :                                 // A version edit with a min-unflushed log indicates a flush
     635           1 :                                 // operation.
     636           1 :                                 fmt.Fprintf(&buf, "flushed to L%d", nf.Level)
     637             : 
     638           1 :                         case nf.Meta.SmallestSeqNum == nf.Meta.LargestSeqNum:
     639           1 :                                 // If the smallest and largest seqnum are the same, the file was
     640           1 :                                 // ingested. Note that this can also occur for a flushed sstable
     641           1 :                                 // that contains only a single key, though that would have
     642           1 :                                 // already been captured above.
     643           1 :                                 fmt.Fprintf(&buf, "ingested to L%d", nf.Level)
     644             : 
     645           0 :                         default:
     646           0 :                                 // The provenance of the table is unclear. This is usually due to
     647           0 :                                 // the MANIFEST rolling over and taking a snapshot of the LSM
     648           0 :                                 // state.
     649           0 :                                 fmt.Fprintf(&buf, "added to L%d", nf.Level)
     650             :                         }
     651             : 
     652             :                         // After a table is created, it can be moved to a different level via a
     653             :                         // move compaction. This is indicated by a version edit that deletes the
     654             :                         // table from one level and adds the same table to a different
     655             :                         // level. Loop over the remaining version edits for the table looking for
     656             :                         // such moves.
     657           1 :                         for len(editRefs) > 0 {
     658           1 :                                 ve := f.edits[editRefs[0]]
     659           1 :                                 editRefs = editRefs[1:]
     660           1 :                                 for _, nf := range ve.NewFiles {
     661           1 :                                         if fileNum == nf.Meta.FileNum {
     662           1 :                                                 for df := range ve.DeletedFiles {
     663           1 :                                                         if fileNum == df.FileNum {
     664           1 :                                                                 fmt.Fprintf(&buf, ", moved to L%d", nf.Level)
     665           1 :                                                                 break
     666             :                                                         }
     667             :                                                 }
     668           1 :                                                 break
     669             :                                         }
     670             :                                 }
     671             :                         }
     672             : 
     673           1 :                         return buf.String()
     674             :                 }
     675             :         }
     676           1 :         return ""
     677             : }

Generated by: LCOV version 1.14