Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/hex"
11 : "fmt"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : )
18 :
19 : type prefixReplacingIterator struct {
20 : i Iterator
21 : cmp base.Compare
22 : src, dst []byte
23 : arg, arg2 []byte
24 : res InternalKey
25 : err error
26 : }
27 :
28 : var errInputPrefixMismatch = errors.New("key argument does not have prefix required for replacement")
29 : var errOutputPrefixMismatch = errors.New("key returned does not have prefix required for replacement")
30 :
31 : var _ Iterator = (*prefixReplacingIterator)(nil)
32 :
33 : // newPrefixReplacingIterator wraps an iterator over keys that have prefix `src`
34 : // in an iterator that will make them appear to have prefix `dst`. Every key
35 : // passed as an argument to methods on this iterator must have prefix `dst`, and
36 : // every key produced by the underlying iterator must have prefix `src`.
37 : //
38 : // INVARIANT: len(dst) > 0.
39 0 : func newPrefixReplacingIterator(i Iterator, src, dst []byte, cmp base.Compare) Iterator {
40 0 : if invariants.Enabled && len(dst) == 0 {
41 0 : panic("newPrefixReplacingIterator called without synthetic prefix")
42 : }
43 0 : return &prefixReplacingIterator{
44 0 : i: i,
45 0 : cmp: cmp,
46 0 : src: src, dst: dst,
47 0 : arg: append([]byte{}, src...), arg2: append([]byte{}, src...),
48 0 : res: InternalKey{UserKey: append([]byte{}, dst...)},
49 0 : }
50 : }
51 :
52 0 : func (p *prefixReplacingIterator) SetContext(ctx context.Context) {
53 0 : p.i.SetContext(ctx)
54 0 : }
55 :
56 0 : func (p *prefixReplacingIterator) rewriteArg(key []byte) []byte {
57 0 : if !bytes.HasPrefix(key, p.dst) {
58 0 : p.err = errInputPrefixMismatch
59 0 : return key
60 0 : }
61 0 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
62 0 : return p.arg
63 : }
64 :
65 0 : func (p *prefixReplacingIterator) rewriteArg2(key []byte) []byte {
66 0 : if !bytes.HasPrefix(key, p.dst) {
67 0 : p.err = errInputPrefixMismatch
68 0 : return key
69 0 : }
70 0 : p.arg2 = append(p.arg2[:len(p.src)], key[len(p.dst):]...)
71 0 : return p.arg2
72 : }
73 :
74 : func (p *prefixReplacingIterator) rewriteResult(
75 : k *InternalKey, v base.LazyValue,
76 0 : ) (*InternalKey, base.LazyValue) {
77 0 : if k == nil {
78 0 : return k, v
79 0 : }
80 0 : if !bytes.HasPrefix(k.UserKey, p.src) {
81 0 : p.err = errOutputPrefixMismatch
82 0 : if invariants.Enabled {
83 0 : panic(p.err)
84 : }
85 0 : return nil, base.LazyValue{}
86 : }
87 0 : p.res.Trailer = k.Trailer
88 0 : p.res.UserKey = append(p.res.UserKey[:len(p.dst)], k.UserKey[len(p.src):]...)
89 0 : return &p.res, v
90 : }
91 :
92 : // SeekGE implements the Iterator interface.
93 : func (p *prefixReplacingIterator) SeekGE(
94 : key []byte, flags base.SeekGEFlags,
95 0 : ) (*InternalKey, base.LazyValue) {
96 0 : return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
97 0 : }
98 :
99 : // SeekPrefixGE implements the Iterator interface.
100 : func (p *prefixReplacingIterator) SeekPrefixGE(
101 : prefix, key []byte, flags base.SeekGEFlags,
102 0 : ) (*InternalKey, base.LazyValue) {
103 0 : return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
104 0 : }
105 :
106 : // SeekLT implements the Iterator interface.
107 : func (p *prefixReplacingIterator) SeekLT(
108 : key []byte, flags base.SeekLTFlags,
109 0 : ) (*InternalKey, base.LazyValue) {
110 0 : cmp := p.cmp(key, p.dst)
111 0 : if cmp < 0 {
112 0 : // Exhaust the iterator by Prev()ing before the First key.
113 0 : p.i.First()
114 0 : return p.rewriteResult(p.i.Prev())
115 0 : }
116 0 : return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
117 : }
118 :
119 : // First implements the Iterator interface.
120 0 : func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
121 0 : return p.rewriteResult(p.i.First())
122 0 : }
123 :
124 : // Last implements the Iterator interface.
125 0 : func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
126 0 : return p.rewriteResult(p.i.Last())
127 0 : }
128 :
129 : // Next implements the Iterator interface.
130 0 : func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
131 0 : return p.rewriteResult(p.i.Next())
132 0 : }
133 :
134 : // NextPrefix implements the Iterator interface.
135 0 : func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
136 0 : return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
137 0 : }
138 :
139 : // Prev implements the Iterator interface.
140 0 : func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
141 0 : return p.rewriteResult(p.i.Prev())
142 0 : }
143 :
144 : // Error implements the Iterator interface.
145 0 : func (p *prefixReplacingIterator) Error() error {
146 0 : if p.err != nil {
147 0 : return p.err
148 0 : }
149 0 : return p.i.Error()
150 : }
151 :
152 : // Close implements the Iterator interface.
153 0 : func (p *prefixReplacingIterator) Close() error {
154 0 : return p.i.Close()
155 0 : }
156 :
157 : // SetBounds implements the Iterator interface.
158 0 : func (p *prefixReplacingIterator) SetBounds(lower, upper []byte) {
159 0 : // Check if the underlying iterator requires un-rewritten bounds, i.e. if it
160 0 : // is going to rewrite them itself or pass them to something e.g. vState that
161 0 : // will rewrite them.
162 0 : if x, ok := p.i.(interface{ SetBoundsWithSyntheticPrefix() bool }); ok && x.SetBoundsWithSyntheticPrefix() {
163 0 : p.i.SetBounds(lower, upper)
164 0 : return
165 0 : }
166 0 : p.i.SetBounds(p.rewriteArg(lower), p.rewriteArg2(upper))
167 : }
168 :
169 0 : func (p *prefixReplacingIterator) MaybeFilteredKeys() bool {
170 0 : return p.i.MaybeFilteredKeys()
171 0 : }
172 :
173 : // String implements the Iterator interface.
174 0 : func (p *prefixReplacingIterator) String() string {
175 0 : return fmt.Sprintf("%s [%s->%s]", p.i.String(), hex.EncodeToString(p.src), hex.EncodeToString(p.dst))
176 0 : }
177 :
178 0 : func (p *prefixReplacingIterator) SetCloseHook(fn func(i Iterator) error) {
179 0 : p.i.SetCloseHook(fn)
180 0 : }
181 :
182 : type prefixReplacingFragmentIterator struct {
183 : i keyspan.FragmentIterator
184 : err error
185 : src, dst []byte
186 : arg []byte
187 : out1, out2 []byte
188 : }
189 :
190 : // newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
191 : // that contains range keys in some key span to make those range keys appear to
192 : // be remapped into some other key-span.
193 : func newPrefixReplacingFragmentIterator(
194 : i keyspan.FragmentIterator, src, dst []byte,
195 0 : ) keyspan.FragmentIterator {
196 0 : return &prefixReplacingFragmentIterator{
197 0 : i: i,
198 0 : src: src, dst: dst,
199 0 : arg: append([]byte{}, src...),
200 0 : out1: append([]byte(nil), dst...),
201 0 : out2: append([]byte(nil), dst...),
202 0 : }
203 0 : }
204 :
205 0 : func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) []byte {
206 0 : if !bytes.HasPrefix(key, p.dst) {
207 0 : p.err = errInputPrefixMismatch
208 0 : return key
209 0 : }
210 0 : p.arg = append(p.arg[:len(p.src)], key[len(p.dst):]...)
211 0 : return p.arg
212 : }
213 :
214 0 : func (p *prefixReplacingFragmentIterator) rewriteSpan(sp *keyspan.Span) *keyspan.Span {
215 0 : if !bytes.HasPrefix(sp.Start, p.src) || !bytes.HasPrefix(sp.End, p.src) {
216 0 : p.err = errInputPrefixMismatch
217 0 : return sp
218 0 : }
219 0 : sp.Start = append(p.out1[:len(p.dst)], sp.Start[len(p.src):]...)
220 0 : sp.End = append(p.out2[:len(p.dst)], sp.End[len(p.src):]...)
221 0 : return sp
222 : }
223 :
224 : // SeekGE implements the FragmentIterator interface.
225 0 : func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) *keyspan.Span {
226 0 : return p.rewriteSpan(p.i.SeekGE(p.rewriteArg(key)))
227 0 : }
228 :
229 : // SeekLT implements the FragmentIterator interface.
230 0 : func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) *keyspan.Span {
231 0 : return p.rewriteSpan(p.i.SeekLT(p.rewriteArg(key)))
232 0 : }
233 :
234 : // First implements the FragmentIterator interface.
235 0 : func (p *prefixReplacingFragmentIterator) First() *keyspan.Span {
236 0 : return p.rewriteSpan(p.i.First())
237 0 : }
238 :
239 : // Last implements the FragmentIterator interface.
240 0 : func (p *prefixReplacingFragmentIterator) Last() *keyspan.Span {
241 0 : return p.rewriteSpan(p.i.Last())
242 0 : }
243 :
244 : // Close implements the FragmentIterator interface.
245 0 : func (p *prefixReplacingFragmentIterator) Next() *keyspan.Span {
246 0 : return p.rewriteSpan(p.i.Next())
247 0 : }
248 :
249 : // Prev implements the FragmentIterator interface.
250 0 : func (p *prefixReplacingFragmentIterator) Prev() *keyspan.Span {
251 0 : return p.rewriteSpan(p.i.Prev())
252 0 : }
253 :
254 : // Error implements the FragmentIterator interface.
255 0 : func (p *prefixReplacingFragmentIterator) Error() error {
256 0 : if p.err != nil {
257 0 : return p.err
258 0 : }
259 0 : return p.i.Error()
260 : }
261 :
262 : // Close implements the FragmentIterator interface.
263 0 : func (p *prefixReplacingFragmentIterator) Close() error {
264 0 : return p.i.Close()
265 0 : }
|