Line data Source code
1 : /*
2 : * Copyright 2017 Dgraph Labs, Inc. and Contributors
3 : * Modifications copyright (C) 2017 Andy Kimball and Contributors
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : /*
19 : Adapted from RocksDB inline skiplist.
20 :
21 : Key differences:
22 : - No optimization for sequential inserts (no "prev").
23 : - No custom comparator.
24 : - Support overwrites. This requires care when we see the same key when inserting.
25 : For RocksDB or LevelDB, overwrites are implemented as a newer sequence number in the key, so
26 : there is no need for values. We don't intend to support versioning. In-place updates of values
27 : would be more efficient.
28 : - We discard all non-concurrent code.
29 : - We do not support Splices. This simplifies the code a lot.
30 : - No AllocateNode or other pointer arithmetic.
31 : - We combine the findLessThan, findGreaterOrEqual, etc into one function.
32 : */
33 :
34 : /*
35 : Further adapted from Badger: https://github.com/dgraph-io/badger.
36 :
37 : Key differences:
38 : - Support for previous pointers - doubly linked lists. Note that it's up to higher
39 : level code to deal with the intermediate state that occurs during insertion,
40 : where node A is linked to node B, but node B is not yet linked back to node A.
41 : - Iterator includes mutator functions.
42 : */
43 :
44 : package arenaskl // import "github.com/cockroachdb/pebble/internal/arenaskl"
45 :
46 : import (
47 : "math"
48 : "runtime"
49 : "sync/atomic"
50 : "unsafe"
51 :
52 : "github.com/cockroachdb/errors"
53 : "github.com/cockroachdb/pebble/internal/base"
54 : "github.com/cockroachdb/pebble/internal/fastrand"
55 : )
56 :
const (
	// maxHeight is the maximum number of levels in a node's tower (and thus
	// in the skiplist itself).
	maxHeight   = 20
	maxNodeSize = int(unsafe.Sizeof(node{}))
	linksSize   = int(unsafe.Sizeof(links{}))
	// pValue is the per-level promotion probability: the fraction of nodes at
	// level h that also appear at level h+1. 1/e is the asymptotically
	// optimal choice (see init below).
	pValue = 1 / math.E
)

// ErrRecordExists indicates that an entry with the specified key already
// exists in the skiplist. Duplicate entries are not directly supported and
// instead must be handled by the user by appending a unique version suffix to
// keys.
var ErrRecordExists = errors.New("record with this key already exists")
69 :
// Skiplist is a fast, concurrent skiplist implementation that supports forward
// and backward iteration. See batchskl.Skiplist for a non-concurrent
// skiplist. Keys and values are immutable once added to the skiplist and
// deletion is not supported. Instead, higher-level code is expected to add new
// entries that shadow existing entries and perform deletion via tombstones. It
// is up to the user to process these shadow entries and tombstones
// appropriately during retrieval.
type Skiplist struct {
	arena  *Arena        // backing allocator for all nodes, keys, and values
	cmp    base.Compare  // user-key comparison function
	head   *node         // sentinel preceding all entries at every level
	tail   *node         // sentinel following all entries at every level
	height atomic.Uint32 // Current height. 1 <= height <= maxHeight. CAS.

	// If set to true by tests, then extra delays are added to make it easier to
	// detect unusual race conditions.
	testing bool
}
88 :
// Inserter caches the splice computed by the most recent Add — the (prev,
// next) node pairs that bracketed the inserted key at each level — so that a
// subsequent Add of a nearby key can reuse it and skip most of the search
// (see findSplice).
type Inserter struct {
	spl    [maxHeight]splice // splice cached from the previous Add
	height uint32            // number of valid levels in spl; 0 means invalid
}
94 :
// Add inserts key/value into list, reusing and updating the splice cached in
// ins from a previous Add so that inserts of nearby keys avoid a full search.
// It returns ErrRecordExists if the key already exists and ErrArenaFull if
// there isn't enough room in the arena (see Skiplist.Add).
func (ins *Inserter) Add(list *Skiplist, key base.InternalKey, value []byte) error {
	return list.addInternal(key, value, ins)
}
99 :
100 : var (
101 : probabilities [maxHeight]uint32
102 : )
103 :
104 1 : func init() {
105 1 : // Precompute the skiplist probabilities so that only a single random number
106 1 : // needs to be generated and so that the optimal pvalue can be used (inverse
107 1 : // of Euler's number).
108 1 : p := float64(1.0)
109 1 : for i := 0; i < maxHeight; i++ {
110 1 : probabilities[i] = uint32(float64(math.MaxUint32) * p)
111 1 : p *= pValue
112 1 : }
113 : }
114 :
115 : // NewSkiplist constructs and initializes a new, empty skiplist. All nodes, keys,
116 : // and values in the skiplist will be allocated from the given arena.
117 0 : func NewSkiplist(arena *Arena, cmp base.Compare) *Skiplist {
118 0 : skl := &Skiplist{}
119 0 : skl.Reset(arena, cmp)
120 0 : return skl
121 0 : }
122 :
123 : // Reset the skiplist to empty and re-initialize.
124 1 : func (s *Skiplist) Reset(arena *Arena, cmp base.Compare) {
125 1 : // Allocate head and tail nodes.
126 1 : head, err := newRawNode(arena, maxHeight, 0, 0)
127 1 : if err != nil {
128 0 : panic("arenaSize is not large enough to hold the head node")
129 : }
130 1 : head.keyOffset = 0
131 1 :
132 1 : tail, err := newRawNode(arena, maxHeight, 0, 0)
133 1 : if err != nil {
134 0 : panic("arenaSize is not large enough to hold the tail node")
135 : }
136 1 : tail.keyOffset = 0
137 1 :
138 1 : // Link all head/tail levels together.
139 1 : headOffset := arena.getPointerOffset(unsafe.Pointer(head))
140 1 : tailOffset := arena.getPointerOffset(unsafe.Pointer(tail))
141 1 : for i := 0; i < maxHeight; i++ {
142 1 : head.tower[i].nextOffset.Store(tailOffset)
143 1 : tail.tower[i].prevOffset.Store(headOffset)
144 1 : }
145 :
146 1 : *s = Skiplist{
147 1 : arena: arena,
148 1 : cmp: cmp,
149 1 : head: head,
150 1 : tail: tail,
151 1 : }
152 1 : s.height.Store(1)
153 : }
154 :
// Height returns the height of the highest tower within any of the nodes that
// have ever been allocated as part of this skiplist.
func (s *Skiplist) Height() uint32 { return s.height.Load() }

