Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package metamorphic 6 : 7 : import ( 8 : "github.com/cockroachdb/errors" 9 : "github.com/cockroachdb/pebble" 10 : "github.com/cockroachdb/pebble/vfs/errorfs" 11 : ) 12 : 13 : // A RetryPolicy determines what error values should trigger a retry of an 14 : // operation. 15 : type RetryPolicy func(error) bool 16 : 17 : var ( 18 : // NeverRetry implements a RetryPolicy that never retries. 19 2 : NeverRetry = func(error) bool { return false } 20 : // RetryInjected implements a RetryPolicy that retries whenever an 21 : // errorfs.ErrInjected error is returned. 22 0 : RetryInjected RetryPolicy = func(err error) bool { 23 0 : return errors.Is(err, errorfs.ErrInjected) 24 0 : } 25 : ) 26 : 27 2 : func withRetries(fn func() error, retryPolicy RetryPolicy) error { 28 2 : for { 29 2 : if err := fn(); !retryPolicy(err) { 30 2 : return err 31 2 : } 32 : } 33 : } 34 : 35 : // retryableIter holds an iterator and the state necessary to reset it to its 36 : // state after the last successful operation. This allows us to retry failed 37 : // iterator operations by running them again on a non-error iterator with the 38 : // same pre-operation state. 39 : type retryableIter struct { 40 : iter *pebble.Iterator 41 : lastKey []byte 42 : needRetry RetryPolicy 43 : } 44 : 45 2 : func (i *retryableIter) withRetry(fn func()) { 46 2 : for { 47 2 : fn() 48 2 : if !i.needRetry(i.iter.Error()) { 49 2 : break 50 : } 51 0 : for i.needRetry(i.iter.Error()) { 52 0 : i.iter.SeekGE(i.lastKey) 53 0 : } 54 : } 55 : 56 2 : i.lastKey = i.lastKey[:0] 57 2 : if i.iter.Valid() { 58 2 : i.lastKey = append(i.lastKey, i.iter.Key()...) 59 2 : } 60 : } 61 : 62 2 : func (i *retryableIter) Close() error { 63 2 : return i.iter.Close() 64 2 : } 65 : 66 2 : func (i *retryableIter) Error() error { 67 2 : return i.iter.Error() 68 2 : } 69 : 70 2 : func (i *retryableIter) First() bool { 71 2 : var valid bool 72 2 : i.withRetry(func() { 73 2 : valid = i.iter.First() 74 2 : }) 75 2 : return valid 76 : } 77 : 78 2 : func (i *retryableIter) Key() []byte { 79 2 : return i.iter.Key() 80 2 : } 81 : 82 2 : func (i *retryableIter) RangeKeyChanged() bool { 83 2 : return i.iter.RangeKeyChanged() 84 2 : } 85 : 86 2 : func (i *retryableIter) HasPointAndRange() (bool, bool) { 87 2 : return i.iter.HasPointAndRange() 88 2 : } 89 : 90 2 : func (i *retryableIter) RangeBounds() ([]byte, []byte) { 91 2 : return i.iter.RangeBounds() 92 2 : } 93 : 94 2 : func (i *retryableIter) RangeKeys() []pebble.RangeKeyData { 95 2 : return i.iter.RangeKeys() 96 2 : } 97 : 98 2 : func (i *retryableIter) Last() bool { 99 2 : var valid bool 100 2 : i.withRetry(func() { valid = i.iter.Last() }) 101 2 : return valid 102 : } 103 : 104 2 : func (i *retryableIter) Next() bool { 105 2 : var valid bool 106 2 : i.withRetry(func() { 107 2 : valid = i.iter.Next() 108 2 : }) 109 2 : return valid 110 : } 111 : 112 2 : func (i *retryableIter) NextWithLimit(limit []byte) pebble.IterValidityState { 113 2 : var validity pebble.IterValidityState 114 2 : i.withRetry(func() { 115 2 : validity = i.iter.NextWithLimit(limit) 116 2 : }) 117 2 : return validity 118 : 119 : } 120 : 121 2 : func (i *retryableIter) NextPrefix() bool { 122 2 : var valid bool 123 2 : i.withRetry(func() { 124 2 : valid = i.iter.NextPrefix() 125 2 : }) 126 2 : return valid 127 : } 128 : 129 2 : func (i *retryableIter) Prev() bool { 130 2 : var valid bool 131 2 : i.withRetry(func() { 132 2 : valid = i.iter.Prev() 133 2 : }) 134 2 : return valid 135 : } 136 : 137 2 : func (i *retryableIter) PrevWithLimit(limit []byte) pebble.IterValidityState { 138 2 : var validity pebble.IterValidityState 139 2 : i.withRetry(func() { 140 2 : validity = i.iter.PrevWithLimit(limit) 141 2 : }) 142 2 : return validity 143 : } 144 : 145 2 : func (i *retryableIter) SeekGE(key []byte) bool { 146 2 : var valid bool 147 2 : i.withRetry(func() { valid = i.iter.SeekGE(key) }) 148 2 : return valid 149 : } 150 : 151 2 : func (i *retryableIter) SeekGEWithLimit(key []byte, limit []byte) pebble.IterValidityState { 152 2 : var validity pebble.IterValidityState 153 2 : i.withRetry(func() { validity = i.iter.SeekGEWithLimit(key, limit) }) 154 2 : return validity 155 : } 156 : 157 2 : func (i *retryableIter) SeekLT(key []byte) bool { 158 2 : var valid bool 159 2 : i.withRetry(func() { valid = i.iter.SeekLT(key) }) 160 2 : return valid 161 : } 162 : 163 2 : func (i *retryableIter) SeekLTWithLimit(key []byte, limit []byte) pebble.IterValidityState { 164 2 : var validity pebble.IterValidityState 165 2 : i.withRetry(func() { validity = i.iter.SeekLTWithLimit(key, limit) }) 166 2 : return validity 167 : } 168 : 169 2 : func (i *retryableIter) SeekPrefixGE(key []byte) bool { 170 2 : var valid bool 171 2 : i.withRetry(func() { valid = i.iter.SeekPrefixGE(key) }) 172 2 : return valid 173 : } 174 : 175 2 : func (i *retryableIter) SetBounds(lower, upper []byte) { 176 2 : i.iter.SetBounds(lower, upper) 177 2 : } 178 : 179 2 : func (i *retryableIter) SetOptions(opts *pebble.IterOptions) { 180 2 : i.iter.SetOptions(opts) 181 2 : } 182 : 183 0 : func (i *retryableIter) Valid() bool { 184 0 : return i.iter.Valid() 185 0 : } 186 : 187 2 : func (i *retryableIter) Value() []byte { 188 2 : return i.iter.Value() 189 2 : }