Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sync"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/internal/treeprinter"
18 : "github.com/cockroachdb/pebble/sstable/blob"
19 : "github.com/cockroachdb/pebble/sstable/block"
20 : )
21 :
22 : // Get gets the value for the given key. It returns ErrNotFound if the DB does
23 : // not contain the key.
24 : //
25 : // The caller should not modify the contents of the returned slice, but it is
26 : // safe to modify the contents of the argument after Get returns. The returned
27 : // slice will remain valid until the returned Closer is closed. On success, the
28 : // caller MUST call closer.Close() or a memory leak will occur.
29 1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
30 1 : return d.getInternal(key, nil /* batch */, nil /* snapshot */)
31 1 : }
32 :
33 : type getIterAlloc struct {
34 : dbi Iterator
35 : keyBuf []byte
36 : get getIter
37 : }
38 :
39 : // Close closes the contained iterator and recycles the alloc.
40 : //
41 : // During a successful call to DB.Get, *getIterAlloc is returned to the caller
42 : // as the io.Closer.
43 1 : func (g *getIterAlloc) Close() error {
44 1 : err := g.dbi.Close()
45 1 : keyBuf := g.keyBuf
46 1 : if cap(g.dbi.keyBuf) < maxKeyBufCacheSize && cap(g.dbi.keyBuf) > cap(keyBuf) {
47 1 : keyBuf = g.dbi.keyBuf
48 1 : }
49 1 : *g = getIterAlloc{}
50 1 : g.keyBuf = keyBuf
51 1 : getIterAllocPool.Put(g)
52 1 : return err
53 : }
54 :
55 : var getIterAllocPool = sync.Pool{
56 1 : New: func() interface{} {
57 1 : return &getIterAlloc{}
58 1 : },
59 : }
60 :
61 1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
62 1 : if err := d.closed.Load(); err != nil {
63 1 : panic(err)
64 : }
65 :
66 : // Grab and reference the current readState. This prevents the underlying
67 : // files in the associated version from being deleted if there is a current
68 : // compaction. The readState is unref'd by Iterator.Close().
69 1 : readState := d.loadReadState()
70 1 :
71 1 : // Determine the seqnum to read at after grabbing the read state (current and
72 1 : // memtables) above.
73 1 : var seqNum base.SeqNum
74 1 : if s != nil {
75 1 : seqNum = s.seqNum
76 1 : } else {
77 1 : seqNum = d.mu.versions.visibleSeqNum.Load()
78 1 : }
79 :
80 1 : buf := getIterAllocPool.Get().(*getIterAlloc)
81 1 :
82 1 : get := &buf.get
83 1 : *get = getIter{
84 1 : comparer: d.opts.Comparer,
85 1 : newIters: d.newIters,
86 1 : snapshot: seqNum,
87 1 : iterOpts: IterOptions{
88 1 : // TODO(sumeer): replace with a parameter provided by the caller.
89 1 : Category: categoryGet,
90 1 : logger: d.opts.Logger,
91 1 : snapshotForHideObsoletePoints: seqNum,
92 1 : },
93 1 : key: key,
94 1 : // Compute the key prefix for bloom filtering.
95 1 : prefix: key[:d.opts.Comparer.Split(key)],
96 1 : batch: b,
97 1 : mem: readState.memtables,
98 1 : l0: readState.current.L0SublevelFiles,
99 1 : version: readState.current,
100 1 : }
101 1 :
102 1 : // Strip off memtables which cannot possibly contain the seqNum being read
103 1 : // at.
104 1 : for len(get.mem) > 0 {
105 1 : n := len(get.mem)
106 1 : if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
107 1 : break
108 : }
109 1 : get.mem = get.mem[:n-1]
110 : }
111 :
112 1 : i := &buf.dbi
113 1 : pointIter := get
114 1 : *i = Iterator{
115 1 : ctx: context.Background(),
116 1 : iter: pointIter,
117 1 : pointIter: pointIter,
118 1 : merge: d.merge,
119 1 : comparer: d.opts.Comparer,
120 1 : readState: readState,
121 1 : keyBuf: buf.keyBuf,
122 1 : }
123 1 : // Set up a blob value fetcher to use for retrieving values from blob files.
124 1 : i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv,
125 1 : blob.SuggestedCachedReaders(readState.current.MaxReadAmp()))
126 1 : get.iiopts.blobValueFetcher = &i.blobValueFetcher
127 1 :
128 1 : if !i.First() {
129 1 : err := buf.Close()
130 1 : if err != nil {
131 1 : return nil, nil, err
132 1 : }
133 1 : return nil, nil, ErrNotFound
134 : }
135 1 : val, err := i.ValueAndErr()
136 1 : if err != nil {
137 1 : return nil, nil, errors.CombineErrors(err, buf.Close())
138 1 : }
139 1 : return val, buf, nil
140 : }
141 :
142 : // getIter is an internal iterator used to perform gets. It iterates through
143 : // the values for a particular key, level by level. It is not a general purpose
144 : // internalIterator, but specialized for Get operations so that it loads data
145 : // lazily.
146 : type getIter struct {
147 : comparer *Comparer
148 : newIters tableNewIters
149 : snapshot base.SeqNum
150 : iterOpts IterOptions
151 : iiopts internalIterOpts
152 : key []byte
153 : prefix []byte
154 : iter internalIterator
155 : level int
156 : batch *Batch
157 : mem flushableList
158 : l0 []manifest.LevelSlice
159 : version *manifest.Version
160 : iterKV *base.InternalKV
161 : // tombstoned and tombstonedSeqNum track whether the key has been deleted by
162 : // a range delete tombstone. The first visible (at getIter.snapshot) range
163 : // deletion encounterd transitions tombstoned to true. The tombstonedSeqNum
164 : // field is updated to hold the sequence number of the tombstone.
165 : tombstoned bool
166 : tombstonedSeqNum base.SeqNum
167 : err error
168 : }
169 :
170 : // TODO(sumeer): CockroachDB code doesn't use getIter, but, for completeness,
171 : // make this implement InternalIteratorWithStats.
172 :
173 : // getIter implements the base.InternalIterator interface.
174 : var _ base.InternalIterator = (*getIter)(nil)
175 :
176 0 : func (g *getIter) String() string {
177 0 : return fmt.Sprintf("len(l0)=%d, len(mem)=%d, level=%d", len(g.l0), len(g.mem), g.level)
178 0 : }
179 :
180 0 : func (g *getIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
181 0 : panic("pebble: SeekGE unimplemented")
182 : }
183 :
184 0 : func (g *getIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
185 0 : return g.SeekPrefixGEStrict(prefix, key, flags)
186 0 : }
187 :
188 0 : func (g *getIter) SeekPrefixGEStrict(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
189 0 : panic("pebble: SeekPrefixGE unimplemented")
190 : }
191 :
192 0 : func (g *getIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
193 0 : panic("pebble: SeekLT unimplemented")
194 : }
195 :
196 1 : func (g *getIter) First() *base.InternalKV {
197 1 : return g.Next()
198 1 : }
199 :
200 0 : func (g *getIter) Last() *base.InternalKV {
201 0 : panic("pebble: Last unimplemented")
202 : }
203 :
204 1 : func (g *getIter) Next() *base.InternalKV {
205 1 : // If g.iter != nil, we're already iterating through a level. Next. Note
206 1 : // that it's possible the next key within the level is still relevant (eg,
207 1 : // MERGE keys written in the presence of an LSM snapshot).
208 1 : //
209 1 : // NB: We can't perform this Next below, in the for loop, because when we
210 1 : // open an iterator into the next level, we need to seek to the key.
211 1 : if g.iter != nil {
212 1 : g.iterKV = g.iter.Next()
213 1 : if err := g.iter.Error(); err != nil {
214 0 : g.err = err
215 0 : return nil
216 0 : }
217 : }
218 :
219 : // This for loop finds the next internal key in the LSM that is equal to
220 : // g.key, visible at g.snapshot and not shadowed by a range deletion. If it
221 : // exhausts a level, it initializes iterators for the next level.
222 1 : for {
223 1 : if g.iter != nil {
224 1 : if g.iterKV != nil {
225 1 : // Check if the current KV pair is deleted by a range deletion.
226 1 : if g.tombstoned && g.tombstonedSeqNum > g.iterKV.SeqNum() {
227 1 : // We have a range tombstone covering this key. Rather than
228 1 : // return a point or range deletion here, we return nil and
229 1 : // close our internal iterator stopping iteration.
230 1 : g.err = g.iter.Close()
231 1 : g.iter = nil
232 1 : return nil
233 1 : }
234 :
235 : // Is this the correct user key?
236 1 : if g.comparer.Equal(g.key, g.iterKV.K.UserKey) {
237 1 : // If the KV pair is not visible at the get's snapshot,
238 1 : // Next. The level may still contain older keys with the
239 1 : // same user key that are visible.
240 1 : if !g.iterKV.Visible(g.snapshot, base.SeqNumMax) {
241 1 : g.iterKV = g.iter.Next()
242 1 : continue
243 : }
244 1 : return g.iterKV
245 : }
246 : }
247 : // We've advanced the iterator passed the desired key. Move on to the
248 : // next memtable / level.
249 1 : g.err = g.iter.Close()
250 1 : g.iter = nil
251 1 : if g.err != nil {
252 1 : return nil
253 1 : }
254 : }
255 : // g.iter == nil; we need to initialize the next iterator.
256 1 : if !g.initializeNextIterator() {
257 1 : return nil
258 1 : }
259 1 : g.iterKV = g.iter.SeekPrefixGE(g.prefix, g.key, base.SeekGEFlagsNone)
260 : }
261 : }
262 :
263 0 : func (g *getIter) Prev() *base.InternalKV {
264 0 : panic("pebble: Prev unimplemented")
265 : }
266 :
267 0 : func (g *getIter) NextPrefix([]byte) *base.InternalKV {
268 0 : panic("pebble: NextPrefix unimplemented")
269 : }
270 :
271 1 : func (g *getIter) Error() error {
272 1 : return g.err
273 1 : }
274 :
275 1 : func (g *getIter) Close() error {
276 1 : if g.iter != nil {
277 1 : if err := g.iter.Close(); err != nil && g.err == nil {
278 0 : g.err = err
279 0 : }
280 1 : g.iter = nil
281 : }
282 1 : return g.err
283 : }
284 :
285 0 : func (g *getIter) SetBounds(lower, upper []byte) {
286 0 : panic("pebble: SetBounds unimplemented")
287 : }
288 :
289 0 : func (g *getIter) SetContext(_ context.Context) {}
290 :
291 : // DebugTree is part of the InternalIterator interface.
292 0 : func (g *getIter) DebugTree(tp treeprinter.Node) {
293 0 : n := tp.Childf("%T(%p)", g, g)
294 0 : if g.iter != nil {
295 0 : g.iter.DebugTree(n)
296 0 : }
297 : }
298 :
299 1 : func (g *getIter) initializeNextIterator() (ok bool) {
300 1 : // A batch's keys shadow all other keys, so we visit the batch first.
301 1 : if g.batch != nil {
302 1 : if g.batch.index == nil {
303 0 : g.err = ErrNotIndexed
304 0 : g.iterKV = nil
305 0 : return false
306 0 : }
307 1 : g.iter = g.batch.newInternalIter(nil)
308 1 : if !g.maybeSetTombstone(g.batch.newRangeDelIter(nil,
309 1 : // Get always reads the entirety of the batch's history, so no
310 1 : // batch keys should be filtered.
311 1 : base.SeqNumMax,
312 1 : )) {
313 0 : return false
314 0 : }
315 1 : g.batch = nil
316 1 : return true
317 : }
318 :
319 : // If we're trying to initialize the next level of the iterator stack but
320 : // have a tombstone from a previous level, it is guaranteed to delete keys
321 : // in lower levels. This key is deleted.
322 1 : if g.tombstoned {
323 1 : return false
324 1 : }
325 :
326 : // Create iterators from memtables from newest to oldest.
327 1 : if n := len(g.mem); n > 0 {
328 1 : m := g.mem[n-1]
329 1 : g.iter = m.newIter(nil)
330 1 : if !g.maybeSetTombstone(m.newRangeDelIter(nil)) {
331 0 : return false
332 0 : }
333 1 : g.mem = g.mem[:n-1]
334 1 : return true
335 : }
336 :
337 : // Visit each sublevel of L0 individually, so that we only need to read
338 : // at most one file per sublevel.
339 1 : if g.level == 0 {
340 1 : // Create iterators from L0 from newest to oldest.
341 1 : if n := len(g.l0); n > 0 {
342 1 : files := g.l0[n-1].Iter()
343 1 : g.l0 = g.l0[:n-1]
344 1 :
345 1 : iter, rangeDelIter, err := g.getSSTableIterators(files, manifest.L0Sublevel(n))
346 1 : if err != nil {
347 1 : g.err = firstError(g.err, err)
348 1 : return false
349 1 : }
350 1 : if !g.maybeSetTombstone(rangeDelIter) {
351 0 : return false
352 0 : }
353 1 : g.iter = iter
354 1 : return true
355 : }
356 : // We've exhausted all the sublevels of L0. Progress to L1.
357 1 : g.level++
358 : }
359 1 : for g.level < numLevels {
360 1 : if g.version.Levels[g.level].Empty() {
361 1 : g.level++
362 1 : continue
363 : }
364 : // Open the next level of the LSM.
365 1 : iter, rangeDelIter, err := g.getSSTableIterators(g.version.Levels[g.level].Iter(), manifest.Level(g.level))
366 1 : if err != nil {
367 1 : g.err = firstError(g.err, err)
368 1 : return false
369 1 : }
370 1 : if !g.maybeSetTombstone(rangeDelIter) {
371 0 : return false
372 0 : }
373 1 : g.level++
374 1 : g.iter = iter
375 1 : return true
376 : }
377 : // We've exhausted all levels of the LSM.
378 1 : return false
379 : }
380 :
381 : // getSSTableIterators returns a point iterator and a range deletion iterator
382 : // for the sstable in files that overlaps with the key g.key. Pebble does not
383 : // split user keys across adjacent sstables within a level, ensuring that at
384 : // most one sstable overlaps g.key.
385 : func (g *getIter) getSSTableIterators(
386 : files manifest.LevelIterator, level manifest.Layer,
387 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
388 1 : files = files.Filter(manifest.KeyTypePoint)
389 1 : m := files.SeekGE(g.comparer.Compare, g.key)
390 1 : if m == nil {
391 1 : return emptyIter, nil, nil
392 1 : }
393 : // m is now positioned at the file containing the first point key ≥ `g.key`.
394 : // Does it exist and possibly contain point keys with the user key 'g.key'?
395 1 : if m == nil || !m.HasPointKeys || g.comparer.Compare(m.PointKeyBounds.SmallestUserKey(), g.key) > 0 {
396 1 : return emptyIter, nil, nil
397 1 : }
398 : // m may possibly contain point (or range deletion) keys relevant to g.key.
399 1 : g.iterOpts.layer = level
400 1 : iters, err := g.newIters(context.Background(), m, &g.iterOpts, g.iiopts, iterPointKeys|iterRangeDeletions)
401 1 : if err != nil {
402 1 : return emptyIter, nil, err
403 1 : }
404 1 : return iters.Point(), iters.RangeDeletion(), nil
405 : }
406 :
407 : // maybeSetTombstone updates g.tombstoned[SeqNum] to reflect the presence of a
408 : // range deletion covering g.key, if there are any. It returns true if
409 : // successful, or false if an error occurred and the caller should abort
410 : // iteration.
411 1 : func (g *getIter) maybeSetTombstone(rangeDelIter keyspan.FragmentIterator) (ok bool) {
412 1 : if rangeDelIter == nil {
413 1 : // Nothing to do.
414 1 : return true
415 1 : }
416 : // Find the range deletion that covers the sought key, if any.
417 1 : t, err := keyspan.Get(g.comparer.Compare, rangeDelIter, g.key)
418 1 : if err != nil {
419 0 : g.err = firstError(g.err, err)
420 0 : return false
421 0 : }
422 : // Find the most recent visible range deletion's sequence number. We only
423 : // care about the most recent range deletion that's visible because it's the
424 : // "most powerful."
425 1 : g.tombstonedSeqNum, g.tombstoned = t.LargestVisibleSeqNum(g.snapshot)
426 1 : rangeDelIter.Close()
427 1 : return true
428 : }
|