Line data Source code
1 : /*
2 : * Copyright 2017 Dgraph Labs, Inc. and Contributors
3 : * Modifications copyright (C) 2017 Andy Kimball and Contributors
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : /*
19 : Adapted from RocksDB inline skiplist.
20 :
21 : Key differences:
22 : - No optimization for sequential inserts (no "prev").
23 : - No custom comparator.
24 : - Support overwrites. This requires care when we see the same key when inserting.
25 : For RocksDB or LevelDB, overwrites are implemented as a newer sequence number in the key, so
26 : there is no need for values. We don't intend to support versioning. In-place updates of values
27 : would be more efficient.
28 : - We discard all non-concurrent code.
29 : - We do not support Splices. This simplifies the code a lot.
30 : - No AllocateNode or other pointer arithmetic.
31 : - We combine the findLessThan, findGreaterOrEqual, etc into one function.
32 : */
33 :
34 : /*
35 : Further adapted from Badger: https://github.com/dgraph-io/badger.
36 :
37 : Key differences:
38 : - Support for previous pointers - doubly linked lists. Note that it's up to higher
39 : level code to deal with the intermediate state that occurs during insertion,
40 : where node A is linked to node B, but node B is not yet linked back to node A.
41 : - Iterator includes mutator functions.
42 : */
43 :
44 : package arenaskl // import "github.com/cockroachdb/pebble/internal/arenaskl"
45 :
46 : import (
47 : "math"
48 : "runtime"
49 : "sync/atomic"
50 : "unsafe"
51 :
52 : "github.com/cockroachdb/errors"
53 : "github.com/cockroachdb/pebble/internal/base"
54 : "github.com/cockroachdb/pebble/internal/fastrand"
55 : )
56 :
const (
	// maxHeight is the maximum number of levels in a node's tower (and thus
	// in the skiplist itself).
	maxHeight = 20
	// maxNodeSize is the size in bytes of a node with a full-height tower.
	maxNodeSize = int(unsafe.Sizeof(node{}))
	// linksSize is the size in bytes of a single tower level (one links struct).
	linksSize = int(unsafe.Sizeof(links{}))
	// pValue is the per-level promotion probability; 1/e is the theoretically
	// optimal value for skiplists (see the init function below).
	pValue = 1 / math.E
)
63 :
// ErrRecordExists indicates that an entry with the specified key already
// exists in the skiplist. Duplicate entries are not directly supported and
// instead must be handled by the user by appending a unique version suffix to
// keys. Callers should compare against this sentinel with errors.Is.
var ErrRecordExists = errors.New("record with this key already exists")
69 :
// Skiplist is a fast, concurrent skiplist implementation that supports forward
// and backward iteration. See batchskl.Skiplist for a non-concurrent
// skiplist. Keys and values are immutable once added to the skiplist and
// deletion is not supported. Instead, higher-level code is expected to add new
// entries that shadow existing entries and perform deletion via tombstones. It
// is up to the user to process these shadow entries and tombstones
// appropriately during retrieval.
type Skiplist struct {
	arena  *Arena        // backing storage for all nodes, keys, and values
	cmp    base.Compare  // user-key comparison function
	head   *node         // sentinel node that precedes all entries at every level
	tail   *node         // sentinel node that follows all entries at every level
	height atomic.Uint32 // Current height. 1 <= height <= maxHeight. CAS.

	// If set to true by tests, then extra delays are added to make it easier to
	// detect unusual race conditions.
	testing bool
}
88 :
// Inserter caches the splice (per-level prev/next node pairs) computed during
// a previous Add so that a subsequent Add of a nearby key can avoid a full
// search from the top of the skiplist. A zero-value Inserter is valid.
type Inserter struct {
	spl    [maxHeight]splice // cached splice from the last successful search
	height uint32            // list height at the time the splice was cached; 0 invalidates it
}
94 :
// Add inserts key/value into the given skiplist, reusing this Inserter's
// cached splice when it is still valid. It returns ErrRecordExists if the
// internal key already exists, or ErrArenaFull if the arena is out of space.
func (ins *Inserter) Add(list *Skiplist, key base.InternalKey, value []byte) error {
	return list.addInternal(key, value, ins)
}
99 :
var (
	// probabilities[h] is the threshold below which a uniform random uint32
	// grants a node a tower of height > h. Filled in by init.
	probabilities [maxHeight]uint32
)
103 :
104 2 : func init() {
105 2 : // Precompute the skiplist probabilities so that only a single random number
106 2 : // needs to be generated and so that the optimal pvalue can be used (inverse
107 2 : // of Euler's number).
108 2 : p := float64(1.0)
109 2 : for i := 0; i < maxHeight; i++ {
110 2 : probabilities[i] = uint32(float64(math.MaxUint32) * p)
111 2 : p *= pValue
112 2 : }
113 : }
114 :
115 : // NewSkiplist constructs and initializes a new, empty skiplist. All nodes, keys,
116 : // and values in the skiplist will be allocated from the given arena.
117 1 : func NewSkiplist(arena *Arena, cmp base.Compare) *Skiplist {
118 1 : skl := &Skiplist{}
119 1 : skl.Reset(arena, cmp)
120 1 : return skl
121 1 : }
122 :
// Reset the skiplist to empty and re-initialize. The head and tail sentinel
// nodes are allocated from the arena and linked to each other at every level,
// so iteration code never needs nil checks. Panics if the arena cannot hold
// even the two sentinel nodes.
func (s *Skiplist) Reset(arena *Arena, cmp base.Compare) {
	// Allocate head and tail nodes.
	// NOTE(review): head is allocated before tail; presumably other code does
	// not depend on the relative arena offsets, but confirm before reordering.
	head, err := newRawNode(arena, maxHeight, 0, 0)
	if err != nil {
		panic("arenaSize is not large enough to hold the head node")
	}
	// A zero keyOffset appears to mark the sentinel as having no key.
	head.keyOffset = 0

	tail, err := newRawNode(arena, maxHeight, 0, 0)
	if err != nil {
		panic("arenaSize is not large enough to hold the tail node")
	}
	tail.keyOffset = 0

	// Link all head/tail levels together.
	headOffset := arena.getPointerOffset(unsafe.Pointer(head))
	tailOffset := arena.getPointerOffset(unsafe.Pointer(tail))
	for i := 0; i < maxHeight; i++ {
		head.tower[i].nextOffset.Store(tailOffset)
		tail.tower[i].prevOffset.Store(headOffset)
	}

	// Overwrite the whole struct so any state from a previous use is cleared,
	// then publish the initial height of 1 (only the base level is active).
	*s = Skiplist{
		arena: arena,
		cmp:   cmp,
		head:  head,
		tail:  tail,
	}
	s.height.Store(1)
}
154 :
155 : // Height returns the height of the highest tower within any of the nodes that
156 : // have ever been allocated as part of this skiplist.
157 2 : func (s *Skiplist) Height() uint32 { return s.height.Load() }
158 :
159 : // Arena returns the arena backing this skiplist.
160 2 : func (s *Skiplist) Arena() *Arena { return s.arena }
161 :
162 : // Size returns the number of bytes that have allocated from the arena.
163 2 : func (s *Skiplist) Size() uint32 { return s.arena.Size() }
164 :
165 : // Add adds a new key if it does not yet exist. If the key already exists, then
166 : // Add returns ErrRecordExists. If there isn't enough room in the arena, then
167 : // Add returns ErrArenaFull.
168 2 : func (s *Skiplist) Add(key base.InternalKey, value []byte) error {
169 2 : var ins Inserter
170 2 : return s.addInternal(key, value, &ins)
171 2 : }
172 :
// addInternal inserts key/value into the skiplist using the splice cached in
// ins when possible. It returns ErrRecordExists if the exact internal key is
// already present, or propagates ErrArenaFull from node allocation. Insertion
// is lock-free: each level is linked via CAS, retrying with a recomputed
// splice on contention.
func (s *Skiplist) addInternal(key base.InternalKey, value []byte, ins *Inserter) error {
	if s.findSplice(key, ins) {
		// Found a matching node, but handle case where it's been deleted.
		return ErrRecordExists
	}

	if s.testing {
		// Add delay to make it easier to test race between this thread
		// and another thread that sees the intermediate state between
		// finding the splice and using it.
		runtime.Gosched()
	}

	nd, height, err := s.newNode(key, value)
	if err != nil {
		// Typically ErrArenaFull; the partially-allocated node is abandoned.
		return err
	}

	ndOffset := s.arena.getPointerOffset(unsafe.Pointer(nd))

	// We always insert from the base level and up. After you add a node in base
	// level, we cannot create a node in the level above because it would have
	// discovered the node in the base level.
	var found bool
	var invalidateSplice bool
	for i := 0; i < int(height); i++ {
		prev := ins.spl[i].prev
		next := ins.spl[i].next

		if prev == nil {
			// New node increased the height of the skiplist, so assume that the
			// new level has not yet been populated.
			if next != nil {
				panic("next is expected to be nil, since prev is nil")
			}

			prev = s.head
			next = s.tail
		}

		// +----------------+     +------------+     +----------------+
		// |      prev      |     |     nd     |     |      next      |
		// | prevNextOffset |---->|            |     |                |
		// |                |<----| prevOffset |     |                |
		// |                |     | nextOffset |---->|                |
		// |                |     |            |<----| nextPrevOffset |
		// +----------------+     +------------+     +----------------+
		//
		// 1. Initialize prevOffset and nextOffset to point to prev and next.
		// 2. CAS prevNextOffset to repoint from next to nd.
		// 3. CAS nextPrevOffset to repoint from prev to nd.
		for {
			prevOffset := s.arena.getPointerOffset(unsafe.Pointer(prev))
			nextOffset := s.arena.getPointerOffset(unsafe.Pointer(next))
			// nd is not yet visible to other threads at this level, so its
			// links can be initialized without atomics.
			nd.tower[i].init(prevOffset, nextOffset)

			// Check whether next has an updated link to prev. If it does not,
			// that can mean one of two things:
			//   1. The thread that added the next node hasn't yet had a chance
			//      to add the prev link (but will shortly).
			//   2. Another thread has added a new node between prev and next.
			nextPrevOffset := next.prevOffset(i)
			if nextPrevOffset != prevOffset {
				// Determine whether #1 or #2 is true by checking whether prev
				// is still pointing to next. As long as the atomic operations
				// have at least acquire/release semantics (no need for
				// sequential consistency), this works, as it is equivalent to
				// the "publication safety" pattern.
				prevNextOffset := prev.nextOffset(i)
				if prevNextOffset == nextOffset {
					// Ok, case #1 is true, so help the other thread along by
					// updating the next node's prev link.
					next.casPrevOffset(i, nextPrevOffset, prevOffset)
				}
			}

			if prev.casNextOffset(i, nextOffset, ndOffset) {
				// Managed to insert nd between prev and next, so update the next
				// node's prev link and go to the next level.
				if s.testing {
					// Add delay to make it easier to test race between this thread
					// and another thread that sees the intermediate state between
					// setting next and setting prev.
					runtime.Gosched()
				}

				// This CAS may fail if a racing thread already helped (see
				// above); in that case the prev link is already correct.
				next.casPrevOffset(i, prevOffset, ndOffset)
				break
			}

			// CAS failed. We need to recompute prev and next. It is unlikely to
			// be helpful to try to use a different level as we redo the search,
			// because it is unlikely that lots of nodes are inserted between prev
			// and next.
			prev, next, found = s.findSpliceForLevel(key, i, prev)
			if found {
				if i != 0 {
					panic("how can another thread have inserted a node at a non-base level?")
				}

				return ErrRecordExists
			}
			invalidateSplice = true
		}
	}

	// If we had to recompute the splice for a level, invalidate the entire
	// cached splice.
	if invalidateSplice {
		ins.height = 0
	} else {
		// The splice was valid. We inserted a node between spl[i].prev and
		// spl[i].next. Optimistically update spl[i].prev for use in a subsequent
		// call to add.
		for i := uint32(0); i < height; i++ {
			ins.spl[i].prev = nd
		}
	}

	return nil
}
294 :
295 : // NewIter returns a new Iterator object. The lower and upper bound parameters
296 : // control the range of keys the iterator will return. Specifying for nil for
297 : // lower or upper bound disables the check for that boundary. Note that lower
298 : // bound is not checked on {SeekGE,First} and upper bound is not check on
299 : // {SeekLT,Last}. The user is expected to perform that check. Note that it is
300 : // safe for an iterator to be copied by value.
301 2 : func (s *Skiplist) NewIter(lower, upper []byte) *Iterator {
302 2 : it := iterPool.Get().(*Iterator)
303 2 : *it = Iterator{list: s, nd: s.head, lower: lower, upper: upper}
304 2 : return it
305 2 : }
306 :
307 : // NewFlushIter returns a new flushIterator, which is similar to an Iterator
308 : // but also sets the current number of the bytes that have been iterated
309 : // through.
310 2 : func (s *Skiplist) NewFlushIter() base.InternalIterator {
311 2 : return &flushIterator{
312 2 : Iterator: Iterator{list: s, nd: s.head},
313 2 : }
314 2 : }
315 :
316 : func (s *Skiplist) newNode(
317 : key base.InternalKey, value []byte,
318 2 : ) (nd *node, height uint32, err error) {
319 2 : height = s.randomHeight()
320 2 : nd, err = newNode(s.arena, height, key, value)
321 2 : if err != nil {
322 1 : return
323 1 : }
324 :
325 : // Try to increase s.height via CAS.
326 2 : listHeight := s.Height()
327 2 : for height > listHeight {
328 2 : if s.height.CompareAndSwap(listHeight, height) {
329 2 : // Successfully increased skiplist.height.
330 2 : break
331 : }
332 :
333 0 : listHeight = s.Height()
334 : }
335 :
336 2 : return
337 : }
338 :
339 2 : func (s *Skiplist) randomHeight() uint32 {
340 2 : rnd := fastrand.Uint32()
341 2 :
342 2 : h := uint32(1)
343 2 : for h < maxHeight && rnd <= probabilities[h] {
344 2 : h++
345 2 : }
346 :
347 2 : return h
348 : }
349 :
// findSplice computes, for every level, the (prev, next) node pair that
// brackets key, storing the result in ins.spl. It returns true if a node with
// exactly this internal key already exists. When ins holds a still-valid
// cached splice that brackets the key, the search is restarted from the
// cached position rather than from the head.
func (s *Skiplist) findSplice(key base.InternalKey, ins *Inserter) (found bool) {
	listHeight := s.Height()
	var level int

	prev := s.head
	if ins.height < listHeight {
		// Our cached height is less than the list height, which means there were
		// inserts that increased the height of the list. Recompute the splice from
		// scratch.
		ins.height = listHeight
		level = int(ins.height)
	} else {
		// Our cached height is equal to the list height.
		for ; level < int(listHeight); level++ {
			spl := &ins.spl[level]
			if s.getNext(spl.prev, level) != spl.next {
				// One or more nodes have been inserted between the splice at this
				// level.
				continue
			}
			if spl.prev != s.head && !s.keyIsAfterNode(spl.prev, key) {
				// Key lies before splice.
				// Setting level to listHeight forces a full search from the top.
				level = int(listHeight)
				break
			}
			if spl.next != s.tail && s.keyIsAfterNode(spl.next, key) {
				// Key lies after splice.
				level = int(listHeight)
				break
			}
			// The splice brackets the key!
			prev = spl.prev
			break
		}
	}

	// Descend from the starting level, recording the bracketing pair at each
	// level; the search at each level starts from the prev found above it.
	for level = level - 1; level >= 0; level-- {
		var next *node
		prev, next, found = s.findSpliceForLevel(key, level, prev)
		if next == nil {
			// NOTE(review): findSpliceForLevel never appears to return a nil
			// next (it stops at s.tail), so this looks like defensive code.
			next = s.tail
		}
		ins.spl[level].init(prev, next)
	}

	return
}
397 :
398 : func (s *Skiplist) findSpliceForLevel(
399 : key base.InternalKey, level int, start *node,
400 2 : ) (prev, next *node, found bool) {
401 2 : prev = start
402 2 :
403 2 : for {
404 2 : // Assume prev.key < key.
405 2 : next = s.getNext(prev, level)
406 2 : if next == s.tail {
407 2 : // Tail node, so done.
408 2 : break
409 : }
410 :
411 2 : offset, size := next.keyOffset, next.keySize
412 2 : nextKey := s.arena.buf[offset : offset+size]
413 2 : cmp := s.cmp(key.UserKey, nextKey)
414 2 : if cmp < 0 {
415 2 : // We are done for this level, since prev.key < key < next.key.
416 2 : break
417 : }
418 2 : if cmp == 0 {
419 2 : // User-key equality.
420 2 : if key.Trailer == next.keyTrailer {
421 1 : // Internal key equality.
422 1 : found = true
423 1 : break
424 : }
425 2 : if key.Trailer > next.keyTrailer {
426 2 : // We are done for this level, since prev.key < key < next.key.
427 2 : break
428 : }
429 : }
430 :
431 : // Keep moving right on this level.
432 2 : prev = next
433 : }
434 :
435 2 : return
436 : }
437 :
438 2 : func (s *Skiplist) keyIsAfterNode(nd *node, key base.InternalKey) bool {
439 2 : ndKey := s.arena.buf[nd.keyOffset : nd.keyOffset+nd.keySize]
440 2 : cmp := s.cmp(ndKey, key.UserKey)
441 2 : if cmp < 0 {
442 2 : return true
443 2 : }
444 2 : if cmp > 0 {
445 2 : return false
446 2 : }
447 : // User-key equality.
448 2 : if key.Trailer == nd.keyTrailer {
449 0 : // Internal key equality.
450 0 : return false
451 0 : }
452 2 : return key.Trailer < nd.keyTrailer
453 : }
454 :
455 2 : func (s *Skiplist) getNext(nd *node, h int) *node {
456 2 : offset := nd.tower[h].nextOffset.Load()
457 2 : return (*node)(s.arena.getPointer(offset))
458 2 : }
459 :
460 2 : func (s *Skiplist) getPrev(nd *node, h int) *node {
461 2 : offset := nd.tower[h].prevOffset.Load()
462 2 : return (*node)(s.arena.getPointer(offset))
463 2 : }
|