LCOV - code coverage report
Current view: top level - pebble/tool - find.go (source / functions) Hit Total Coverage
Test: 2023-10-18 08:17Z 5807b591 - tests + meta.lcov Lines: 394 461 85.5 %
Date: 2023-10-18 08:18:06 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package tool
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "io"
      11             :         "sort"
      12             : 
      13             :         "github.com/cockroachdb/pebble"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/keyspan"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             :         "github.com/cockroachdb/pebble/internal/private"
      18             :         "github.com/cockroachdb/pebble/internal/rangedel"
      19             :         "github.com/cockroachdb/pebble/record"
      20             :         "github.com/cockroachdb/pebble/sstable"
      21             :         "github.com/spf13/cobra"
      22             : )
      23             : 
      24             : type findRef struct {
      25             :         key     base.InternalKey
      26             :         value   []byte
      27             :         fileNum base.FileNum
      28             : }
      29             : 
      30             : // findT implements the find tool.
      31             : //
      32             : // TODO(bananabrick): Add support for virtual sstables in this tool. Currently,
      33             : // the tool will work because we're parsing files from disk, so virtual sstables
      34             : // will never be added to findT.tables. The manifest could contain information
      35             : // about virtual sstables. This is fine because the manifest is only used to
      36             : // compute the findT.editRefs, and editRefs is only used if a file in
      37             : // findT.tables contains a key. Of course, the tool won't be completely
      38             : // accurate without dealing with virtual sstable case.
      39             : type findT struct {
      40             :         Root *cobra.Command
      41             : 
      42             :         // Configuration.
      43             :         opts      *pebble.Options
      44             :         comparers sstable.Comparers
      45             :         mergers   sstable.Mergers
      46             : 
      47             :         // Flags.
      48             :         comparerName string
      49             :         fmtKey       keyFormatter
      50             :         fmtValue     valueFormatter
      51             :         verbose      bool
      52             : 
      53             :         // Map from file num to path on disk.
      54             :         files map[base.DiskFileNum]string
      55             :         // Map from file num to version edit index which references the file num.
      56             :         editRefs map[base.FileNum][]int
      57             :         // List of version edits.
      58             :         edits []manifest.VersionEdit
      59             :         // Sorted list of WAL file nums.
      60             :         logs []base.DiskFileNum
      61             :         // Sorted list of manifest file nums.
      62             :         manifests []base.DiskFileNum
      63             :         // Sorted list of table file nums.
      64             :         tables []base.FileNum
      65             :         // Set of tables that contains references to the search key.
      66             :         tableRefs map[base.FileNum]bool
      67             :         // Map from file num to table metadata.
      68             :         tableMeta map[base.FileNum]*manifest.FileMetadata
      69             :         // List of error messages for SSTables that could not be decoded.
      70             :         errors []string
      71             : }
      72             : 
      73             : func newFind(
      74             :         opts *pebble.Options,
      75             :         comparers sstable.Comparers,
      76             :         defaultComparer string,
      77             :         mergers sstable.Mergers,
      78           1 : ) *findT {
      79           1 :         f := &findT{
      80           1 :                 opts:      opts,
      81           1 :                 comparers: comparers,
      82           1 :                 mergers:   mergers,
      83           1 :         }
      84           1 :         f.fmtKey.mustSet("quoted")
      85           1 :         f.fmtValue.mustSet("[%x]")
      86           1 : 
      87           1 :         f.Root = &cobra.Command{
      88           1 :                 Use:   "find <dir> <key>",
      89           1 :                 Short: "find references to the specified key",
      90           1 :                 Long: `
      91           1 : Find references to the specified key and any range tombstones that contain the
      92           1 : key. This includes references to the key in WAL files and sstables, and the
      93           1 : provenance of the sstables (flushed, ingested, compacted).
      94           1 : `,
      95           1 :                 Args: cobra.ExactArgs(2),
      96           1 :                 Run:  f.run,
      97           1 :         }
      98           1 : 
      99           1 :         f.Root.Flags().BoolVarP(
     100           1 :                 &f.verbose, "verbose", "v", false, "verbose output")
     101           1 :         f.Root.Flags().StringVar(
     102           1 :                 &f.comparerName, "comparer", defaultComparer, "comparer name")
     103           1 :         f.Root.Flags().Var(
     104           1 :                 &f.fmtKey, "key", "key formatter")
     105           1 :         f.Root.Flags().Var(
     106           1 :                 &f.fmtValue, "value", "value formatter")
     107           1 :         return f
     108           1 : }
     109             : 
     110           1 : func (f *findT) run(cmd *cobra.Command, args []string) {
     111           1 :         stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
     112           1 :         var key key
     113           1 :         if err := key.Set(args[1]); err != nil {
     114           0 :                 fmt.Fprintf(stdout, "%s\n", err)
     115           0 :                 return
     116           0 :         }
     117             : 
     118           1 :         if err := f.findFiles(stdout, stderr, args[0]); err != nil {
     119           1 :                 fmt.Fprintf(stdout, "%s\n", err)
     120           1 :                 return
     121           1 :         }
     122           1 :         f.readManifests(stdout)
     123           1 : 
     124           1 :         f.opts.Comparer = f.comparers[f.comparerName]
     125           1 :         if f.opts.Comparer == nil {
     126           0 :                 fmt.Fprintf(stderr, "unknown comparer %q", f.comparerName)
     127           0 :                 return
     128           0 :         }
     129           1 :         f.fmtKey.setForComparer(f.opts.Comparer.Name, f.comparers)
     130           1 :         f.fmtValue.setForComparer(f.opts.Comparer.Name, f.comparers)
     131           1 : 
     132           1 :         refs := f.search(stdout, key)
     133           1 :         var lastFileNum base.FileNum
     134           1 :         for i := range refs {
     135           1 :                 r := &refs[i]
     136           1 :                 if lastFileNum != r.fileNum {
     137           1 :                         lastFileNum = r.fileNum
     138           1 :                         fmt.Fprintf(stdout, "%s", f.opts.FS.PathBase(f.files[r.fileNum.DiskFileNum()]))
     139           1 :                         if m := f.tableMeta[r.fileNum]; m != nil {
     140           1 :                                 fmt.Fprintf(stdout, " ")
     141           1 :                                 formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest)
     142           1 :                         }
     143           1 :                         fmt.Fprintf(stdout, "\n")
     144           1 :                         if p := f.tableProvenance(r.fileNum); p != "" {
     145           1 :                                 fmt.Fprintf(stdout, "    (%s)\n", p)
     146           1 :                         }
     147             :                 }
     148           1 :                 fmt.Fprintf(stdout, "    ")
     149           1 :                 formatKeyValue(stdout, f.fmtKey, f.fmtValue, &r.key, r.value)
     150             :         }
     151             : 
     152           1 :         for _, errorMsg := range f.errors {
     153           1 :                 fmt.Fprint(stdout, errorMsg)
     154           1 :         }
     155             : }
     156             : 
     157             : // Find all of the manifests, logs, and tables in the specified directory.
     158           1 : func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
     159           1 :         f.files = make(map[base.DiskFileNum]string)
     160           1 :         f.editRefs = make(map[base.FileNum][]int)
     161           1 :         f.logs = nil
     162           1 :         f.manifests = nil
     163           1 :         f.tables = nil
     164           1 :         f.tableMeta = make(map[base.FileNum]*manifest.FileMetadata)
     165           1 : 
     166           1 :         if _, err := f.opts.FS.Stat(dir); err != nil {
     167           1 :                 return err
     168           1 :         }
     169             : 
     170           1 :         walk(stderr, f.opts.FS, dir, func(path string) {
     171           1 :                 ft, fileNum, ok := base.ParseFilename(f.opts.FS, path)
     172           1 :                 if !ok {
     173           0 :                         return
     174           0 :                 }
     175           1 :                 switch ft {
     176           1 :                 case base.FileTypeLog:
     177           1 :                         f.logs = append(f.logs, fileNum)
     178           1 :                 case base.FileTypeManifest:
     179           1 :                         f.manifests = append(f.manifests, fileNum)
     180           1 :                 case base.FileTypeTable:
     181           1 :                         f.tables = append(f.tables, fileNum.FileNum())
     182           1 :                 default:
     183           1 :                         return
     184             :                 }
     185           1 :                 f.files[fileNum] = path
     186             :         })
     187             : 
     188           1 :         sort.Slice(f.logs, func(i, j int) bool {
     189           1 :                 return f.logs[i] < f.logs[j]
     190           1 :         })
     191           1 :         sort.Slice(f.manifests, func(i, j int) bool {
     192           0 :                 return f.manifests[i] < f.manifests[j]
     193           0 :         })
     194           1 :         sort.Slice(f.tables, func(i, j int) bool {
     195           1 :                 return f.tables[i] < f.tables[j]
     196           1 :         })
     197             : 
     198           1 :         if f.verbose {
     199           1 :                 fmt.Fprintf(stdout, "%s\n", dir)
     200           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.manifests), makePlural("manifest", int64(len(f.manifests))))
     201           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.logs), makePlural("log", int64(len(f.logs))))
     202           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.tables), makePlural("sstable", int64(len(f.tables))))
     203           1 :         }
     204           1 :         return nil
     205             : }
     206             : 
     207             : // Read the manifests and populate the editRefs map which is used to determine
     208             : // the provenance and metadata of tables.
     209           1 : func (f *findT) readManifests(stdout io.Writer) {
     210           1 :         for _, fileNum := range f.manifests {
     211           1 :                 func() {
     212           1 :                         path := f.files[fileNum]
     213           1 :                         mf, err := f.opts.FS.Open(path)
     214           1 :                         if err != nil {
     215           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     216           0 :                                 return
     217           0 :                         }
     218           1 :                         defer mf.Close()
     219           1 : 
     220           1 :                         if f.verbose {
     221           1 :                                 fmt.Fprintf(stdout, "%s\n", path)
     222           1 :                         }
     223             : 
     224           1 :                         rr := record.NewReader(mf, 0 /* logNum */)
     225           1 :                         for {
     226           1 :                                 r, err := rr.Next()
     227           1 :                                 if err != nil {
     228           1 :                                         if err != io.EOF {
     229           0 :                                                 fmt.Fprintf(stdout, "%s: %s\n", path, err)
     230           0 :                                         }
     231           1 :                                         break
     232             :                                 }
     233             : 
     234           1 :                                 var ve manifest.VersionEdit
     235           1 :                                 if err := ve.Decode(r); err != nil {
     236           0 :                                         fmt.Fprintf(stdout, "%s: %s\n", path, err)
     237           0 :                                         break
     238             :                                 }
     239           1 :                                 i := len(f.edits)
     240           1 :                                 f.edits = append(f.edits, ve)
     241           1 : 
     242           1 :                                 if ve.ComparerName != "" {
     243           1 :                                         f.comparerName = ve.ComparerName
     244           1 :                                 }
     245           1 :                                 if num := ve.MinUnflushedLogNum.FileNum(); num != 0 {
     246           1 :                                         f.editRefs[num] = append(f.editRefs[num], i)
     247           1 :                                 }
     248           1 :                                 for df := range ve.DeletedFiles {
     249           1 :                                         f.editRefs[df.FileNum] = append(f.editRefs[df.FileNum], i)
     250           1 :                                 }
     251           1 :                                 for _, nf := range ve.NewFiles {
     252           1 :                                         // The same file can be deleted and added in a single version edit
     253           1 :                                         // which indicates a "move" compaction. Only add the edit to the list
     254           1 :                                         // once.
     255           1 :                                         refs := f.editRefs[nf.Meta.FileNum]
     256           1 :                                         if n := len(refs); n == 0 || refs[n-1] != i {
     257           1 :                                                 f.editRefs[nf.Meta.FileNum] = append(refs, i)
     258           1 :                                         }
     259           1 :                                         if _, ok := f.tableMeta[nf.Meta.FileNum]; !ok {
     260           1 :                                                 f.tableMeta[nf.Meta.FileNum] = nf.Meta
     261           1 :                                         }
     262             :                                 }
     263             :                         }
     264             :                 }()
     265             :         }
     266             : 
     267           1 :         if f.verbose {
     268           1 :                 fmt.Fprintf(stdout, "%5d %s\n", len(f.edits), makePlural("edit", int64(len(f.edits))))
     269           1 :         }
     270             : }
     271             : 
     272             : // Search the logs and sstables for references to the specified key.
     273           1 : func (f *findT) search(stdout io.Writer, key []byte) []findRef {
     274           1 :         refs := f.searchLogs(stdout, key, nil)
     275           1 :         refs = f.searchTables(stdout, key, refs)
     276           1 : 
     277           1 :         // For a given file (log or table) the references are already in the correct
     278           1 :         // order. We simply want to order the references by fileNum using a stable
     279           1 :         // sort.
     280           1 :         //
     281           1 :         // TODO(peter): I'm not sure if this is perfectly correct with regards to log
     282           1 :         // files and ingested sstables, but it is close enough and doing something
     283           1 :         // better is onerous. Revisit if this ever becomes problematic (e.g. if we
     284           1 :         // allow finding more than one key at a time).
     285           1 :         //
     286           1 :         // An example of the problem with logs and ingestion (which can only occur
     287           1 :         // with distinct keys). If I write key "a" to a log, I can then ingested key
     288           1 :         // "b" without causing "a" to be flushed. Then I can write key "c" to the
     289           1 :         // log. Ideally, we'd show the key "a" from the log, then the key "b" from
     290           1 :         // the ingested sstable, then key "c" from the log.
     291           1 :         sort.SliceStable(refs, func(i, j int) bool {
     292           1 :                 return refs[i].fileNum < refs[j].fileNum
     293           1 :         })
     294           1 :         return refs
     295             : }
     296             : 
     297             : // Search the logs for references to the specified key.
     298           1 : func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     299           1 :         cmp := f.opts.Comparer.Compare
     300           1 :         for _, fileNum := range f.logs {
     301           1 :                 _ = func() (err error) {
     302           1 :                         path := f.files[fileNum]
     303           1 :                         lf, err := f.opts.FS.Open(path)
     304           1 :                         if err != nil {
     305           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     306           0 :                                 return
     307           0 :                         }
     308           1 :                         defer lf.Close()
     309           1 : 
     310           1 :                         if f.verbose {
     311           1 :                                 fmt.Fprintf(stdout, "%s", path)
     312           1 :                                 defer fmt.Fprintf(stdout, "\n")
     313           1 :                         }
     314           1 :                         defer func() {
     315           1 :                                 switch err {
     316           0 :                                 case record.ErrZeroedChunk:
     317           0 :                                         if f.verbose {
     318           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL preallocation)", err)
     319           0 :                                         }
     320           0 :                                 case record.ErrInvalidChunk:
     321           0 :                                         if f.verbose {
     322           0 :                                                 fmt.Fprintf(stdout, ": EOF [%s] (may be due to WAL recycling)", err)
     323           0 :                                         }
     324           1 :                                 default:
     325           1 :                                         if err != io.EOF {
     326           0 :                                                 if f.verbose {
     327           0 :                                                         fmt.Fprintf(stdout, ": %s", err)
     328           0 :                                                 } else {
     329           0 :                                                         fmt.Fprintf(stdout, "%s: %s\n", path, err)
     330           0 :                                                 }
     331             :                                         }
     332             :                                 }
     333             :                         }()
     334             : 
     335           1 :                         var b pebble.Batch
     336           1 :                         var buf bytes.Buffer
     337           1 :                         rr := record.NewReader(lf, fileNum)
     338           1 :                         for {
     339           1 :                                 r, err := rr.Next()
     340           1 :                                 if err == nil {
     341           1 :                                         buf.Reset()
     342           1 :                                         _, err = io.Copy(&buf, r)
     343           1 :                                 }
     344           1 :                                 if err != nil {
     345           1 :                                         return err
     346           1 :                                 }
     347             : 
     348           1 :                                 b = pebble.Batch{}
     349           1 :                                 if err := b.SetRepr(buf.Bytes()); err != nil {
     350           0 :                                         fmt.Fprintf(stdout, "%s: corrupt log file: %v", path, err)
     351           0 :                                         continue
     352             :                                 }
     353           1 :                                 seqNum := b.SeqNum()
     354           1 :                                 for r := b.Reader(); ; seqNum++ {
     355           1 :                                         kind, ukey, value, ok := r.Next()
     356           1 :                                         if !ok {
     357           1 :                                                 break
     358             :                                         }
     359           1 :                                         ikey := base.MakeInternalKey(ukey, seqNum, kind)
     360           1 :                                         switch kind {
     361             :                                         case base.InternalKeyKindDelete,
     362             :                                                 base.InternalKeyKindDeleteSized,
     363             :                                                 base.InternalKeyKindSet,
     364             :                                                 base.InternalKeyKindMerge,
     365             :                                                 base.InternalKeyKindSingleDelete,
     366           1 :                                                 base.InternalKeyKindSetWithDelete:
     367           1 :                                                 if cmp(searchKey, ikey.UserKey) != 0 {
     368           1 :                                                         continue
     369             :                                                 }
     370           1 :                                         case base.InternalKeyKindRangeDelete:
     371           1 :                                                 // Output tombstones that contain or end with the search key.
     372           1 :                                                 t := rangedel.Decode(ikey, value, nil)
     373           1 :                                                 if !t.Contains(cmp, searchKey) && cmp(t.End, searchKey) != 0 {
     374           1 :                                                         continue
     375             :                                                 }
     376           0 :                                         default:
     377           0 :                                                 continue
     378             :                                         }
     379             : 
     380           1 :                                         refs = append(refs, findRef{
     381           1 :                                                 key:     ikey.Clone(),
     382           1 :                                                 value:   append([]byte(nil), value...),
     383           1 :                                                 fileNum: fileNum.FileNum(),
     384           1 :                                         })
     385             :                                 }
     386             :                         }
     387             :                 }()
     388             :         }
     389           1 :         return refs
     390             : }
     391             : 
     392             : // Search the tables for references to the specified key.
     393           1 : func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
     394           1 :         cache := pebble.NewCache(128 << 20 /* 128 MB */)
     395           1 :         defer cache.Unref()
     396           1 : 
     397           1 :         f.tableRefs = make(map[base.FileNum]bool)
     398           1 :         for _, fileNum := range f.tables {
     399           1 :                 _ = func() (err error) {
     400           1 :                         path := f.files[fileNum.DiskFileNum()]
     401           1 :                         tf, err := f.opts.FS.Open(path)
     402           1 :                         if err != nil {
     403           0 :                                 fmt.Fprintf(stdout, "%s\n", err)
     404           0 :                                 return
     405           0 :                         }
     406             : 
     407           1 :                         m := f.tableMeta[fileNum]
     408           1 :                         if f.verbose {
     409           1 :                                 fmt.Fprintf(stdout, "%s", path)
     410           1 :                                 if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     411           1 :                                         fmt.Fprintf(stdout, ": global seqnum: %d", m.LargestSeqNum)
     412           1 :                                 }
     413           1 :                                 defer fmt.Fprintf(stdout, "\n")
     414             :                         }
     415           1 :                         defer func() {
     416           1 :                                 switch {
     417           0 :                                 case err != nil:
     418           0 :                                         if f.verbose {
     419           0 :                                                 fmt.Fprintf(stdout, ": %v", err)
     420           0 :                                         } else {
     421           0 :                                                 fmt.Fprintf(stdout, "%s: %v\n", path, err)
     422           0 :                                         }
     423             :                                 }
     424             :                         }()
     425             : 
     426           1 :                         opts := sstable.ReaderOptions{
     427           1 :                                 Cache:    cache,
     428           1 :                                 Comparer: f.opts.Comparer,
     429           1 :                                 Filters:  f.opts.Filters,
     430           1 :                         }
     431           1 :                         readable, err := sstable.NewSimpleReadable(tf)
     432           1 :                         if err != nil {
     433           0 :                                 return err
     434           0 :                         }
     435           1 :                         r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers,
     436           1 :                                 private.SSTableRawTombstonesOpt.(sstable.ReaderOption))
     437           1 :                         if err != nil {
     438           1 :                                 f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", f.files[fileNum.DiskFileNum()], err.Error()))
     439           1 :                                 // Ensure the error only gets printed once.
     440           1 :                                 err = nil
     441           1 :                                 return
     442           1 :                         }
     443           1 :                         defer r.Close()
     444           1 : 
     445           1 :                         if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
     446           1 :                                 r.Properties.GlobalSeqNum = m.LargestSeqNum
     447           1 :                         }
     448             : 
     449           1 :                         iter, err := r.NewIter(nil, nil)
     450           1 :                         if err != nil {
     451           0 :                                 return err
     452           0 :                         }
     453           1 :                         defer iter.Close()
     454           1 :                         key, value := iter.SeekGE(searchKey, base.SeekGEFlagsNone)
     455           1 : 
     456           1 :                         // We configured sstable.Reader to return raw tombstones which requires a
     457           1 :                         // bit more work here to put them in a form that can be iterated in
     458           1 :                         // parallel with the point records.
     459           1 :                         rangeDelIter, err := func() (keyspan.FragmentIterator, error) {
     460           1 :                                 iter, err := r.NewRawRangeDelIter()
     461           1 :                                 if err != nil {
     462           0 :                                         return nil, err
     463           0 :                                 }
     464           1 :                                 if iter == nil {
     465           1 :                                         return keyspan.NewIter(r.Compare, nil), nil
     466           1 :                                 }
     467           1 :                                 defer iter.Close()
     468           1 : 
     469           1 :                                 var tombstones []keyspan.Span
     470           1 :                                 for t := iter.First(); t != nil; t = iter.Next() {
     471           1 :                                         if !t.Contains(r.Compare, searchKey) {
     472           1 :                                                 continue
     473             :                                         }
     474           1 :                                         tombstones = append(tombstones, t.ShallowClone())
     475             :                                 }
     476             : 
     477           1 :                                 sort.Slice(tombstones, func(i, j int) bool {
     478           0 :                                         return r.Compare(tombstones[i].Start, tombstones[j].Start) < 0
     479           0 :                                 })
     480           1 :                                 return keyspan.NewIter(r.Compare, tombstones), nil
     481             :                         }()
     482           1 :                         if err != nil {
     483           0 :                                 return err
     484           0 :                         }
     485             : 
     486           1 :                         defer rangeDelIter.Close()
     487           1 :                         rangeDel := rangeDelIter.First()
     488           1 : 
     489           1 :                         foundRef := false
     490           1 :                         for key != nil || rangeDel != nil {
     491           1 :                                 if key != nil &&
     492           1 :                                         (rangeDel == nil || r.Compare(key.UserKey, rangeDel.Start) < 0) {
     493           1 :                                         if r.Compare(searchKey, key.UserKey) != 0 {
     494           1 :                                                 key, value = nil, base.LazyValue{}
     495           1 :                                                 continue
     496             :                                         }
     497           1 :                                         v, _, err := value.Value(nil)
     498           1 :                                         if err != nil {
     499           0 :                                                 return err
     500           0 :                                         }
     501           1 :                                         refs = append(refs, findRef{
     502           1 :                                                 key:     key.Clone(),
     503           1 :                                                 value:   append([]byte(nil), v...),
     504           1 :                                                 fileNum: fileNum,
     505           1 :                                         })
     506           1 :                                         key, value = iter.Next()
     507           1 :                                 } else {
     508           1 :                                         // Use rangedel.Encode to add a reference for each key
     509           1 :                                         // within the span.
     510           1 :                                         err := rangedel.Encode(rangeDel, func(k base.InternalKey, v []byte) error {
     511           1 :                                                 refs = append(refs, findRef{
     512           1 :                                                         key:     k.Clone(),
     513           1 :                                                         value:   append([]byte(nil), v...),
     514           1 :                                                         fileNum: fileNum,
     515           1 :                                                 })
     516           1 :                                                 return nil
     517           1 :                                         })
     518           1 :                                         if err != nil {
     519           0 :                                                 return err
     520           0 :                                         }
     521           1 :                                         rangeDel = rangeDelIter.Next()
     522             :                                 }
     523           1 :                                 foundRef = true
     524             :                         }
     525             : 
     526           1 :                         if foundRef {
     527           1 :                                 f.tableRefs[fileNum] = true
     528           1 :                         }
     529           1 :                         return nil
     530             :                 }()
     531             :         }
     532           1 :         return refs
     533             : }
     534             : 
     535             : // Determine the provenance of the specified table. We search the version edits
     536             : // for the first edit which created the table, and then analyze the edit to
     537             : // determine if it was a compaction, flush, or ingestion. Returns an empty
     538             : // string if the provenance of a table cannot be determined.
     539           1 : func (f *findT) tableProvenance(fileNum base.FileNum) string {
     540           1 :         editRefs := f.editRefs[fileNum]
     541           1 :         for len(editRefs) > 0 {
     542           1 :                 ve := f.edits[editRefs[0]]
     543           1 :                 editRefs = editRefs[1:]
     544           1 :                 for _, nf := range ve.NewFiles {
     545           1 :                         if fileNum != nf.Meta.FileNum {
     546           1 :                                 continue
     547             :                         }
     548             : 
     549           1 :                         var buf bytes.Buffer
     550           1 :                         switch {
     551           1 :                         case len(ve.DeletedFiles) > 0:
     552           1 :                                 // A version edit with deleted files is a compaction. The deleted
     553           1 :                                 // files are the inputs to the compaction. We're going to
     554           1 :                                 // reconstruct the input files and display those inputs that
     555           1 :                                 // contain the search key (i.e. are list in refs) and use an
     556           1 :                                 // ellipsis to indicate when there were other inputs that have
     557           1 :                                 // been elided.
     558           1 :                                 var sourceLevels []int
     559           1 :                                 levels := make(map[int][]base.FileNum)
     560           1 :                                 for df := range ve.DeletedFiles {
     561           1 :                                         files := levels[df.Level]
     562           1 :                                         if len(files) == 0 {
     563           1 :                                                 sourceLevels = append(sourceLevels, df.Level)
     564           1 :                                         }
     565           1 :                                         levels[df.Level] = append(files, df.FileNum)
     566             :                                 }
     567             : 
     568           1 :                                 sort.Ints(sourceLevels)
     569           1 :                                 if sourceLevels[len(sourceLevels)-1] != nf.Level {
     570           0 :                                         sourceLevels = append(sourceLevels, nf.Level)
     571           0 :                                 }
     572             : 
     573           1 :                                 sep := " "
     574           1 :                                 fmt.Fprintf(&buf, "compacted")
     575           1 :                                 for _, level := range sourceLevels {
     576           1 :                                         files := levels[level]
     577           1 :                                         sort.Slice(files, func(i, j int) bool {
     578           1 :                                                 return files[i] < files[j]
     579           1 :                                         })
     580             : 
     581           1 :                                         fmt.Fprintf(&buf, "%sL%d [", sep, level)
     582           1 :                                         sep = ""
     583           1 :                                         elided := false
     584           1 :                                         for _, fileNum := range files {
     585           1 :                                                 if f.tableRefs[fileNum] {
     586           1 :                                                         fmt.Fprintf(&buf, "%s%s", sep, fileNum)
     587           1 :                                                         sep = " "
     588           1 :                                                 } else {
     589           1 :                                                         elided = true
     590           1 :                                                 }
     591             :                                         }
     592           1 :                                         if elided {
     593           1 :                                                 fmt.Fprintf(&buf, "%s...", sep)
     594           1 :                                         }
     595           1 :                                         fmt.Fprintf(&buf, "]")
     596           1 :                                         sep = " + "
     597             :                                 }
     598             : 
     599           1 :                         case ve.MinUnflushedLogNum != 0:
     600           1 :                                 // A version edit with a min-unflushed log indicates a flush
     601           1 :                                 // operation.
     602           1 :                                 fmt.Fprintf(&buf, "flushed to L%d", nf.Level)
     603             : 
     604           1 :                         case nf.Meta.SmallestSeqNum == nf.Meta.LargestSeqNum:
     605           1 :                                 // If the smallest and largest seqnum are the same, the file was
     606           1 :                                 // ingested. Note that this can also occur for a flushed sstable
     607           1 :                                 // that contains only a single key, though that would have
     608           1 :                                 // already been captured above.
     609           1 :                                 fmt.Fprintf(&buf, "ingested to L%d", nf.Level)
     610             : 
     611           0 :                         default:
     612           0 :                                 // The provenance of the table is unclear. This is usually due to
     613           0 :                                 // the MANIFEST rolling over and taking a snapshot of the LSM
     614           0 :                                 // state.
     615           0 :                                 fmt.Fprintf(&buf, "added to L%d", nf.Level)
     616             :                         }
     617             : 
     618             :                         // After a table is created, it can be moved to a different level via a
     619             :                         // move compaction. This is indicated by a version edit that deletes the
     620             :                         // table from one level and adds the same table to a different
     621             :                         // level. Loop over the remaining version edits for the table looking for
     622             :                         // such moves.
     623           1 :                         for len(editRefs) > 0 {
     624           1 :                                 ve := f.edits[editRefs[0]]
     625           1 :                                 editRefs = editRefs[1:]
     626           1 :                                 for _, nf := range ve.NewFiles {
     627           1 :                                         if fileNum == nf.Meta.FileNum {
     628           1 :                                                 for df := range ve.DeletedFiles {
     629           1 :                                                         if fileNum == df.FileNum {
     630           1 :                                                                 fmt.Fprintf(&buf, ", moved to L%d", nf.Level)
     631           1 :                                                                 break
     632             :                                                         }
     633             :                                                 }
     634           1 :                                                 break
     635             :                                         }
     636             :                                 }
     637             :                         }
     638             : 
     639           1 :                         return buf.String()
     640             :                 }
     641             :         }
     642           1 :         return ""
     643             : }

Generated by: LCOV version 1.14