LCOV - code coverage report
Current view: top level - pebble/tool - find.go (source / functions) Hit Total Coverage
Test: 2024-02-16 08:16Z 4c682705 - tests only.lcov Lines: 399 470 84.9 %
Date: 2024-02-16 08:16:36 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package tool
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "fmt"
      11             :         "io"
      12             :         "slices"
      13             :         "sort"
      14             : 
      15             :         "github.com/cockroachdb/pebble"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/manifest"
      19             :         "github.com/cockroachdb/pebble/internal/private"
      20             :         "github.com/cockroachdb/pebble/internal/rangedel"
      21             :         "github.com/cockroachdb/pebble/record"
      22             :         "github.com/cockroachdb/pebble/sstable"
      23             :         "github.com/spf13/cobra"
      24             : )
      25             : 
      26             : type findRef struct {
      27             :         key     base.InternalKey
      28             :         value   []byte
      29             :         fileNum base.FileNum
      30             : }
      31             : 
      32             : // findT implements the find tool.
      33             : //
      34             : // TODO(bananabrick): Add support for virtual sstables in this tool. Currently,
      35             : // the tool will work because we're parsing files from disk, so virtual sstables
      36             : // will never be added to findT.tables. The manifest could contain information
      37             : // about virtual sstables. This is fine because the manifest is only used to
      38             : // compute the findT.editRefs, and editRefs is only used if a file in
      39             : // findT.tables contains a key. Of course, the tool won't be completely
      40             : // accurate without dealing with virtual sstable case.
      41             : type findT struct {
      42             :         Root *cobra.Command
      43             : 
      44             :         // Configuration.
      45             :         opts      *pebble.Options
      46             :         comparers sstable.Comparers
      47             :         mergers   sstable.Mergers
      48             : 
      49             :         // Flags.
      50             :         comparerName string
      51             :         fmtKey       keyFormatter
      52             :         fmtValue     valueFormatter
      53             :         verbose      bool
      54             : 
      55             :         // Map from file num to path on disk.
      56             :         files map[base.DiskFileNum]string
      57             :         // Map from file num to version edit index which references the file num.
      58             :         editRefs map[base.DiskFileNum][]int
      59             :         // List of version edits.
      60             :         edits []manifest.VersionEdit
      61             :         // Sorted list of WAL file nums.
      62             :         logs []base.DiskFileNum
      63             :         // Sorted list of manifest file nums.
      64             :         manifests []base.DiskFileNum
      65             :         // Sorted list of table file nums.
      66             :         tables []base.FileNum
      67             :         // Set of tables that contains references to the search key.
      68             :         tableRefs map[base.FileNum]bool
      69             :         // Map from file num to table metadata.
      70             :         tableMeta map[base.FileNum]*manifest.FileMetadata
      71             :         // List of error messages for SSTables that could not be decoded.
      72             :         errors []string
      73             : }
      74             : 
      75             : func newFind(
      76             :         opts *pebble.Options,
      77             :         comparers sstable.Comparers,
      78             :         defaultComparer string,
      79             :         mergers sstable.Mergers,
      80           1 : ) *findT {
      81           1 :         f := &findT{
      82           1 :                 opts:      opts,
      83           1 :                 comparers: comparers,
      84           1 :                 mergers:   mergers,
      85           1 :         }
      86           1 :         f.fmtKey.mustSet("quoted")
      87           1 :         f.fmtValue.mustSet("[%x]")
      88           1 : 
      89           1 :         f.Root = &cobra.Command{
      90           1 :                 Use:   "find <dir> <key>",
      91           1 :                 Short: "find references to the specified key",
      92           1 :                 Long: `
      93           1 : Find references to the specified key and any range tombstones that contain the
      94           1 : key. This includes references to the key in WAL files and sstables, and the
      95           1 : provenance of the sstables (flushed, ingested, compacted).
      96           1 : `,
      97           1 :                 Args: cobra.ExactArgs(2),
      98           1 :                 Run:  f.run,
      99           1 :         }
     100           1 : 
     101           1 :         f.Root.Flags().BoolVarP(
     102           1 :                 &f.verbose, "verbose", "v", false, "verbose output")
     103           1 :         f.Root.Flags().StringVar(
     104           1 :                 &f.comparerName, "comparer", defaultComparer, "comparer name")
     105           1 :         f.Root.Flags().Var(
     106           1 :                 &f.fmtKey, "key", "key formatter")
     107           1 :         f.Root.Flags().Var(
     108           1 :                 &f.fmtValue, "value", "value formatter")
     109           1 :         return f
     110           1 : }
     111             : 
     112           1 : func (f *findT) run(cmd *cobra.Command, args []string) {
     113           1 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     114           1 :         var key key
     115           1 :         if err := key.Set(args[1]); err != nil {
     116           0 :                 fmt.Fprintf(stdout, "%s\n", err)
     117           0 :                 return
     118           0 :         }
     119             : 
     120           1 :         if err := f.findFiles(stdout, stderr, args[0]); err != nil {
     121           1 :                 fmt.Fprintf(stdout, "%s\n", err)
     122           1 :                 return
     123           1 :         }
     124           1 :         f.readManifests(stdout)
     125           1 : 
     126           1 :         f.opts.Comparer = f.comparers[f.comparerName]
     127           1 :         if f.opts.Comparer == nil {
     128           0 :                 fmt.Fprintf(stderr, "unknown comparer %q", f.comparerName)
     129           0 :                 return
     130           0 :         }
     131           1 :         f.fmtKey.setForComparer(f.opts.Comparer.Name, f.comparers)
     132           1 :         f.fmtValue.setForComparer(f.opts.Comparer.Name, f.comparers)
     133           1 : 
     134           1 :         refs := f.search(stdout, key)
     135           1 :         var lastFileNum base.FileNum
     136           1 :         for i := range refs {
     137           1 :                 r := &refs[i]
     138           1 :                 if lastFileNum != r.fileNum {
     139           1 :                         lastFileNum = r.fileNum
     140           1 :                         fmt.Fprintf(stdout, "%s", f.opts.FS.PathBase(f.files[base.PhysicalTableDiskFileNum(r.fileNum)]))
     141           1 :                         if m := f.tableMeta[r.fileNum]; m != nil {
     142           1 :                                 fmt.Fprintf(stdout, " ")
     143           1 :                                 formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest)
     144           1 :                         }
     145           1 :                         fmt.Fprintf(stdout, "\n")
     146           1 :                         if p := f.tableProvenance(r.fileNum); p != "" {
     147           1 :                                 fmt.Fprintf(stdout, "    (%s)\n", p)
     148           1 :                         }
     149             :                 }
     150           1 :                 fmt.Fprintf(stdout, "    ")
     151           1 :                 formatKeyValue(stdout, f.fmtKey, f.fmtValue, &r.key, r.value)
     152             :         }
     153             : 
     154           1 :         for _, errorMsg := range f.errors {
     155           1 :                 fmt.Fprint(stdout, errorMsg)
     156           1 :         }
     157             : }
     158             : 
     159             : // Find all of the manifests, logs, and tables in the specified directory.
     160           1 : func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
     161           1 :         f.files = make(map[base.DiskFileNum]string)
     162           1 :         f.editRefs = make(map[base.DiskFileNum][]int)
     163           1 :         f.logs = nil
     164           1 :         f.manifests = nil
     165           1 :         f.tables = nil
     166           1 :         f.tableMeta = make(map[base.FileNum]*manifest.FileMetadata)
     167           1 : 
     168           1 :         if _, err := f.opts.FS.Stat(dir); err != nil {
     169           1 :                 return err
     170           1 :         }
     171             : 
     172           1 :         walk(stderr, f.opts.FS, dir, func(path string) {
     173           1 :                 // TODO(sumeer): delegate FileTypeLog handling to wal package.
     174           1 :                 ft, fileNum, ok := base.ParseFilename(f.opts.FS, path)
     175           1 :                 if !ok {
     176           1 :                         return
     177           1 :                 }
     178           1 :                 switch ft {
     179           1 :                 case base.FileTypeLog:
     180           1 :                         f.logs = append(f.logs, fileNum)
     181           1 :                 case base.FileTypeManifest:
     182           1 :                         f.manifests = append(f.manifests, fileNum)
     183           1 :                 case base.FileTypeTable:
     184           1 :                         f.tables = append(f.tables, base.PhysicalTableFileNum(fileNum))
     185           1 :                 default:
     186           1 :                         return
     187             :                 }
     188           1 :                 f.files[fileNum] = path
     189             :         })
     190             : 
     191           1 :         slices.Sort(f.logs)
     192           1 :         slices.Sort(f.manifests)
     193           1 :         slices.Sort(f.tables)
     194           1 : 
     195           1 :         if f.verbose {
     196           1 :                 fmt.Fprintf(stdout, "%s\n", dir)
     197           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.manifests), makePlural("manifest", int64(len(f.manifests))))
     198           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.logs), makePlural("log", int64(len(f.logs))))
     199           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.tables), makePlural("sstable", int64(len(f.tables))))
     200           1 :         }
     201           1 :         return nil
     202             : }
     203             : 
     204             : // Read the manifests and populate the editRefs map which is used to determine
     205             : // the provenance and metadata of tables.
     206           1 : func (f *findT) readManifests(stdout io.Writer) {
     207           1 :         for _, fileNum := range f.manifests {
     208           1 :                 func() {
     209           1 :                         path := f.files[fileNum]
     210           1 :                         mf, err := f.opts.FS.Open(path)
     211           1 :                         if err != nil {
     212           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     213           0 :                                 return
     214           0 :                         }
     215           1 :                         defer mf.Close()
     216           1 : 
     217           1 :                         if f.verbose {
     218           1 :                                 fmt.Fprintf(stdout, "%s\n", path)
     219           1 :                         }
     220             : 
     221           1 :                         rr := record.NewReader(mf, 0 /* logNum */)
     222           1 :                         for {
     223           1 :                                 r, err := rr.Next()
     224           1 :                                 if err != nil {
     225           1 :                                         if err != io.EOF {
     226           0 :                                                 fmt.Fprintf(stdout, "%s: %s\n", path, err)
     227           0 :                                         }
     228           1 :                                         break
     229             :                                 }
     230             : 
     231           1 :                                 var ve manifest.VersionEdit
     232           1 :                                 if err := ve.Decode(r); err != nil {
     233           0 :                                         fmt.Fprintf(stdout, "%s: %s\n", path, err)
     234           0 :                                         break
     235             :                                 }
     236           1 :                                 i := len(f.edits)
     237           1 :                                 f.edits = append(f.edits, ve)
     238           1 : 
     239           1 :                                 if ve.ComparerName != "" {
     240           1 :                                         f.comparerName = ve.ComparerName
     241           1 :                                 }
     242           1 :                                 if num := ve.MinUnflushedLogNum; num != 0 {
     243           1 :                                         f.editRefs[num] = append(f.editRefs[num], i)
     244           1 :                                 }
     245           1 :                                 for df := range ve.DeletedFiles {
     246           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(df.FileNum)
     247           1 :                                         f.editRefs[diskFileNum] = append(f.editRefs[diskFileNum], i)
     248           1 :                                 }
     249           1 :                                 for _, nf := range ve.NewFiles {
     250           1 :                                         // The same file can be deleted and added in a single version edit
     251           1 :                                         // which indicates a "move" compaction. Only add the edit to the list
     252           1 :                                         // once.
     253           1 :                                         diskFileNum := base.PhysicalTableDiskFileNum(nf.Meta.FileNum)
     254           1 :                                         refs := f.editRefs[diskFileNum]
     255           1 :                                         if n := len(refs); n == 0 || refs[n-1] != i {
     256           1 :                                                 f.editRefs[diskFileNum] = append(refs, i)
     257           1 :                                         }
     258           1 :                                         if _, ok := f.tableMeta[nf.Meta.FileNum]; !ok {
     259           1 :                                                 f.tableMeta[nf.Meta.FileNum] = nf.Meta
     260           1 :                                         }
     261             :                                 }
     262             :                         }
     263             :                 }()
     264             :         }
     265             : 
     266           1 :         if f.verbose {
     267           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.edits), makePlural("edit", int64(len(f.edits))))
     268           1 :         }
     269             : }
     270             : 
     271             : // Search the logs and sstables for references to the specified key.
     272           1 : func (f *findT) search(stdout io.Writer, key []byte) []findRef {
     273           1 :         refs := f.searchLogs(stdout, key, nil)
     274           1 :         refs = f.searchTables(stdout, key, refs)
     275           1 : 
     276           1 :         // For a given file (log or table) the references are already in the correct
     277           1 :         // order. We simply want to order the references by fileNum using a stable
     278           1 :         // sort.
     279           1 :         //
     280           1 :         // TODO(peter): I'm not sure if this is perfectly correct with regards to log
     281           1 :         // files and ingested sstables, but it is close enough and doing something
     282           1 :         // better is onerous. Revisit if this ever becomes problematic (e.g. if we
     283           1 :         // allow finding more than one key at a time).
     284           1 :         //
     285           1 :         // An example of the problem with logs and ingestion (which can only occur
     286           1 :         // with distinct keys). If I write key "a" to a log, I can then ingested key
     287           1 :         // "b" without causing "a" to be flushed. Then I can write key "c" to the
     288           1 :         // log. Ideally, we'd show the key "a" from the log, then the key "b" from
     289           1 :         // the ingested sstable, then key "c" from the log.
     290           1 :         slices.SortStableFunc(refs, func(a, b findRef) int {
     291           1 :                 return cmp.Compare(a.fileNum, b.fileNum)
     292           1 :         })
     293           1 :         return refs
     294             : }
     295             : 
     296             : // Search the logs for references to the specified key.
     297           1 : func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     298           1 :         cmp := f.opts.Comparer.Compare
     299           1 :         for _, fileNum := range f.logs {
     300           1 :                 _ = func() (err error) {
     301           1 :                         path := f.files[fileNum]
     302           1 :                         lf, err := f.opts.FS.Open(path)
     303           1 :                         if err != nil {
     304           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     305           0 :                                 return
     306           0 :                         }
     307           1 :                         defer lf.Close()
     308           1 : 
     309           1 :                         if f.verbose {
     310           1 :                                 fmt.Fprintf(stdout, "%s", path)
     311           1 :                                 defer fmt.Fprintf(stdout, "\n")
     312           1 :                         }
     313           1 :                         defer func() {
     314           1 :                                 switch err {
     315           0 :                                 case record.ErrZeroedChunk:
     316           0 :                                         if f.verbose {
     317           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL preallocation)", err)
     318           0 :                                         }
     319           0 :                                 case record.ErrInvalidChunk:
     320           0 :                                         if f.verbose {
     321           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL recycling)", err)
     322           0 :                                         }
     323           1 :                                 default:
     324           1 :                                         if err != io.EOF {
     325           0 :                                                 if f.verbose {
     326           0 :                                                         fmt.Fprintf(stdout, ": %s", err)
     327           0 :                                                 } else {
     328           0 :                                                         fmt.Fprintf(stdout, "%s: %s\n", path, err)
     329           0 :                                                 }
     330             :                                         }
     331             :                                 }
     332             :                         }()
     333             : 
     334           1 :                         var b pebble.Batch
     335           1 :                         var buf bytes.Buffer
     336           1 :                         rr := record.NewReader(lf, fileNum)
     337           1 :                         for {
     338           1 :                                 r, err := rr.Next()
     339           1 :                                 if err == nil {
     340           1 :                                         buf.Reset()
     341           1 :                                         _, err = io.Copy(&buf, r)
     342           1 :                                 }
     343           1 :                                 if err != nil {
     344           1 :                                         return err
     345           1 :                                 }
     346             : 
     347           1 :                                 b = pebble.Batch{}
     348           1 :                                 if err := b.SetRepr(buf.Bytes()); err != nil {
     349           0 :                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", path, err)
     350           0 :                                         continue
     351             :                                 }
     352           1 :                                 seqNum := b.SeqNum()
     353           1 :                                 for r := b.Reader(); ; seqNum++ {
     354           1 :                                         kind, ukey, value, ok, err := r.Next()
     355           1 :                                         if !ok {
     356           1 :                                                 if err != nil {
     357           0 :                                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", path, err)
     358           0 :                                                         break
     359             :                                                 }
     360           1 :                                                 break
     361             :                                         }
     362           1 :                                         ikey := base.MakeInternalKey(ukey, seqNum, kind)
     363           1 :                                         switch kind {
     364             :                                         case base.InternalKeyKindDelete,
     365             :                                                 base.InternalKeyKindDeleteSized,
     366             :                                                 base.InternalKeyKindSet,
     367             :                                                 base.InternalKeyKindMerge,
     368             :                                                 base.InternalKeyKindSingleDelete,
     369           1 :                                                 base.InternalKeyKindSetWithDelete:
     370           1 :                                                 if cmp(searchKey, ikey.UserKey) != 0 {
     371           1 :                                                         continue
     372             :                                                 }
     373           1 :                                         case base.InternalKeyKindRangeDelete:
     374           1 :                                                 // Output tombstones that contain or end with the search key.
     375           1 :                                                 t := rangedel.Decode(ikey, value, nil)
     376           1 :                                                 if !t.Contains(cmp, searchKey) && cmp(t.End, searchKey) != 0 {
     377           1 :                                                         continue
     378             :                                                 }
     379           0 :                                         default:
     380           0 :                                                 continue
     381             :                                         }
     382             : 
     383           1 :                                         refs = append(refs, findRef{
     384           1 :                                                 key:     ikey.Clone(),
     385           1 :                                                 value:   append([]byte(nil), value...),
     386           1 :                                                 fileNum: base.PhysicalTableFileNum(fileNum),
     387           1 :                                         })
     388             :                                 }
     389             :                         }
     390             :                 }()
     391             :         }
     392           1 :         return refs
     393             : }
     394             : 
     395             : // Search the tables for references to the specified key.
     396           1 : func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     397           1 :         cache := pebble.NewCache(128 << 20 /* 128 MB */)
     398           1 :         defer cache.Unref()
     399           1 : 
     400           1 :         f.tableRefs = make(map[base.FileNum]bool)
     401           1 :         for _, fileNum := range f.tables {
     402           1 :                 _ = func() (err error) {
     403           1 :                         path := f.files[base.PhysicalTableDiskFileNum(fileNum)]
     404           1 :                         tf, err := f.opts.FS.Open(path)
     405           1 :                         if err != nil {
     406           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     407           0 :                                 return
     408           0 :                         }
     409             : 
     410           1 :                         m := f.tableMeta[fileNum]
     411           1 :                         if f.verbose {
     412           1 :                                 fmt.Fprintf(stdout, "%s", path)
     413           1 :                                 if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     414           1 :                                         fmt.Fprintf(stdout, ": global seqnum: %d", m.LargestSeqNum)
     415           1 :                                 }
     416           1 :                                 defer fmt.Fprintf(stdout, "\n")
     417             :                         }
     418           1 :                         defer func() {
     419           1 :                                 switch {
     420           0 :                                 case err != nil:
     421           0 :                                         if f.verbose {
     422           0 :                                                 fmt.Fprintf(stdout, ": %v", err)
     423           0 :                                         } else {
     424           0 :                                                 fmt.Fprintf(stdout, "%s: %v\n", path, err)
     425           0 :                                         }
     426             :                                 }
     427             :                         }()
     428             : 
     429           1 :                         opts := sstable.ReaderOptions{
     430           1 :                                 Cache:    cache,
     431           1 :                                 Comparer: f.opts.Comparer,
     432           1 :                                 Filters:  f.opts.Filters,
     433           1 :                         }
     434           1 :                         readable, err := sstable.NewSimpleReadable(tf)
     435           1 :                         if err != nil {
     436           0 :                                 return err
     437           0 :                         }
     438           1 :                         r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers,
     439           1 :                                 private.SSTableRawTombstonesOpt.(sstable.ReaderOption))
     440           1 :                         if err != nil {
     441           1 :                                 f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", f.files[base.PhysicalTableDiskFileNum(fileNum)], err.Error()))
     442           1 :                                 // Ensure the error only gets printed once.
     443           1 :                                 err = nil
     444           1 :                                 return
     445           1 :                         }
     446           1 :                         defer r.Close()
     447           1 : 
     448           1 :                         if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     449           1 :                                 r.Properties.GlobalSeqNum = m.LargestSeqNum
     450           1 :                         }
     451             : 
     452           1 :                         iter, err := r.NewIter(nil, nil)
     453           1 :                         if err != nil {
     454           0 :                                 return err
     455           0 :                         }
     456           1 :                         defer iter.Close()
     457           1 :                         key, value := iter.SeekGE(searchKey, base.SeekGEFlagsNone)
     458           1 : 
     459           1 :                         // We configured sstable.Reader to return raw tombstones which requires a
     460           1 :                         // bit more work here to put them in a form that can be iterated in
     461           1 :                         // parallel with the point records.
     462           1 :                         rangeDelIter, err := func() (keyspan.FragmentIterator, error) {
     463           1 :                                 iter, err := r.NewRawRangeDelIter()
     464           1 :                                 if err != nil {
     465           0 :                                         return nil, err
     466           0 :                                 }
     467           1 :                                 if iter == nil {
     468           1 :                                         return keyspan.NewIter(r.Compare, nil), nil
     469           1 :                                 }
     470           1 :                                 defer iter.Close()
     471           1 : 
     472           1 :                                 var tombstones []keyspan.Span
     473           1 :                                 t, err := iter.First()
     474           1 :                                 for ; t != nil; t, err = iter.Next() {
     475           1 :                                         if !t.Contains(r.Compare, searchKey) {
     476           1 :                                                 continue
     477             :                                         }
     478           1 :                                         tombstones = append(tombstones, t.ShallowClone())
     479             :                                 }
     480           1 :                                 if err != nil {
     481           0 :                                         return nil, err
     482           0 :                                 }
     483             : 
     484           1 :                                 slices.SortFunc(tombstones, func(a, b keyspan.Span) int {
     485           0 :                                         return r.Compare(a.Start, b.Start)
     486           0 :                                 })
     487           1 :                                 return keyspan.NewIter(r.Compare, tombstones), nil
     488             :                         }()
     489           1 :                         if err != nil {
     490           0 :                                 return err
     491           0 :                         }
     492             : 
     493           1 :                         defer rangeDelIter.Close()
     494           1 :                         rangeDel, err := rangeDelIter.First()
     495           1 :                         if err != nil {
     496           0 :                                 return err
     497           0 :                         }
     498             : 
     499           1 :                         foundRef := false
     500           1 :                         for key != nil || rangeDel != nil {
     501           1 :                                 if key != nil &&
     502           1 :                                         (rangeDel == nil || r.Compare(key.UserKey, rangeDel.Start) < 0) {
     503           1 :                                         if r.Compare(searchKey, key.UserKey) != 0 {
     504           1 :                                                 key, value = nil, base.LazyValue{}
     505           1 :                                                 continue
     506             :                                         }
     507           1 :                                         v, _, err := value.Value(nil)
     508           1 :                                         if err != nil {
     509           0 :                                                 return err
     510           0 :                                         }
     511           1 :                                         refs = append(refs, findRef{
     512           1 :                                                 key:     key.Clone(),
     513           1 :                                                 value:   append([]byte(nil), v...),
     514           1 :                                                 fileNum: fileNum,
     515           1 :                                         })
     516           1 :                                         key, value = iter.Next()
     517           1 :                                 } else {
     518           1 :                                         // Use rangedel.Encode to add a reference for each key
     519           1 :                                         // within the span.
     520           1 :                                         err := rangedel.Encode(rangeDel, func(k base.InternalKey, v []byte) error {
     521           1 :                                                 refs = append(refs, findRef{
     522           1 :                                                         key:     k.Clone(),
     523           1 :                                                         value:   append([]byte(nil), v...),
     524           1 :                                                         fileNum: fileNum,
     525           1 :                                                 })
     526           1 :                                                 return nil
     527           1 :                                         })
     528           1 :                                         if err != nil {
     529           0 :                                                 return err
     530           0 :                                         }
     531           1 :                                         rangeDel, err = rangeDelIter.Next()
     532           1 :                                         if err != nil {
     533           0 :                                                 return err
     534           0 :                                         }
     535             :                                 }
     536           1 :                                 foundRef = true
     537             :                         }
     538             : 
     539           1 :                         if foundRef {
     540           1 :                                 f.tableRefs[fileNum] = true
     541           1 :                         }
     542           1 :                         return nil
     543             :                 }()
     544             :         }
     545           1 :         return refs
     546             : }
     547             : 
     548             : // Determine the provenance of the specified table. We search the version edits
     549             : // for the first edit which created the table, and then analyze the edit to
     550             : // determine if it was a compaction, flush, or ingestion. Returns an empty
     551             : // string if the provenance of a table cannot be determined.
     552           1 : func (f *findT) tableProvenance(fileNum base.FileNum) string {
     553           1 :         editRefs := f.editRefs[base.PhysicalTableDiskFileNum(fileNum)]
     554           1 :         for len(editRefs) > 0 {
     555           1 :                 ve := f.edits[editRefs[0]]
     556           1 :                 editRefs = editRefs[1:]
     557           1 :                 for _, nf := range ve.NewFiles {
     558           1 :                         if fileNum != nf.Meta.FileNum {
     559           1 :                                 continue
     560             :                         }
     561             : 
     562           1 :                         var buf bytes.Buffer
     563           1 :                         switch {
     564           1 :                         case len(ve.DeletedFiles) > 0:
     565           1 :                                 // A version edit with deleted files is a compaction. The deleted
     566           1 :                                 // files are the inputs to the compaction. We're going to
     567           1 :                                 // reconstruct the input files and display those inputs that
     568           1 :                                 // contain the search key (i.e. are list in refs) and use an
     569           1 :                                 // ellipsis to indicate when there were other inputs that have
     570           1 :                                 // been elided.
     571           1 :                                 var sourceLevels []int
     572           1 :                                 levels := make(map[int][]base.FileNum)
     573           1 :                                 for df := range ve.DeletedFiles {
     574           1 :                                         files := levels[df.Level]
     575           1 :                                         if len(files) == 0 {
     576           1 :                                                 sourceLevels = append(sourceLevels, df.Level)
     577           1 :                                         }
     578           1 :                                         levels[df.Level] = append(files, df.FileNum)
     579             :                                 }
     580             : 
     581           1 :                                 sort.Ints(sourceLevels)
     582           1 :                                 if sourceLevels[len(sourceLevels)-1] != nf.Level {
     583           0 :                                         sourceLevels = append(sourceLevels, nf.Level)
     584           0 :                                 }
     585             : 
     586           1 :                                 sep := " "
     587           1 :                                 fmt.Fprintf(&buf, "compacted")
     588           1 :                                 for _, level := range sourceLevels {
     589           1 :                                         files := levels[level]
     590           1 :                                         slices.Sort(files)
     591           1 : 
     592           1 :                                         fmt.Fprintf(&buf, "%sL%d [", sep, level)
     593           1 :                                         sep = ""
     594           1 :                                         elided := false
     595           1 :                                         for _, fileNum := range files {
     596           1 :                                                 if f.tableRefs[fileNum] {
     597           1 :                                                         fmt.Fprintf(&buf, "%s%s", sep, fileNum)
     598           1 :                                                         sep = " "
     599           1 :                                                 } else {
     600           1 :                                                         elided = true
     601           1 :                                                 }
     602             :                                         }
     603           1 :                                         if elided {
     604           1 :                                                 fmt.Fprintf(&buf, "%s...", sep)
     605           1 :                                         }
     606           1 :                                         fmt.Fprintf(&buf, "]")
     607           1 :                                         sep = " + "
     608             :                                 }
     609             : 
     610           1 :                         case ve.MinUnflushedLogNum != 0:
     611           1 :                                 // A version edit with a min-unflushed log indicates a flush
     612           1 :                                 // operation.
     613           1 :                                 fmt.Fprintf(&buf, "flushed to L%d", nf.Level)
     614             : 
     615           1 :                         case nf.Meta.SmallestSeqNum == nf.Meta.LargestSeqNum:
     616           1 :                                 // If the smallest and largest seqnum are the same, the file was
     617           1 :                                 // ingested. Note that this can also occur for a flushed sstable
     618           1 :                                 // that contains only a single key, though that would have
     619           1 :                                 // already been captured above.
     620           1 :                                 fmt.Fprintf(&buf, "ingested to L%d", nf.Level)
     621             : 
     622           0 :                         default:
     623           0 :                                 // The provenance of the table is unclear. This is usually due to
     624           0 :                                 // the MANIFEST rolling over and taking a snapshot of the LSM
     625           0 :                                 // state.
     626           0 :                                 fmt.Fprintf(&buf, "added to L%d", nf.Level)
     627             :                         }
     628             : 
     629             :                         // After a table is created, it can be moved to a different level via a
     630             :                         // move compaction. This is indicated by a version edit that deletes the
     631             :                         // table from one level and adds the same table to a different
     632             :                         // level. Loop over the remaining version edits for the table looking for
     633             :                         // such moves.
     634           1 :                         for len(editRefs) > 0 {
     635           1 :                                 ve := f.edits[editRefs[0]]
     636           1 :                                 editRefs = editRefs[1:]
     637           1 :                                 for _, nf := range ve.NewFiles {
     638           1 :                                         if fileNum == nf.Meta.FileNum {
     639           1 :                                                 for df := range ve.DeletedFiles {
     640           1 :                                                         if fileNum == df.FileNum {
     641           1 :                                                                 fmt.Fprintf(&buf, ", moved to L%d", nf.Level)
     642           1 :                                                                 break
     643             :                                                         }
     644             :                                                 }
     645           1 :                                                 break
     646             :                                         }
     647             :                                 }
     648             :                         }
     649             : 
     650           1 :                         return buf.String()
     651             :                 }
     652             :         }
     653           1 :         return ""
     654             : }

Generated by: LCOV version 1.14