LCOV - code coverage report
Current view: top level - pebble/metamorphic - retryable.go (source / functions) Hit Total Coverage
Test: 2023-09-20 08:16Z f509cd48 - meta test only.lcov Lines: 219 226 96.9 %
Date: 2023-09-20 08:17:46 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble"
      12             :         "github.com/cockroachdb/pebble/internal/testkeys"
      13             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      14             : )
      15             : 
      16             : // withRetries executes fn, retrying it whenever an errorfs.ErrInjected error
      17             : // is returned.  It returns the first nil or non-errorfs.ErrInjected error
      18             : // returned by fn.
      19           1 : func withRetries(fn func() error) error {
      20           1 :         for {
      21           1 :                 if err := fn(); !errors.Is(err, errorfs.ErrInjected) {
      22           1 :                         return err
      23           1 :                 }
      24             :         }
      25             : }
      26             : 
      27             : // retryableIter holds an iterator and the state necessary to reset it to its
      28             : // state after the last successful operation. This allows us to retry failed
      29             : // iterator operations by running them again on a non-error iterator with the
      30             : // same pre-operation state.
      31             : type retryableIter struct {
      32             :         iter    *pebble.Iterator
      33             :         lastKey []byte
      34             : 
      35             :         // When filterMax is >0, this iterator filters out keys with suffixes
      36             :         // outside of the range [filterMin, filterMax). Keys without suffixes are
      37             :         // surfaced. This is used to ensure determinism regardless of whether
      38             :         // block-property filters filter keys or not.
      39             :         filterMin, filterMax uint64
      40             : 
      41             :         // rangeKeyChangeGuess is only used if the iterator has a filter set. A single
      42             :         // operation on the retryableIter may result in many operations on
      43             :         // retryableIter.iter if we need to skip filtered keys. Thus, the value of
      44             :         // retryableIter.iter.RangeKeyChanged() will not necessarily indicate if the
      45             :         // range key actually changed.
      46             :         //
      47             :         // Since one call to a positioning operation may lead to multiple
      48             :         // positioning operations, we set rangeKeyChangeGuess to false, iff every single
      49             :         // positioning operation returned iter.RangeKeyChanged() == false.
      50             :         //
      51             :         // rangeKeyChangeGuess == true implies that at least one of the many iterator
      52             :         // operations returned RangeKeyChanged to true, but we may have false
      53             :         // positives. We can't assume that the range key actually changed if
      54             :         // iter.RangeKeyChanged() returns true after one of the positioning
      55             :         // operations. Consider a db with two range keys, which are also in the same
      56             :         // block, a-f, g-h where the iterator's filter excludes g-h. If the iterator
      57             :         // is positioned on a-f, then a call to SeekLT(z), will position the iterator
      58             :         // over g-h, but the retryableIter will call Prev and the iterator will be
      59             :         // positioned back over a-f. In this case the range key hasn't changed, but
      60             :         // one of the positioning operations will return iter.RangeKeyChanged() ==
      61             :         // true.
      62             :         rangeKeyChangeGuess bool
      63             : 
      64             :         // rangeKeyChanged is the true accurate value of whether the range key has
      65             :         // changed from the perspective of a client of the retryableIter. It is used
      66             :         // to determine if rangeKeyChangeGuess is a false positive. It is computed
      67             :         // by comparing the range key at the current position with the range key
      68             :         // at the previous position.
      69             :         rangeKeyChanged bool
      70             : 
      71             :         // When a filter is set on the iterator, one positioning op from the
      72             :         // perspective of a client of the retryableIter, may result in multiple
      73             :         // intermediary positioning ops. This bool is set if the current positioning
      74             :         // op is intermediate.
      75             :         intermediatePosition bool
      76             : 
      77             :         rkeyBuff []byte
      78             : }
      79             : 
      80           1 : func (i *retryableIter) shouldFilter() bool {
      81           1 :         if i.filterMax == 0 {
      82           1 :                 return false
      83           1 :         }
      84           1 :         k := i.iter.Key()
      85           1 :         n := testkeys.Comparer.Split(k)
      86           1 :         if n == len(k) {
      87           1 :                 // No suffix, don't filter it.
      88           1 :                 return false
      89           1 :         }
      90           1 :         v, err := testkeys.ParseSuffix(k[n:])
      91           1 :         if err != nil {
      92           0 :                 panic(err)
      93             :         }
      94           1 :         ts := uint64(v)
      95           1 :         return ts < i.filterMin || ts >= i.filterMax
      96             : }
      97             : 
      98           1 : func (i *retryableIter) needRetry() bool {
      99           1 :         return errors.Is(i.iter.Error(), errorfs.ErrInjected)
     100           1 : }
     101             : 
     102           1 : func (i *retryableIter) withRetry(fn func()) {
     103           1 :         for {
     104           1 :                 fn()
     105           1 :                 if !i.needRetry() {
     106           1 :                         break
     107             :                 }
     108           0 :                 for i.needRetry() {
     109           0 :                         i.iter.SeekGE(i.lastKey)
     110           0 :                 }
     111             :         }
     112             : 
     113           1 :         i.lastKey = i.lastKey[:0]
     114           1 :         if i.iter.Valid() {
     115           1 :                 i.lastKey = append(i.lastKey, i.iter.Key()...)
     116           1 :         }
     117             : }
     118             : 
     119           1 : func (i *retryableIter) Close() error {
     120           1 :         return i.iter.Close()
     121           1 : }
     122             : 
     123           1 : func (i *retryableIter) Error() error {
     124           1 :         return i.iter.Error()
     125           1 : }
     126             : 
     127             : // A call to an iterator positioning function may result in sub calls to other
     128             : // iterator positioning functions. We need to run some code in the top level
     129             : // call, so we use withPosition to reduce code duplication in the positioning
     130             : // functions.
     131           1 : func (i *retryableIter) withPosition(fn func()) {
     132           1 :         // For the top level op, i.intermediatePosition must always be false.
     133           1 :         intermediate := i.intermediatePosition
     134           1 :         // Any subcalls to positioning ops will be intermediate.
     135           1 :         i.intermediatePosition = true
     136           1 :         defer func() {
     137           1 :                 i.intermediatePosition = intermediate
     138           1 :         }()
     139             : 
     140           1 :         if !intermediate {
     141           1 :                 // Clear out the previous value stored in the buff.
     142           1 :                 i.rkeyBuff = i.rkeyBuff[:0]
     143           1 :                 if _, hasRange := i.iter.HasPointAndRange(); hasRange {
     144           1 :                         // This is a top level positioning op. We should determine if the iter
     145           1 :                         // is positioned over a range key to later determine if the range key
     146           1 :                         // changed.
     147           1 :                         startTmp, _ := i.iter.RangeBounds()
     148           1 :                         i.rkeyBuff = append(i.rkeyBuff, startTmp...)
     149           1 : 
     150           1 :                 }
     151             :                 // Set this to false. Any positioning op can set this to true.
     152           1 :                 i.rangeKeyChangeGuess = false
     153             :         }
     154             : 
     155           1 :         fn()
     156           1 : 
     157           1 :         if !intermediate {
     158           1 :                 // Check if the range key changed.
     159           1 :                 var newStartKey []byte
     160           1 :                 if _, hasRange := i.iter.HasPointAndRange(); hasRange {
     161           1 :                         newStartKey, _ = i.iter.RangeBounds()
     162           1 :                 }
     163             : 
     164           1 :                 i.rangeKeyChanged = !bytes.Equal(newStartKey, i.rkeyBuff)
     165             :         }
     166             : }
     167             : 
     168           1 : func (i *retryableIter) updateRangeKeyChangedGuess() {
     169           1 :         i.rangeKeyChangeGuess = i.rangeKeyChangeGuess || i.iter.RangeKeyChanged()
     170           1 : }
     171             : 
     172           1 : func (i *retryableIter) First() bool {
     173           1 :         var valid bool
     174           1 :         i.withPosition(func() {
     175           1 :                 i.withRetry(func() {
     176           1 :                         valid = i.iter.First()
     177           1 :                 })
     178           1 :                 i.updateRangeKeyChangedGuess()
     179           1 :                 if valid && i.shouldFilter() {
     180           1 :                         valid = i.Next()
     181           1 :                 }
     182             :         })
     183           1 :         return valid
     184             : }
     185             : 
     186           1 : func (i *retryableIter) Key() []byte {
     187           1 :         return i.iter.Key()
     188           1 : }
     189             : 
     190           1 : func (i *retryableIter) RangeKeyChanged() bool {
     191           1 :         if i.filterMax == 0 {
     192           1 :                 return i.iter.RangeKeyChanged()
     193           1 :         }
     194             : 
     195           1 :         if !i.rangeKeyChangeGuess {
     196           1 :                 // false negatives shouldn't be possible so just return.
     197           1 :                 return false
     198           1 :         }
     199             : 
     200             :         // i.rangeKeyChangeGuess is true. This may be a false positive, so just
     201             :         // return i.rangeKeyChanged which will always be correct.
     202           1 :         return i.rangeKeyChanged
     203             : }
     204             : 
     205           1 : func (i *retryableIter) HasPointAndRange() (bool, bool) {
     206           1 :         return i.iter.HasPointAndRange()
     207           1 : }
     208             : 
     209           1 : func (i *retryableIter) RangeBounds() ([]byte, []byte) {
     210           1 :         return i.iter.RangeBounds()
     211           1 : }
     212             : 
     213           1 : func (i *retryableIter) RangeKeys() []pebble.RangeKeyData {
     214           1 :         return i.iter.RangeKeys()
     215           1 : }
     216             : 
     217           1 : func (i *retryableIter) Last() bool {
     218           1 :         var valid bool
     219           1 :         i.withPosition(func() {
     220           1 :                 i.withRetry(func() { valid = i.iter.Last() })
     221           1 :                 i.updateRangeKeyChangedGuess()
     222           1 :                 if valid && i.shouldFilter() {
     223           1 :                         valid = i.Prev()
     224           1 :                 }
     225             :         })
     226           1 :         return valid
     227             : }
     228             : 
     229           1 : func (i *retryableIter) Next() bool {
     230           1 :         var valid bool
     231           1 :         i.withPosition(func() {
     232           1 :                 i.withRetry(func() {
     233           1 :                         valid = i.iter.Next()
     234           1 :                         i.updateRangeKeyChangedGuess()
     235           1 :                         for valid && i.shouldFilter() {
     236           1 :                                 valid = i.iter.Next()
     237           1 :                                 i.updateRangeKeyChangedGuess()
     238           1 :                         }
     239             :                 })
     240             :         })
     241           1 :         return valid
     242             : }
     243             : 
     244           1 : func (i *retryableIter) NextWithLimit(limit []byte) pebble.IterValidityState {
     245           1 :         var validity pebble.IterValidityState
     246           1 :         i.withPosition(func() {
     247           1 :                 i.withRetry(func() {
     248           1 :                         validity = i.iter.NextWithLimit(limit)
     249           1 :                         i.updateRangeKeyChangedGuess()
     250           1 :                         for validity == pebble.IterValid && i.shouldFilter() {
     251           1 :                                 validity = i.iter.NextWithLimit(limit)
     252           1 :                                 i.updateRangeKeyChangedGuess()
     253           1 :                         }
     254             :                 })
     255             :         })
     256           1 :         return validity
     257             : 
     258             : }
     259             : 
     260           1 : func (i *retryableIter) NextPrefix() bool {
     261           1 :         var valid bool
     262           1 :         i.withPosition(func() {
     263           1 :                 i.withRetry(func() {
     264           1 :                         valid = i.iter.NextPrefix()
     265           1 :                         i.updateRangeKeyChangedGuess()
     266           1 :                         for valid && i.shouldFilter() {
     267           1 :                                 valid = i.iter.Next()
     268           1 :                                 i.updateRangeKeyChangedGuess()
     269           1 :                         }
     270             :                 })
     271             :         })
     272           1 :         return valid
     273             : }
     274             : 
     275           1 : func (i *retryableIter) Prev() bool {
     276           1 :         var valid bool
     277           1 :         i.withPosition(func() {
     278           1 :                 i.withRetry(func() {
     279           1 :                         valid = i.iter.Prev()
     280           1 :                         i.updateRangeKeyChangedGuess()
     281           1 :                         for valid && i.shouldFilter() {
     282           1 :                                 valid = i.iter.Prev()
     283           1 :                                 i.updateRangeKeyChangedGuess()
     284           1 :                         }
     285             :                 })
     286             :         })
     287           1 :         return valid
     288             : }
     289             : 
     290           1 : func (i *retryableIter) PrevWithLimit(limit []byte) pebble.IterValidityState {
     291           1 :         var validity pebble.IterValidityState
     292           1 :         i.withPosition(func() {
     293           1 :                 i.withRetry(func() {
     294           1 :                         validity = i.iter.PrevWithLimit(limit)
     295           1 :                         i.updateRangeKeyChangedGuess()
     296           1 :                         for validity == pebble.IterValid && i.shouldFilter() {
     297           1 :                                 validity = i.iter.PrevWithLimit(limit)
     298           1 :                                 i.updateRangeKeyChangedGuess()
     299           1 :                         }
     300             :                 })
     301             :         })
     302           1 :         return validity
     303             : }
     304             : 
     305           1 : func (i *retryableIter) SeekGE(key []byte) bool {
     306           1 :         var valid bool
     307           1 :         i.withPosition(func() {
     308           1 :                 i.withRetry(func() { valid = i.iter.SeekGE(key) })
     309           1 :                 i.updateRangeKeyChangedGuess()
     310           1 :                 if valid && i.shouldFilter() {
     311           1 :                         valid = i.Next()
     312           1 :                 }
     313             :         })
     314           1 :         return valid
     315             : }
     316             : 
     317           1 : func (i *retryableIter) SeekGEWithLimit(key []byte, limit []byte) pebble.IterValidityState {
     318           1 :         var validity pebble.IterValidityState
     319           1 :         i.withPosition(func() {
     320           1 :                 i.withRetry(func() { validity = i.iter.SeekGEWithLimit(key, limit) })
     321           1 :                 i.updateRangeKeyChangedGuess()
     322           1 :                 if validity == pebble.IterValid && i.shouldFilter() {
     323           1 :                         validity = i.NextWithLimit(limit)
     324           1 :                 }
     325             :         })
     326           1 :         return validity
     327             : }
     328             : 
     329           1 : func (i *retryableIter) SeekLT(key []byte) bool {
     330           1 :         var valid bool
     331           1 :         i.withPosition(func() {
     332           1 :                 i.withRetry(func() { valid = i.iter.SeekLT(key) })
     333           1 :                 i.updateRangeKeyChangedGuess()
     334           1 :                 if valid && i.shouldFilter() {
     335           1 :                         valid = i.Prev()
     336           1 :                 }
     337             :         })
     338           1 :         return valid
     339             : }
     340             : 
     341           1 : func (i *retryableIter) SeekLTWithLimit(key []byte, limit []byte) pebble.IterValidityState {
     342           1 :         var validity pebble.IterValidityState
     343           1 :         i.withPosition(func() {
     344           1 :                 i.withRetry(func() { validity = i.iter.SeekLTWithLimit(key, limit) })
     345           1 :                 i.updateRangeKeyChangedGuess()
     346           1 :                 if validity == pebble.IterValid && i.shouldFilter() {
     347           1 :                         validity = i.PrevWithLimit(limit)
     348           1 :                 }
     349             :         })
     350           1 :         return validity
     351             : }
     352             : 
     353           1 : func (i *retryableIter) SeekPrefixGE(key []byte) bool {
     354           1 :         var valid bool
     355           1 :         i.withPosition(func() {
     356           1 :                 i.withRetry(func() { valid = i.iter.SeekPrefixGE(key) })
     357           1 :                 i.updateRangeKeyChangedGuess()
     358           1 :                 if valid && i.shouldFilter() {
     359           1 :                         valid = i.Next()
     360           1 :                 }
     361             :         })
     362           1 :         return valid
     363             : }
     364             : 
     365           1 : func (i *retryableIter) SetBounds(lower, upper []byte) {
     366           1 :         i.iter.SetBounds(lower, upper)
     367           1 : }
     368             : 
     369           1 : func (i *retryableIter) SetOptions(opts *pebble.IterOptions) {
     370           1 :         i.iter.SetOptions(opts)
     371           1 : }
     372             : 
     373           0 : func (i *retryableIter) Valid() bool {
     374           0 :         return i.iter.Valid()
     375           0 : }
     376             : 
     377           1 : func (i *retryableIter) Value() []byte {
     378           1 :         return i.iter.Value()
     379           1 : }

Generated by: LCOV version 1.14