// Arena returns the arena backing this skiplist.
func (s *Skiplist) Arena() *Arena { return s.arena }

// Size returns the number of bytes that have been allocated from the arena.
func (s *Skiplist) Size() uint32 { return s.arena.Size() }
164 :
165 : // Add adds a new key if it does not yet exist. If the key already exists, then
166 : // Add returns ErrRecordExists. If there isn't enough room in the arena, then
167 : // Add returns ErrArenaFull.
168 1 : func (s *Skiplist) Add(key base.InternalKey, value []byte) error {
169 1 : var ins Inserter
170 1 : return s.addInternal(key, value, &ins)
171 1 : }
172 :
// addInternal inserts key/value, using (and refreshing) the splice cached in
// ins. It returns ErrRecordExists if an entry with the identical internal key
// (user key and trailer) is already present, and propagates any error from
// node allocation (e.g. ErrArenaFull).
func (s *Skiplist) addInternal(key base.InternalKey, value []byte, ins *Inserter) error {
	if s.findSplice(key, ins) {
		// Found a matching node, but handle case where it's been deleted.
		return ErrRecordExists
	}

	if s.testing {
		// Add delay to make it easier to test race between this thread
		// and another thread that sees the intermediate state between
		// finding the splice and using it.
		runtime.Gosched()
	}

	nd, height, err := s.newNode(key, value)
	if err != nil {
		return err
	}

	ndOffset := s.arena.getPointerOffset(unsafe.Pointer(nd))

	// We always insert from the base level and up. After you add a node in base
	// level, we cannot create a node in the level above because it would have
	// discovered the node in the base level.
	var found bool
	var invalidateSplice bool
	for i := 0; i < int(height); i++ {
		prev := ins.spl[i].prev
		next := ins.spl[i].next

		if prev == nil {
			// New node increased the height of the skiplist, so assume that the
			// new level has not yet been populated.
			if next != nil {
				panic("next is expected to be nil, since prev is nil")
			}

			prev = s.head
			next = s.tail
		}

		// +----------------+     +------------+     +----------------+
		// |      prev      |     |     nd     |     |      next      |
		// | prevNextOffset |---->|            |     |                |
		// |                |<----| prevOffset |     |                |
		// |                |     | nextOffset |---->|                |
		// |                |     |            |<----| nextPrevOffset |
		// +----------------+     +------------+     +----------------+
		//
		// 1. Initialize prevOffset and nextOffset to point to prev and next.
		// 2. CAS prevNextOffset to repoint from next to nd.
		// 3. CAS nextPrevOffset to repoint from prev to nd.
		for {
			prevOffset := s.arena.getPointerOffset(unsafe.Pointer(prev))
			nextOffset := s.arena.getPointerOffset(unsafe.Pointer(next))
			nd.tower[i].init(prevOffset, nextOffset)

			// Check whether next has an updated link to prev. If it does not,
			// that can mean one of two things:
			//   1. The thread that added the next node hasn't yet had a chance
			//      to add the prev link (but will shortly).
			//   2. Another thread has added a new node between prev and next.
			nextPrevOffset := next.prevOffset(i)
			if nextPrevOffset != prevOffset {
				// Determine whether #1 or #2 is true by checking whether prev
				// is still pointing to next. As long as the atomic operations
				// have at least acquire/release semantics (no need for
				// sequential consistency), this works, as it is equivalent to
				// the "publication safety" pattern.
				prevNextOffset := prev.nextOffset(i)
				if prevNextOffset == nextOffset {
					// Ok, case #1 is true, so help the other thread along by
					// updating the next node's prev link.
					next.casPrevOffset(i, nextPrevOffset, prevOffset)
				}
			}

			if prev.casNextOffset(i, nextOffset, ndOffset) {
				// Managed to insert nd between prev and next, so update the next
				// node's prev link and go to the next level.
				if s.testing {
					// Add delay to make it easier to test race between this thread
					// and another thread that sees the intermediate state between
					// setting next and setting prev.
					runtime.Gosched()
				}

				next.casPrevOffset(i, prevOffset, ndOffset)
				break
			}

			// CAS failed. We need to recompute prev and next. It is unlikely to
			// be helpful to try to use a different level as we redo the search,
			// because it is unlikely that lots of nodes are inserted between prev
			// and next.
			prev, next, found = s.findSpliceForLevel(key, i, prev)
			if found {
				if i != 0 {
					panic("how can another thread have inserted a node at a non-base level?")
				}

				return ErrRecordExists
			}
			invalidateSplice = true
		}
	}

	// If we had to recompute the splice for a level, invalidate the entire
	// cached splice.
	if invalidateSplice {
		ins.height = 0
	} else {
		// The splice was valid. We inserted a node between spl[i].prev and
		// spl[i].next. Optimistically update spl[i].prev for use in a subsequent
		// call to add.
		for i := uint32(0); i < height; i++ {
			ins.spl[i].prev = nd
		}
	}

	return nil
}
294 :
// NewIter returns a new Iterator object. The lower and upper bound parameters
// control the range of keys the iterator will return. Specifying nil for
// lower or upper bound disables the check for that boundary. Note that lower
// bound is not checked on {SeekGE,First} and upper bound is not checked on
// {SeekLT,Last}. The user is expected to perform that check. Note that it is
// safe for an iterator to be copied by value.
func (s *Skiplist) NewIter(lower, upper []byte) *Iterator {
	it := iterPool.Get().(*Iterator)
	// Struct assignment (rather than field-by-field) resets any stale state
	// left in the pooled iterator.
	*it = Iterator{list: s, nd: s.head, lower: lower, upper: upper}
	return it
}
306 :
307 : // NewFlushIter returns a new flushIterator, which is similar to an Iterator
308 : // but also sets the current number of the bytes that have been iterated
309 : // through.
310 1 : func (s *Skiplist) NewFlushIter() base.InternalIterator {
311 1 : return &flushIterator{
312 1 : Iterator: Iterator{list: s, nd: s.head},
313 1 : }
314 1 : }
315 :
316 : func (s *Skiplist) newNode(
317 : key base.InternalKey, value []byte,
318 1 : ) (nd *node, height uint32, err error) {
319 1 : height = s.randomHeight()
320 1 : nd, err = newNode(s.arena, height, key, value)
321 1 : if err != nil {
322 0 : return
323 0 : }
324 :
325 : // Try to increase s.height via CAS.
326 1 : listHeight := s.Height()
327 1 : for height > listHeight {
328 1 : if s.height.CompareAndSwap(listHeight, height) {
329 1 : // Successfully increased skiplist.height.
330 1 : break
331 : }
332 :
333 0 : listHeight = s.Height()
334 : }
335 :
336 1 : return
337 : }
338 :
339 1 : func (s *Skiplist) randomHeight() uint32 {
340 1 : rnd := fastrand.Uint32()
341 1 :
342 1 : h := uint32(1)
343 1 : for h < maxHeight && rnd <= probabilities[h] {
344 1 : h++
345 1 : }
346 :
347 1 : return h
348 : }
349 :
// findSplice fills ins.spl[0..listHeight-1] with the (prev, next) pairs that
// bracket key at each level, reusing any levels of the cached splice that are
// still valid. It returns true if a node with the identical internal key
// already exists in the list.
func (s *Skiplist) findSplice(key base.InternalKey, ins *Inserter) (found bool) {
	listHeight := s.Height()
	var level int

	prev := s.head
	if ins.height < listHeight {
		// Our cached height is less than the list height, which means there were
		// inserts that increased the height of the list. Recompute the splice from
		// scratch.
		ins.height = listHeight
		level = int(ins.height)
	} else {
		// Our cached height is equal to the list height.
		for ; level < int(listHeight); level++ {
			spl := &ins.spl[level]
			if s.getNext(spl.prev, level) != spl.next {
				// One or more nodes have been inserted between the splice at this
				// level.
				continue
			}
			if spl.prev != s.head && !s.keyIsAfterNode(spl.prev, key) {
				// Key lies before splice.
				level = int(listHeight)
				break
			}
			if spl.next != s.tail && s.keyIsAfterNode(spl.next, key) {
				// Key lies after splice.
				level = int(listHeight)
				break
			}
			// The splice brackets the key!
			prev = spl.prev
			break
		}
	}

	// Descend from the highest level that needs recomputing, refreshing the
	// splice at each level on the way down. prev carries the search position
	// from one level to the next.
	for level = level - 1; level >= 0; level-- {
		var next *node
		prev, next, found = s.findSpliceForLevel(key, level, prev)
		if next == nil {
			next = s.tail
		}
		ins.spl[level].init(prev, next)
	}

	return
}
397 :
// findSpliceForLevel scans forward from start along the given level and
// returns the adjacent pair (prev, next) that brackets key at that level;
// next may be the tail sentinel. found is true when next carries exactly the
// same internal key (equal user key and trailer).
func (s *Skiplist) findSpliceForLevel(
	key base.InternalKey, level int, start *node,
) (prev, next *node, found bool) {
	prev = start

	for {
		// Assume prev.key < key.
		next = s.getNext(prev, level)
		if next == s.tail {
			// Tail node, so done.
			break
		}

		offset, size := next.keyOffset, next.keySize
		nextKey := s.arena.buf[offset : offset+size]
		cmp := s.cmp(key.UserKey, nextKey)
		if cmp < 0 {
			// We are done for this level, since prev.key < key < next.key.
			break
		}
		if cmp == 0 {
			// User-key equality.
			if key.Trailer == next.keyTrailer {
				// Internal key equality.
				found = true
				break
			}
			if key.Trailer > next.keyTrailer {
				// We are done for this level, since prev.key < key < next.key.
				// (Among equal user keys, larger trailers sort first.)
				break
			}
		}

		// Keep moving right on this level.
		prev = next
	}

	return
}
437 :
438 1 : func (s *Skiplist) keyIsAfterNode(nd *node, key base.InternalKey) bool {
439 1 : ndKey := s.arena.buf[nd.keyOffset : nd.keyOffset+nd.keySize]
440 1 : cmp := s.cmp(ndKey, key.UserKey)
441 1 : if cmp < 0 {
442 1 : return true
443 1 : }
444 1 : if cmp > 0 {
445 1 : return false
446 1 : }
447 : // User-key equality.
448 1 : if key.Trailer == nd.keyTrailer {
449 0 : // Internal key equality.
450 0 : return false
451 0 : }
452 1 : return key.Trailer < nd.keyTrailer
453 : }
454 :
// getNext loads the successor of nd at tower level h. When nd is the last
// node at that level, the result is the tail sentinel.
func (s *Skiplist) getNext(nd *node, h int) *node {
	offset := nd.tower[h].nextOffset.Load()
	return (*node)(s.arena.getPointer(offset))
}

// getPrev loads the predecessor of nd at tower level h. When nd is the first
// node at that level, the result is the head sentinel.
func (s *Skiplist) getPrev(nd *node, h int) *node {
	offset := nd.tower[h].prevOffset.Load()
	return (*node)(s.arena.getPointer(offset))
}
|