Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/hex"
11 : "fmt"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : )
18 :
19 : type prefixReplacingIterator struct {
20 : i Iterator
21 : cmp base.Compare
22 : src, dst []byte
23 : arg, arg2 []byte
24 : res InternalKey
25 : err error
26 : }
27 :
28 : var errInputPrefixMismatch = errors.New("key argument does not have prefix required for replacement")
29 : var errOutputPrefixMismatch = errors.New("key returned does not have prefix required for replacement")
30 :
31 : var _ Iterator = (*prefixReplacingIterator)(nil)
32 :
33 : // newPrefixReplacingIterator wraps an iterator over keys that have prefix `src`
34 : // in an iterator that will make them appear to have prefix `dst`. Every key
35 : // passed as an argument to methods on this iterator must have prefix `dst`, and
36 : // every key produced by the underlying iterator must have prefix `src`.
37 : //
38 : // INVARIANT: len(dst) > 0.
39 1 : func newPrefixReplacingIterator(i Iterator, src, dst []byte, cmp base.Compare) Iterator {
40 1 : if invariants.Enabled && len(dst) == 0 {
41 0 : panic("newPrefixReplacingIterator called without synthetic prefix")
42 : }
43 1 : return &prefixReplacingIterator{
44 1 : i: i,
45 1 : cmp: cmp,
46 1 : src: src, dst: dst,
47 1 : arg: append([]byte{}, src...), arg2: append([]byte{}, src...),
48 1 : res: InternalKey{UserKey: append([]byte{}, dst...)},
49 1 : }
50 : }
51 :
52 1 : func (p *prefixReplacingIterator) SetContext(ctx context.Context) {
53 1 : p.i.SetContext(ctx)
54 1 : }
55 :
56 1 : func (p *prefixReplacingIterator) rewriteArg(key []byte) []byte {
57 1 : if !bytes.HasPrefix(key, p.dst) {
58 1 : p.err = errInputPrefixMismatch
59 1 : return key
60 1 : }
61 1 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
62 1 : return p.arg
63 : }
64 :
65 1 : func (p *prefixReplacingIterator) rewriteArg2(key []byte) []byte {
66 1 : if !bytes.HasPrefix(key, p.dst) {
67 0 : p.err = errInputPrefixMismatch
68 0 : return key
69 0 : }
70 1 : p.arg2 = append(p.arg2[:len(p.src)], key[len(p.dst):]...)
71 1 : return p.arg2
72 : }
73 :
74 : func (p *prefixReplacingIterator) rewriteResult(
75 : k *InternalKey, v base.LazyValue,
76 1 : ) (*InternalKey, base.LazyValue) {
77 1 : if k == nil {
78 1 : return k, v
79 1 : }
80 1 : if !bytes.HasPrefix(k.UserKey, p.src) {
81 0 : p.err = errOutputPrefixMismatch
82 0 : if invariants.Enabled {
83 0 : panic(p.err)
84 : }
85 0 : return nil, base.LazyValue{}
86 : }
87 1 : p.res.Trailer = k.Trailer
88 1 : p.res.UserKey = append(p.res.UserKey[:len(p.dst)], k.UserKey[len(p.src):]...)
89 1 : return &p.res, v
90 : }
91 :
92 : // SeekGE implements the Iterator interface.
93 : func (p *prefixReplacingIterator) SeekGE(
94 : key []byte, flags base.SeekGEFlags,
95 1 : ) (*InternalKey, base.LazyValue) {
96 1 : return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
97 1 : }
98 :
99 : // SeekPrefixGE implements the Iterator interface.
100 : func (p *prefixReplacingIterator) SeekPrefixGE(
101 : prefix, key []byte, flags base.SeekGEFlags,
102 1 : ) (*InternalKey, base.LazyValue) {
103 1 : return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
104 1 : }
105 :
106 : // SeekLT implements the Iterator interface.
107 : func (p *prefixReplacingIterator) SeekLT(
108 : key []byte, flags base.SeekLTFlags,
109 1 : ) (*InternalKey, base.LazyValue) {
110 1 : cmp := p.cmp(key, p.dst)
111 1 : if cmp < 0 {
112 1 : // Exhaust the iterator by Prev()ing before the First key.
113 1 : p.i.First()
114 1 : return p.rewriteResult(p.i.Prev())
115 1 : }
116 1 : return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
117 : }
118 :
119 : // First implements the Iterator interface.
120 1 : func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
121 1 : return p.rewriteResult(p.i.First())
122 1 : }
123 :
124 : // Last implements the Iterator interface.
125 1 : func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
126 1 : return p.rewriteResult(p.i.Last())
127 1 : }
128 :
129 : // Next implements the Iterator interface.
130 1 : func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
131 1 : return p.rewriteResult(p.i.Next())
132 1 : }
133 :
134 : // NextPrefix implements the Iterator interface.
135 0 : func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
136 0 : return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
137 0 : }
138 :
139 : // Prev implements the Iterator interface.
140 1 : func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
141 1 : return p.rewriteResult(p.i.Prev())
142 1 : }
143 :
144 : // Error implements the Iterator interface.
145 1 : func (p *prefixReplacingIterator) Error() error {
146 1 : if p.err != nil {
147 0 : return p.err
148 0 : }
149 1 : return p.i.Error()
150 : }
151 :
152 : // Close implements the Iterator interface.
153 1 : func (p *prefixReplacingIterator) Close() error {
154 1 : return p.i.Close()
155 1 : }
156 :
157 : // SetBounds implements the Iterator interface.
158 1 : func (p *prefixReplacingIterator) SetBounds(lower, upper []byte) {
159 1 : // Check if the underlying iterator requires un-rewritten bounds, i.e. if it
160 1 : // is going to rewrite them itself or pass them to something e.g. vState that
161 1 : // will rewrite them.
162 1 : if x, ok := p.i.(interface{ SetBoundsWithSyntheticPrefix() bool }); ok && x.SetBoundsWithSyntheticPrefix() {
163 0 : p.i.SetBounds(lower, upper)
164 0 : return
165 0 : }
166 1 : if lower != nil {
167 1 : lower = append([]byte{}, p.rewriteArg(lower)...)
168 1 : }
169 1 : if upper != nil {
170 1 : upper = append([]byte{}, p.rewriteArg(upper)...)
171 1 : }
172 1 : p.i.SetBounds(lower, upper)
173 : }
174 :
175 0 : func (p *prefixReplacingIterator) MaybeFilteredKeys() bool {
176 0 : return p.i.MaybeFilteredKeys()
177 0 : }
178 :
179 : // String implements the Iterator interface.
180 1 : func (p *prefixReplacingIterator) String() string {
181 1 : return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.src), hex.EncodeToString(p.dst))
182 1 : }
183 :
184 1 : func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) {
185 1 : p.i.SetCloseHook(fn)
186 1 : }
187 :
188 : type prefixReplacingFragmentIterator struct {
189 : i keyspan.FragmentIterator
190 : src, dst []byte
191 : arg []byte
192 : out1, out2 []byte
193 : }
194 :
195 : // newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
196 : // that contains range keys in some key span to make those range keys appear to
197 : // be remapped into some other key-span.
198 : func newPrefixReplacingFragmentIterator(
199 : i keyspan.FragmentIterator, src, dst []byte,
200 0 : ) keyspan.FragmentIterator {
201 0 : return &prefixReplacingFragmentIterator{
202 0 : i: i,
203 0 : src: src, dst: dst,
204 0 : arg: append([]byte{}, src...),
205 0 : out1: append([]byte(nil), dst...),
206 0 : out2: append([]byte(nil), dst...),
207 0 : }
208 0 : }
209 :
210 0 : func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
211 0 : if !bytes.HasPrefix(key, p.dst) {
212 0 : return nil, errInputPrefixMismatch
213 0 : }
214 0 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
215 0 : return p.arg, nil
216 : }
217 :
218 : func (p *prefixReplacingFragmentIterator) rewriteSpan(
219 : sp *keyspan.Span, err error,
220 0 : ) (*keyspan.Span, error) {
221 0 : if !bytes.HasPrefix(sp.Start, p.src) || !bytes.HasPrefix(sp.End, p.src) {
222 0 : return nil, errInputPrefixMismatch
223 0 : }
224 0 : sp.Start = append(p.out1[:len(p.dst)], sp.Start[len(p.src):]...)
225 0 : sp.End = append(p.out2[:len(p.dst)], sp.End[len(p.src):]...)
226 0 : return sp, nil
227 : }
228 :
229 : // SeekGE implements the FragmentIterator interface.
230 0 : func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
231 0 : rewrittenKey, err := p.rewriteArg(key)
232 0 : if err != nil {
233 0 : return nil, err
234 0 : }
235 0 : return p.rewriteSpan(p.i.SeekGE(rewrittenKey))
236 : }
237 :
238 : // SeekLT implements the FragmentIterator interface.
239 0 : func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
240 0 : rewrittenKey, err := p.rewriteArg(key)
241 0 : if err != nil {
242 0 : return nil, err
243 0 : }
244 0 : return p.rewriteSpan(p.i.SeekLT(rewrittenKey))
245 : }
246 :
247 : // First implements the FragmentIterator interface.
248 0 : func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
249 0 : return p.rewriteSpan(p.i.First())
250 0 : }
251 :
252 : // Last implements the FragmentIterator interface.
253 0 : func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
254 0 : return p.rewriteSpan(p.i.Last())
255 0 : }
256 :
257 : // Close implements the FragmentIterator interface.
258 0 : func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
259 0 : return p.rewriteSpan(p.i.Next())
260 0 : }
261 :
262 : // Prev implements the FragmentIterator interface.
263 0 : func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
264 0 : return p.rewriteSpan(p.i.Prev())
265 0 : }
266 :
267 : // Close implements the FragmentIterator interface.
268 0 : func (p *prefixReplacingFragmentIterator) Close() error {
269 0 : return p.i.Close()
270 0 : }
271 :
272 : // WrapChildren implements FragmentIterator.
273 0 : func (p *prefixReplacingFragmentIterator) WrapChildren(wrap keyspan.WrapFn) {
274 0 : p.i = wrap(p.i)
275 0 : }
|