Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/hex"
11 : "fmt"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : )
18 :
19 : type prefixReplacingIterator struct {
20 : i Iterator
21 : cmp base.Compare
22 : src, dst []byte
23 : arg, arg2 []byte
24 : res InternalKey
25 : err error
26 : }
27 :
28 : var errInputPrefixMismatch = errors.New("key argument does not have prefix required for replacement")
29 : var errOutputPrefixMismatch = errors.New("key returned does not have prefix required for replacement")
30 :
31 : var _ Iterator = (*prefixReplacingIterator)(nil)
32 :
33 : // newPrefixReplacingIterator wraps an iterator over keys that have prefix `src`
34 : // in an iterator that will make them appear to have prefix `dst`. Every key
35 : // passed as an argument to methods on this iterator must have prefix `dst`, and
36 : // every key produced by the underlying iterator must have prefix `src`.
37 : //
38 : // INVARIANT: len(dst) > 0.
39 0 : func newPrefixReplacingIterator(i Iterator, src, dst []byte, cmp base.Compare) Iterator {
40 0 : if invariants.Enabled && len(dst) == 0 {
41 0 : panic("newPrefixReplacingIterator called without synthetic prefix")
42 : }
43 0 : return &prefixReplacingIterator{
44 0 : i: i,
45 0 : cmp: cmp,
46 0 : src: src, dst: dst,
47 0 : arg: append([]byte{}, src...), arg2: append([]byte{}, src...),
48 0 : res: InternalKey{UserKey: append([]byte{}, dst...)},
49 0 : }
50 : }
51 :
52 0 : func (p *prefixReplacingIterator) SetContext(ctx context.Context) {
53 0 : p.i.SetContext(ctx)
54 0 : }
55 :
56 0 : func (p *prefixReplacingIterator) rewriteArg(key []byte) []byte {
57 0 : if !bytes.HasPrefix(key, p.dst) {
58 0 : p.err = errInputPrefixMismatch
59 0 : return key
60 0 : }
61 0 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
62 0 : return p.arg
63 : }
64 :
65 0 : func (p *prefixReplacingIterator) rewriteArg2(key []byte) []byte {
66 0 : if !bytes.HasPrefix(key, p.dst) {
67 0 : p.err = errInputPrefixMismatch
68 0 : return key
69 0 : }
70 0 : p.arg2 = append(p.arg2[:len(p.src)], key[len(p.dst):]...)
71 0 : return p.arg2
72 : }
73 :
74 : func (p *prefixReplacingIterator) rewriteResult(
75 : k *InternalKey, v base.LazyValue,
76 0 : ) (*InternalKey, base.LazyValue) {
77 0 : if k == nil {
78 0 : return k, v
79 0 : }
80 0 : if !bytes.HasPrefix(k.UserKey, p.src) {
81 0 : p.err = errOutputPrefixMismatch
82 0 : if invariants.Enabled {
83 0 : panic(p.err)
84 : }
85 0 : return nil, base.LazyValue{}
86 : }
87 0 : p.res.Trailer = k.Trailer
88 0 : p.res.UserKey = append(p.res.UserKey[:len(p.dst)], k.UserKey[len(p.src):]...)
89 0 : return &p.res, v
90 : }
91 :
92 : // SeekGE implements the Iterator interface.
93 : func (p *prefixReplacingIterator) SeekGE(
94 : key []byte, flags base.SeekGEFlags,
95 0 : ) (*InternalKey, base.LazyValue) {
96 0 : return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
97 0 : }
98 :
99 : // SeekPrefixGE implements the Iterator interface.
100 : func (p *prefixReplacingIterator) SeekPrefixGE(
101 : prefix, key []byte, flags base.SeekGEFlags,
102 0 : ) (*InternalKey, base.LazyValue) {
103 0 : return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
104 0 : }
105 :
106 : // SeekLT implements the Iterator interface.
107 : func (p *prefixReplacingIterator) SeekLT(
108 : key []byte, flags base.SeekLTFlags,
109 0 : ) (*InternalKey, base.LazyValue) {
110 0 : cmp := p.cmp(key, p.dst)
111 0 : if cmp < 0 {
112 0 : // Exhaust the iterator by Prev()ing before the First key.
113 0 : p.i.First()
114 0 : return p.rewriteResult(p.i.Prev())
115 0 : }
116 0 : return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
117 : }
118 :
119 : // First implements the Iterator interface.
120 0 : func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
121 0 : return p.rewriteResult(p.i.First())
122 0 : }
123 :
124 : // Last implements the Iterator interface.
125 0 : func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
126 0 : return p.rewriteResult(p.i.Last())
127 0 : }
128 :
129 : // Next implements the Iterator interface.
130 0 : func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
131 0 : return p.rewriteResult(p.i.Next())
132 0 : }
133 :
134 : // NextPrefix implements the Iterator interface.
135 0 : func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
136 0 : return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
137 0 : }
138 :
139 : // Prev implements the Iterator interface.
140 0 : func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
141 0 : return p.rewriteResult(p.i.Prev())
142 0 : }
143 :
144 : // Error implements the Iterator interface.
145 0 : func (p *prefixReplacingIterator) Error() error {
146 0 : if p.err != nil {
147 0 : return p.err
148 0 : }
149 0 : return p.i.Error()
150 : }
151 :
152 : // Close implements the Iterator interface.
153 0 : func (p *prefixReplacingIterator) Close() error {
154 0 : return p.i.Close()
155 0 : }
156 :
157 : // SetBounds implements the Iterator interface.
158 0 : func (p *prefixReplacingIterator) SetBounds(lower, upper []byte) {
159 0 : // Check if the underlying iterator requires un-rewritten bounds, i.e. if it
160 0 : // is going to rewrite them itself or pass them to something e.g. vState that
161 0 : // will rewrite them.
162 0 : if x, ok := p.i.(interface{ SetBoundsWithSyntheticPrefix() bool }); ok && x.SetBoundsWithSyntheticPrefix() {
163 0 : p.i.SetBounds(lower, upper)
164 0 : return
165 0 : }
166 0 : if lower != nil {
167 0 : lower = append([]byte{}, p.rewriteArg(lower)...)
168 0 : }
169 0 : if upper != nil {
170 0 : upper = append([]byte{}, p.rewriteArg(upper)...)
171 0 : }
172 0 : p.i.SetBounds(lower, upper)
173 : }
174 :
175 0 : func (p *prefixReplacingIterator) MaybeFilteredKeys() bool {
176 0 : return p.i.MaybeFilteredKeys()
177 0 : }
178 :
179 : // String implements the Iterator interface.
180 0 : func (p *prefixReplacingIterator) String() string {
181 0 : return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.src), hex.EncodeToString(p.dst))
182 0 : }
183 :
184 0 : func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) {
185 0 : p.i.SetCloseHook(fn)
186 0 : }
187 :
188 : type prefixReplacingFragmentIterator struct {
189 : i keyspan.FragmentIterator
190 : src, dst []byte
191 : arg []byte
192 : out1, out2 []byte
193 : }
194 :
195 : // newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
196 : // that contains range keys in some key span to make those range keys appear to
197 : // be remapped into some other key-span.
198 : func newPrefixReplacingFragmentIterator(
199 : i keyspan.FragmentIterator, src, dst []byte,
200 0 : ) keyspan.FragmentIterator {
201 0 : return &prefixReplacingFragmentIterator{
202 0 : i: i,
203 0 : src: src, dst: dst,
204 0 : arg: append([]byte{}, src...),
205 0 : out1: append([]byte(nil), dst...),
206 0 : out2: append([]byte(nil), dst...),
207 0 : }
208 0 : }
209 :
210 0 : func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
211 0 : if !bytes.HasPrefix(key, p.dst) {
212 0 : return nil, errInputPrefixMismatch
213 0 : }
214 0 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
215 0 : return p.arg, nil
216 : }
217 :
218 : func (p *prefixReplacingFragmentIterator) rewriteSpan(
219 : sp *keyspan.Span, err error,
220 0 : ) (*keyspan.Span, error) {
221 0 : if !bytes.HasPrefix(sp.Start, p.src) || !bytes.HasPrefix(sp.End, p.src) {
222 0 : return nil, errInputPrefixMismatch
223 0 : }
224 0 : sp.Start = append(p.out1[:len(p.dst)], sp.Start[len(p.src):]...)
225 0 : sp.End = append(p.out2[:len(p.dst)], sp.End[len(p.src):]...)
226 0 : return sp, nil
227 : }
228 :
229 : // SeekGE implements the FragmentIterator interface.
230 0 : func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
231 0 : rewrittenKey, err := p.rewriteArg(key)
232 0 : if err != nil {
233 0 : return nil, err
234 0 : }
235 0 : return p.rewriteSpan(p.i.SeekGE(rewrittenKey))
236 : }
237 :
238 : // SeekLT implements the FragmentIterator interface.
239 0 : func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
240 0 : rewrittenKey, err := p.rewriteArg(key)
241 0 : if err != nil {
242 0 : return nil, err
243 0 : }
244 0 : return p.rewriteSpan(p.i.SeekLT(rewrittenKey))
245 : }
246 :
247 : // First implements the FragmentIterator interface.
248 0 : func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
249 0 : return p.rewriteSpan(p.i.First())
250 0 : }
251 :
252 : // Last implements the FragmentIterator interface.
253 0 : func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
254 0 : return p.rewriteSpan(p.i.Last())
255 0 : }
256 :
257 : // Close implements the FragmentIterator interface.
258 0 : func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
259 0 : return p.rewriteSpan(p.i.Next())
260 0 : }
261 :
262 : // Prev implements the FragmentIterator interface.
263 0 : func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
264 0 : return p.rewriteSpan(p.i.Prev())
265 0 : }
266 :
267 : // Close implements the FragmentIterator interface.
268 0 : func (p *prefixReplacingFragmentIterator) Close() error {
269 0 : return p.i.Close()
270 0 : }
271 :
272 : // WrapChildren implements FragmentIterator.
273 0 : func (p *prefixReplacingFragmentIterator) WrapChildren(wrap keyspan.WrapFn) {
274 0 : p.i = wrap(p.i)
275 0 : }
|