Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/pebble/internal/base"
11 : "github.com/cockroachdb/pebble/internal/keyspan"
12 : "github.com/cockroachdb/pebble/internal/rangekey"
13 : )
14 :
// VirtualReader wraps Reader. Its purpose is to restrict functionality of the
// Reader which should be inaccessible to virtual sstables, and enforce bounds
// invariants associated with virtual sstables. All reads on virtual sstables
// should go through a VirtualReader.
//
// INVARIANT: Any iterators created through a virtual reader will guarantee that
// they don't expose keys outside the virtual sstable bounds.
type VirtualReader struct {
	// vState holds the virtual sstable's bounds and related state. A pointer
	// to it is handed to the point iterators created through this reader.
	vState virtualState
	// reader is the wrapped Reader that every method forwards to.
	reader *Reader
	// Properties holds the wrapped Reader's properties scaled by the
	// Size/BackingSize ratio; it is populated by MakeVirtualReader.
	Properties CommonProperties
}

// Compile-time assertion that *VirtualReader implements CommonReader.
var _ CommonReader = (*VirtualReader)(nil)
29 :
// virtualState is lightweight virtual sstable state which can be passed to
// sstable iterators.
type virtualState struct {
	// lower and upper are the bounds of the virtual sstable. When upper is an
	// exclusive sentinel key, constrainBounds reports the upper user key as
	// not inclusive.
	lower InternalKey
	upper InternalKey
	// fileNum is copied from VirtualReaderParams.FileNum.
	fileNum base.FileNum
	// Compare is the wrapped Reader's Compare function; it is used to order
	// user keys against the bounds above.
	Compare Compare
	// isSharedIngested is copied from VirtualReaderParams.IsSharedIngested.
	// NewRawRangeKeyIter consults it to decide whether range keys are passed
	// through a rangekey.ForeignSSTTransformer.
	isSharedIngested bool
	// prefixChange is copied from VirtualReaderParams.PrefixReplacement. Its
	// UsePrefixReplacementIterator method decides whether iterators are
	// wrapped in prefix-replacing iterators and whether bounds are inverted.
	prefixChange *PrefixReplacement
}
39 :
// VirtualReaderParams are the parameters necessary to create a VirtualReader.
type VirtualReaderParams struct {
	// Lower and Upper are the bounds of the virtual sstable.
	Lower InternalKey
	Upper InternalKey
	// FileNum is stored in the reader's virtual state.
	FileNum base.FileNum
	// IsSharedIngested is stored in the reader's virtual state; when set,
	// NewRawRangeKeyIter applies the synthetic sequence number through a
	// rangekey.ForeignSSTTransformer rather than through the transforms.
	IsSharedIngested bool
	// Size is an estimate of the size of the [Lower, Upper) section of the table.
	Size uint64
	// BackingSize is the total size of the backing table. The ratio between Size
	// and BackingSize is used to estimate statistics.
	BackingSize uint64
	// PrefixReplacement is stored in the reader's virtual state and controls
	// whether iterators are wrapped in prefix-replacing iterators.
	//
	// TODO(radu): these should be moved to sstable.IterTransforms.
	PrefixReplacement *PrefixReplacement
}
54 :
55 : // MakeVirtualReader is used to contruct a reader which can read from virtual
56 : // sstables.
57 1 : func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader {
58 1 : vState := virtualState{
59 1 : lower: p.Lower,
60 1 : upper: p.Upper,
61 1 : fileNum: p.FileNum,
62 1 : Compare: reader.Compare,
63 1 : isSharedIngested: p.IsSharedIngested,
64 1 : prefixChange: p.PrefixReplacement,
65 1 : }
66 1 : v := VirtualReader{
67 1 : vState: vState,
68 1 : reader: reader,
69 1 : }
70 1 :
71 1 : // Scales the given value by the (Size / BackingSize) ratio, rounding up.
72 1 : scale := func(a uint64) uint64 {
73 1 : return (a*p.Size + p.BackingSize - 1) / p.BackingSize
74 1 : }
75 :
76 1 : v.Properties.RawKeySize = scale(reader.Properties.RawKeySize)
77 1 : v.Properties.RawValueSize = scale(reader.Properties.RawValueSize)
78 1 : v.Properties.NumEntries = scale(reader.Properties.NumEntries)
79 1 : v.Properties.NumDeletions = scale(reader.Properties.NumDeletions)
80 1 : v.Properties.NumRangeDeletions = scale(reader.Properties.NumRangeDeletions)
81 1 : v.Properties.NumRangeKeyDels = scale(reader.Properties.NumRangeKeyDels)
82 1 :
83 1 : // Note that we rely on NumRangeKeySets for correctness. If the sstable may
84 1 : // contain range keys, then NumRangeKeySets must be > 0. ceilDiv works because
85 1 : // meta.Size will not be 0 for virtual sstables.
86 1 : v.Properties.NumRangeKeySets = scale(reader.Properties.NumRangeKeySets)
87 1 : v.Properties.ValueBlocksSize = scale(reader.Properties.ValueBlocksSize)
88 1 : v.Properties.NumSizedDeletions = scale(reader.Properties.NumSizedDeletions)
89 1 : v.Properties.RawPointTombstoneKeySize = scale(reader.Properties.RawPointTombstoneKeySize)
90 1 : v.Properties.RawPointTombstoneValueSize = scale(reader.Properties.RawPointTombstoneValueSize)
91 1 :
92 1 : return v
93 : }
94 :
95 : // NewCompactionIter is the compaction iterator function for virtual readers.
96 : func (v *VirtualReader) NewCompactionIter(
97 : transforms IterTransforms,
98 : bytesIterated *uint64,
99 : categoryAndQoS CategoryAndQoS,
100 : statsCollector *CategoryStatsCollector,
101 : rp ReaderProvider,
102 : bufferPool *BufferPool,
103 1 : ) (Iterator, error) {
104 1 : i, err := v.reader.newCompactionIter(
105 1 : transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
106 1 : if err == nil && v.vState.prefixChange.UsePrefixReplacementIterator() {
107 0 : i = newPrefixReplacingIterator(
108 0 : i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
109 0 : v.vState.lower.UserKey, v.reader.Compare,
110 0 : )
111 0 : }
112 1 : return i, err
113 : }
114 :
115 : // NewIterWithBlockPropertyFiltersAndContextEtc wraps
116 : // Reader.NewIterWithBlockPropertyFiltersAndContext. We assume that the passed
117 : // in [lower, upper) bounds will have at least some overlap with the virtual
118 : // sstable bounds. No overlap is not currently supported in the iterator.
119 : func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc(
120 : ctx context.Context,
121 : transforms IterTransforms,
122 : lower, upper []byte,
123 : filterer *BlockPropertiesFilterer,
124 : useFilterBlock bool,
125 : stats *base.InternalIteratorStats,
126 : categoryAndQoS CategoryAndQoS,
127 : statsCollector *CategoryStatsCollector,
128 : rp ReaderProvider,
129 1 : ) (Iterator, error) {
130 1 : i, err := v.reader.newIterWithBlockPropertyFiltersAndContext(
131 1 : ctx, transforms, lower, upper, filterer, useFilterBlock,
132 1 : stats, categoryAndQoS, statsCollector, rp, &v.vState)
133 1 : // NB: for block level prefix replacement,
134 1 : if err == nil && v.vState.prefixChange.UsePrefixReplacementIterator() {
135 0 : i = newPrefixReplacingIterator(
136 0 : i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
137 0 : v.vState.lower.UserKey, v.reader.Compare,
138 0 : )
139 0 : }
140 1 : return i, err
141 : }
142 :
143 : // ValidateBlockChecksumsOnBacking will call ValidateBlockChecksumsOnBacking on the underlying reader.
144 : // Note that block checksum validation is NOT restricted to virtual sstable bounds.
145 1 : func (v *VirtualReader) ValidateBlockChecksumsOnBacking() error {
146 1 : return v.reader.ValidateBlockChecksums()
147 1 : }
148 :
149 : // NewRawRangeDelIter wraps Reader.NewRawRangeDelIter.
150 : func (v *VirtualReader) NewRawRangeDelIter(
151 : transforms IterTransforms,
152 1 : ) (keyspan.FragmentIterator, error) {
153 1 : iter, err := v.reader.NewRawRangeDelIter(transforms)
154 1 : if err != nil {
155 0 : return nil, err
156 0 : }
157 1 : if iter == nil {
158 1 : return nil, nil
159 1 : }
160 1 : lower := &v.vState.lower
161 1 : upper := &v.vState.upper
162 1 :
163 1 : if v.vState.prefixChange.UsePrefixReplacementIterator() {
164 0 : lower = &InternalKey{UserKey: v.vState.prefixChange.Invert(lower.UserKey), Trailer: lower.Trailer}
165 0 : upper = &InternalKey{UserKey: v.vState.prefixChange.Invert(upper.UserKey), Trailer: upper.Trailer}
166 0 :
167 0 : iter = keyspan.Truncate(
168 0 : v.reader.Compare, iter, lower.UserKey, upper.UserKey,
169 0 : lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
170 0 : )
171 0 : return newPrefixReplacingFragmentIterator(
172 0 : iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
173 0 : v.vState.lower.UserKey, v.reader.Compare,
174 0 : ), nil
175 0 : }
176 :
177 : // Truncation of spans isn't allowed at a user key that also contains points
178 : // in the same virtual sstable, as it would lead to covered points getting
179 : // uncovered. Set panicOnUpperTruncate to true if the file's upper bound
180 : // is not an exclusive sentinel.
181 : //
182 : // As an example, if an sstable contains a rangedel a-c and point keys at
183 : // a.SET.2 and b.SET.3, the file bounds [a#2,SET-b#RANGEDELSENTINEL] are
184 : // allowed (as they exclude b.SET.3), or [a#2,SET-c#RANGEDELSENTINEL] (as it
185 : // includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate
186 : // the rangedel at b and lead to the point being uncovered).
187 1 : return keyspan.Truncate(
188 1 : v.reader.Compare, iter, lower.UserKey, upper.UserKey,
189 1 : lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
190 1 : ), nil
191 : }
192 :
193 : // NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter.
194 : func (v *VirtualReader) NewRawRangeKeyIter(
195 : transforms IterTransforms,
196 1 : ) (keyspan.FragmentIterator, error) {
197 1 : syntheticSeqNum := transforms.SyntheticSeqNum
198 1 : if v.vState.isSharedIngested {
199 1 : // Don't pass a synthetic sequence number for shared ingested sstables. We
200 1 : // need to know the materialized sequence numbers, and we will set up the
201 1 : // appropriate sequence number substitution below.
202 1 : transforms.SyntheticSeqNum = 0
203 1 : }
204 1 : iter, err := v.reader.NewRawRangeKeyIter(transforms)
205 1 : if err != nil {
206 0 : return nil, err
207 0 : }
208 1 : if iter == nil {
209 1 : return nil, nil
210 1 : }
211 1 : lower := &v.vState.lower
212 1 : upper := &v.vState.upper
213 1 :
214 1 : if v.vState.isSharedIngested {
215 1 : // We need to coalesce range keys within each sstable, and then apply the
216 1 : // synthetic sequence number. For this, we use ForeignSSTTransformer.
217 1 : //
218 1 : // TODO(bilal): Avoid these allocations by hoisting the transformer and
219 1 : // transform iter into VirtualReader.
220 1 : transform := &rangekey.ForeignSSTTransformer{
221 1 : Equal: v.reader.Equal,
222 1 : SeqNum: uint64(syntheticSeqNum),
223 1 : }
224 1 : transformIter := &keyspan.TransformerIter{
225 1 : FragmentIterator: iter,
226 1 : Transformer: transform,
227 1 : Compare: v.reader.Compare,
228 1 : }
229 1 : iter = transformIter
230 1 : }
231 :
232 1 : if v.vState.prefixChange.UsePrefixReplacementIterator() {
233 0 : lower = &InternalKey{UserKey: v.vState.prefixChange.Invert(lower.UserKey), Trailer: lower.Trailer}
234 0 : upper = &InternalKey{UserKey: v.vState.prefixChange.Invert(upper.UserKey), Trailer: upper.Trailer}
235 0 : iter = keyspan.Truncate(
236 0 : v.reader.Compare, iter, lower.UserKey, upper.UserKey,
237 0 : lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
238 0 : )
239 0 : return newPrefixReplacingFragmentIterator(
240 0 : iter, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
241 0 : v.vState.lower.UserKey, v.reader.Compare,
242 0 : ), nil
243 0 : }
244 :
245 : // Truncation of spans isn't allowed at a user key that also contains points
246 : // in the same virtual sstable, as it would lead to covered points getting
247 : // uncovered. Set panicOnUpperTruncate to true if the file's upper bound
248 : // is not an exclusive sentinel.
249 : //
250 : // As an example, if an sstable contains a range key a-c and point keys at
251 : // a.SET.2 and b.SET.3, the file bounds [a#2,SET-b#RANGEKEYSENTINEL] are
252 : // allowed (as they exclude b.SET.3), or [a#2,SET-c#RANGEKEYSENTINEL] (as it
253 : // includes both point keys), but not [a#2,SET-b#3,SET] (as it would truncate
254 : // the range key at b and lead to the point being uncovered).
255 1 : return keyspan.Truncate(
256 1 : v.reader.Compare, iter, lower.UserKey, upper.UserKey,
257 1 : lower, upper, !v.vState.upper.IsExclusiveSentinel(), /* panicOnUpperTruncate */
258 1 : ), nil
259 : }
260 :
261 : // Constrain bounds will narrow the start, end bounds if they do not fit within
262 : // the virtual sstable. The function will return if the new end key is
263 : // inclusive.
264 : func (v *virtualState) constrainBounds(
265 : start, end []byte, endInclusive bool,
266 1 : ) (lastKeyInclusive bool, first []byte, last []byte) {
267 1 : first = start
268 1 : if start == nil || v.Compare(start, v.lower.UserKey) < 0 {
269 1 : first = v.lower.UserKey
270 1 : }
271 :
272 : // Note that we assume that start, end has some overlap with the virtual
273 : // sstable bounds.
274 1 : last = v.upper.UserKey
275 1 : lastKeyInclusive = !v.upper.IsExclusiveSentinel()
276 1 : if end != nil {
277 1 : cmp := v.Compare(end, v.upper.UserKey)
278 1 : switch {
279 1 : case cmp == 0:
280 1 : lastKeyInclusive = !v.upper.IsExclusiveSentinel() && endInclusive
281 1 : last = v.upper.UserKey
282 1 : case cmp > 0:
283 1 : lastKeyInclusive = !v.upper.IsExclusiveSentinel()
284 1 : last = v.upper.UserKey
285 1 : default:
286 1 : lastKeyInclusive = endInclusive
287 1 : last = end
288 : }
289 : }
290 1 : if v.prefixChange.UsePrefixReplacementIterator() {
291 0 : first = v.prefixChange.Invert(first)
292 0 : last = v.prefixChange.Invert(last)
293 0 : }
294 : // TODO(bananabrick): What if someone passes in bounds completely outside of
295 : // virtual sstable bounds?
296 1 : return lastKeyInclusive, first, last
297 : }
298 :
299 : // EstimateDiskUsage just calls VirtualReader.reader.EstimateDiskUsage after
300 : // enforcing the virtual sstable bounds.
301 1 : func (v *VirtualReader) EstimateDiskUsage(start, end []byte) (uint64, error) {
302 1 : _, f, l := v.vState.constrainBounds(start, end, true /* endInclusive */)
303 1 : return v.reader.EstimateDiskUsage(f, l)
304 1 : }
305 :
306 : // CommonProperties implements the CommonReader interface.
307 1 : func (v *VirtualReader) CommonProperties() *CommonProperties {
308 1 : return &v.Properties
309 1 : }
|