LCOV - code coverage report
Current view: top level - pebble/internal/arenaskl - skl.go (source / functions) Hit Total Coverage
Test: 2023-12-01 08:08Z 51fca96d - tests + meta.lcov Lines: 240 250 96.0 %
Date: 2023-12-01 08:09:44 Functions: 0 0 -

          Line data    Source code
       1             : /*
       2             :  * Copyright 2017 Dgraph Labs, Inc. and Contributors
       3             :  * Modifications copyright (C) 2017 Andy Kimball and Contributors
       4             :  *
       5             :  * Licensed under the Apache License, Version 2.0 (the "License");
       6             :  * you may not use this file except in compliance with the License.
       7             :  * You may obtain a copy of the License at
       8             :  *
       9             :  *     http://www.apache.org/licenses/LICENSE-2.0
      10             :  *
      11             :  * Unless required by applicable law or agreed to in writing, software
      12             :  * distributed under the License is distributed on an "AS IS" BASIS,
      13             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14             :  * See the License for the specific language governing permissions and
      15             :  * limitations under the License.
      16             :  */
      17             : 
      18             : /*
      19             : Adapted from RocksDB inline skiplist.
      20             : 
      21             : Key differences:
      22             : - No optimization for sequential inserts (no "prev").
      23             : - No custom comparator.
      24             : - Support overwrites. This requires care when we see the same key when inserting.
      25             :   For RocksDB or LevelDB, overwrites are implemented as a newer sequence number in the key, so
      26             :         there is no need for values. We don't intend to support versioning. In-place updates of values
      27             :         would be more efficient.
      28             : - We discard all non-concurrent code.
      29             : - We do not support Splices. This simplifies the code a lot.
      30             : - No AllocateNode or other pointer arithmetic.
      31             : - We combine the findLessThan, findGreaterOrEqual, etc into one function.
      32             : */
      33             : 
      34             : /*
      35             : Further adapted from Badger: https://github.com/dgraph-io/badger.
      36             : 
      37             : Key differences:
      38             : - Support for previous pointers - doubly linked lists. Note that it's up to higher
      39             :   level code to deal with the intermediate state that occurs during insertion,
      40             :   where node A is linked to node B, but node B is not yet linked back to node A.
      41             : - Iterator includes mutator functions.
      42             : */
      43             : 
      44             : package arenaskl // import "github.com/cockroachdb/pebble/internal/arenaskl"
      45             : 
      46             : import (
      47             :         "math"
      48             :         "runtime"
      49             :         "sync/atomic"
      50             :         "unsafe"
      51             : 
      52             :         "github.com/cockroachdb/errors"
      53             :         "github.com/cockroachdb/pebble/internal/base"
      54             :         "github.com/cockroachdb/pebble/internal/fastrand"
      55             : )
      56             : 
const (
	// maxHeight is the maximum number of levels in the skiplist. The head and
	// tail nodes are always allocated with this height, and the precomputed
	// probabilities table has one entry per level.
	maxHeight   = 20
	// maxNodeSize is the size in bytes of a full node struct, as reported by
	// unsafe.Sizeof.
	maxNodeSize = int(unsafe.Sizeof(node{}))
	// linksSize is the size in bytes of a single links struct, as reported by
	// unsafe.Sizeof.
	linksSize   = int(unsafe.Sizeof(links{}))
	// pValue is the inverse of Euler's number. Each successive entry of the
	// probabilities table is the previous entry multiplied by pValue.
	pValue      = 1 / math.E
)
      63             : 
// ErrRecordExists indicates that an entry with the specified key already
// exists in the skiplist. Duplicate entries are not directly supported and
// instead must be handled by the user by appending a unique version suffix to
// keys. It is returned by both Skiplist.Add and Inserter.Add.
var ErrRecordExists = errors.New("record with this key already exists")
      69             : 
// Skiplist is a fast, concurrent skiplist implementation that supports forward
// and backward iteration. See batchskl.Skiplist for a non-concurrent
// skiplist. Keys and values are immutable once added to the skiplist and
// deletion is not supported. Instead, higher-level code is expected to add new
// entries that shadow existing entries and perform deletion via tombstones. It
// is up to the user to process these shadow entries and tombstones
// appropriately during retrieval.
type Skiplist struct {
	// arena is the allocator from which all nodes, keys, and values are
	// carved; node links are stored as offsets into it.
	arena  *Arena
	// cmp orders user keys.
	cmp    base.Compare
	// head and tail are the boundary nodes of the list. They are allocated
	// with maxHeight levels and linked to each other at every level by Reset.
	head   *node
	tail   *node
	height atomic.Uint32 // Current height. 1 <= height <= maxHeight. CAS.

	// If set to true by tests, then extra delays are added to make it easier to
	// detect unusual race conditions.
	testing bool
}
      88             : 
// Inserter caches the splice (the prev/next node pair at each level) located
// by the most recent insertion so that a following insertion can try to reuse
// it instead of searching from the head of the list. The zero value is ready
// to use; Skiplist.Add uses a fresh zero-valued Inserter for every call.
type Inserter struct {
	// spl holds the cached prev/next pair for each level.
	spl    [maxHeight]splice
	// height is the list height at the time the splice was last computed. It
	// is reset to 0 when the cached splice must be discarded.
	height uint32
}
      94             : 
// Add inserts key and value into list, using and updating the splice cached
// in ins. It returns the same errors as Skiplist.Add, since both delegate to
// the same internal insertion routine.
func (ins *Inserter) Add(list *Skiplist, key base.InternalKey, value []byte) error {
	return list.addInternal(key, value, ins)
}
      99             : 
var (
	// probabilities is filled in by init. randomHeight compares a single
	// random uint32 against successive entries to pick a node height.
	probabilities [maxHeight]uint32
)
     103             : 
     104           2 : func init() {
     105           2 :         // Precompute the skiplist probabilities so that only a single random number
     106           2 :         // needs to be generated and so that the optimal pvalue can be used (inverse
     107           2 :         // of Euler's number).
     108           2 :         p := float64(1.0)
     109           2 :         for i := 0; i < maxHeight; i++ {
     110           2 :                 probabilities[i] = uint32(float64(math.MaxUint32) * p)
     111           2 :                 p *= pValue
     112           2 :         }
     113             : }
     114             : 
     115             : // NewSkiplist constructs and initializes a new, empty skiplist. All nodes, keys,
     116             : // and values in the skiplist will be allocated from the given arena.
     117           1 : func NewSkiplist(arena *Arena, cmp base.Compare) *Skiplist {
     118           1 :         skl := &Skiplist{}
     119           1 :         skl.Reset(arena, cmp)
     120           1 :         return skl
     121           1 : }
     122             : 
     123             : // Reset the skiplist to empty and re-initialize.
     124           2 : func (s *Skiplist) Reset(arena *Arena, cmp base.Compare) {
     125           2 :         // Allocate head and tail nodes.
     126           2 :         head, err := newRawNode(arena, maxHeight, 0, 0)
     127           2 :         if err != nil {
     128           0 :                 panic("arenaSize is not large enough to hold the head node")
     129             :         }
     130           2 :         head.keyOffset = 0
     131           2 : 
     132           2 :         tail, err := newRawNode(arena, maxHeight, 0, 0)
     133           2 :         if err != nil {
     134           0 :                 panic("arenaSize is not large enough to hold the tail node")
     135             :         }
     136           2 :         tail.keyOffset = 0
     137           2 : 
     138           2 :         // Link all head/tail levels together.
     139           2 :         headOffset := arena.getPointerOffset(unsafe.Pointer(head))
     140           2 :         tailOffset := arena.getPointerOffset(unsafe.Pointer(tail))
     141           2 :         for i := 0; i < maxHeight; i++ {
     142           2 :                 head.tower[i].nextOffset.Store(tailOffset)
     143           2 :                 tail.tower[i].prevOffset.Store(headOffset)
     144           2 :         }
     145             : 
     146           2 :         *s = Skiplist{
     147           2 :                 arena: arena,
     148           2 :                 cmp:   cmp,
     149           2 :                 head:  head,
     150           2 :                 tail:  tail,
     151           2 :         }
     152           2 :         s.height.Store(1)
     153             : }
     154             : 
// Height returns the height of the highest tower within any of the nodes that
// have ever been allocated as part of this skiplist. The value is read with an
// atomic load.
func (s *Skiplist) Height() uint32 { return s.height.Load() }
     158             : 
// Arena returns the arena backing this skiplist, i.e. the one passed to the
// most recent Reset (or to NewSkiplist).
func (s *Skiplist) Arena() *Arena { return s.arena }
     161             : 
// Size returns the number of bytes that have been allocated from the arena,
// as reported by the arena itself.
func (s *Skiplist) Size() uint32 { return s.arena.Size() }
     164             : 
     165             : // Add adds a new key if it does not yet exist. If the key already exists, then
     166             : // Add returns ErrRecordExists. If there isn't enough room in the arena, then
     167             : // Add returns ErrArenaFull.
     168           2 : func (s *Skiplist) Add(key base.InternalKey, value []byte) error {
     169           2 :         var ins Inserter
     170           2 :         return s.addInternal(key, value, &ins)
     171           2 : }
     172             : 
// addInternal is the shared insertion routine behind Skiplist.Add and
// Inserter.Add. It locates the splice for key (reusing the one cached in ins
// when possible), allocates a new node, and links that node into every level
// of its tower from the base level upward using compare-and-swap.
//
// It returns ErrRecordExists if a node with the same internal key is found,
// either by the initial search or while retrying after a failed CAS at the
// base level, and it returns the allocation error from newNode unchanged. On
// success the cached splice in ins is either advanced to the new node or
// invalidated.
func (s *Skiplist) addInternal(key base.InternalKey, value []byte, ins *Inserter) error {
	if s.findSplice(key, ins) {
		// A node with an identical internal key is already in the list.
		return ErrRecordExists
	}

	if s.testing {
		// Add delay to make it easier to test race between this thread
		// and another thread that sees the intermediate state between
		// finding the splice and using it.
		runtime.Gosched()
	}

	nd, height, err := s.newNode(key, value)
	if err != nil {
		return err
	}

	ndOffset := s.arena.getPointerOffset(unsafe.Pointer(nd))

	// We always insert from the base level and up. After you add a node in base
	// level, we cannot create a node in the level above because it would have
	// discovered the node in the base level.
	var found bool
	var invalidateSplice bool
	for i := 0; i < int(height); i++ {
		prev := ins.spl[i].prev
		next := ins.spl[i].next

		if prev == nil {
			// New node increased the height of the skiplist, so assume that the
			// new level has not yet been populated.
			if next != nil {
				panic("next is expected to be nil, since prev is nil")
			}

			prev = s.head
			next = s.tail
		}

		// +----------------+     +------------+     +----------------+
		// |      prev      |     |     nd     |     |      next      |
		// | prevNextOffset |---->|            |     |                |
		// |                |<----| prevOffset |     |                |
		// |                |     | nextOffset |---->|                |
		// |                |     |            |<----| nextPrevOffset |
		// +----------------+     +------------+     +----------------+
		//
		// 1. Initialize prevOffset and nextOffset to point to prev and next.
		// 2. CAS prevNextOffset to repoint from next to nd.
		// 3. CAS nextPrevOffset to repoint from prev to nd.
		for {
			prevOffset := s.arena.getPointerOffset(unsafe.Pointer(prev))
			nextOffset := s.arena.getPointerOffset(unsafe.Pointer(next))
			nd.tower[i].init(prevOffset, nextOffset)

			// Check whether next has an updated link to prev. If it does not,
			// that can mean one of two things:
			//   1. The thread that added the next node hasn't yet had a chance
			//      to add the prev link (but will shortly).
			//   2. Another thread has added a new node between prev and next.
			nextPrevOffset := next.prevOffset(i)
			if nextPrevOffset != prevOffset {
				// Determine whether #1 or #2 is true by checking whether prev
				// is still pointing to next. As long as the atomic operations
				// have at least acquire/release semantics (no need for
				// sequential consistency), this works, as it is equivalent to
				// the "publication safety" pattern.
				prevNextOffset := prev.nextOffset(i)
				if prevNextOffset == nextOffset {
					// Ok, case #1 is true, so help the other thread along by
					// updating the next node's prev link.
					next.casPrevOffset(i, nextPrevOffset, prevOffset)
				}
			}

			if prev.casNextOffset(i, nextOffset, ndOffset) {
				// Managed to insert nd between prev and next, so update the next
				// node's prev link and go to the next level.
				if s.testing {
					// Add delay to make it easier to test race between this thread
					// and another thread that sees the intermediate state between
					// setting next and setting prev.
					runtime.Gosched()
				}

				// The result of this CAS is deliberately not checked: if it
				// fails, the back link is left for another inserter to repair
				// via the "help the other thread along" path above.
				next.casPrevOffset(i, prevOffset, ndOffset)
				break
			}

			// CAS failed. We need to recompute prev and next. It is unlikely to
			// be helpful to try to use a different level as we redo the search,
			// because it is unlikely that lots of nodes are inserted between prev
			// and next.
			prev, next, found = s.findSpliceForLevel(key, i, prev)
			if found {
				if i != 0 {
					panic("how can another thread have inserted a node at a non-base level?")
				}

				return ErrRecordExists
			}
			invalidateSplice = true
		}
	}

	// If we had to recompute the splice for a level, invalidate the entire
	// cached splice. Setting the cached height to 0 makes the next findSplice
	// call recompute every level from scratch.
	if invalidateSplice {
		ins.height = 0
	} else {
		// The splice was valid. We inserted a node between spl[i].prev and
		// spl[i].next. Optimistically update spl[i].prev for use in a subsequent
		// call to add.
		for i := uint32(0); i < height; i++ {
			ins.spl[i].prev = nd
		}
	}

	return nil
}
     294             : 
     295             : // NewIter returns a new Iterator object. The lower and upper bound parameters
     296             : // control the range of keys the iterator will return. Specifying for nil for
     297             : // lower or upper bound disables the check for that boundary. Note that lower
     298             : // bound is not checked on {SeekGE,First} and upper bound is not check on
     299             : // {SeekLT,Last}. The user is expected to perform that check. Note that it is
     300             : // safe for an iterator to be copied by value.
     301           2 : func (s *Skiplist) NewIter(lower, upper []byte) *Iterator {
     302           2 :         it := iterPool.Get().(*Iterator)
     303           2 :         *it = Iterator{list: s, nd: s.head, lower: lower, upper: upper}
     304           2 :         return it
     305           2 : }
     306             : 
     307             : // NewFlushIter returns a new flushIterator, which is similar to an Iterator
     308             : // but also sets the current number of the bytes that have been iterated
     309             : // through.
     310           2 : func (s *Skiplist) NewFlushIter(bytesFlushed *uint64) base.InternalIterator {
     311           2 :         return &flushIterator{
     312           2 :                 Iterator:      Iterator{list: s, nd: s.head},
     313           2 :                 bytesIterated: bytesFlushed,
     314           2 :         }
     315           2 : }
     316             : 
     317             : func (s *Skiplist) newNode(
     318             :         key base.InternalKey, value []byte,
     319           2 : ) (nd *node, height uint32, err error) {
     320           2 :         height = s.randomHeight()
     321           2 :         nd, err = newNode(s.arena, height, key, value)
     322           2 :         if err != nil {
     323           1 :                 return
     324           1 :         }
     325             : 
     326             :         // Try to increase s.height via CAS.
     327           2 :         listHeight := s.Height()
     328           2 :         for height > listHeight {
     329           2 :                 if s.height.CompareAndSwap(listHeight, height) {
     330           2 :                         // Successfully increased skiplist.height.
     331           2 :                         break
     332             :                 }
     333             : 
     334           0 :                 listHeight = s.Height()
     335             :         }
     336             : 
     337           2 :         return
     338             : }
     339             : 
     340           2 : func (s *Skiplist) randomHeight() uint32 {
     341           2 :         rnd := fastrand.Uint32()
     342           2 : 
     343           2 :         h := uint32(1)
     344           2 :         for h < maxHeight && rnd <= probabilities[h] {
     345           2 :                 h++
     346           2 :         }
     347             : 
     348           2 :         return h
     349             : }
     350             : 
// findSplice fills ins.spl with the (prev, next) node pair that brackets key
// at each level it searches, and reports whether a node with an identical
// internal key was found by the last level searched.
//
// If the Inserter's cached height is below the current list height, every
// level is recomputed starting from the head. Otherwise the cached splice is
// examined from the base level upward, and the search resumes from the lowest
// level whose cached pair is still adjacent and still brackets key; only the
// levels below that one are recomputed.
//
// NOTE(review): when the cached level-0 pair is reused, the search loop at the
// bottom does not run and found stays false. keyIsAfterNode returns false on
// internal-key equality, so a cached next node equal to key is still treated
// as bracketing it — confirm that callers never re-add an existing key through
// a reused Inserter.
func (s *Skiplist) findSplice(key base.InternalKey, ins *Inserter) (found bool) {
	listHeight := s.Height()
	var level int

	prev := s.head
	if ins.height < listHeight {
		// Our cached height is less than the list height, which means there were
		// inserts that increased the height of the list. Recompute the splice from
		// scratch.
		ins.height = listHeight
		level = int(ins.height)
	} else {
		// Our cached height is equal to the list height.
		for ; level < int(listHeight); level++ {
			spl := &ins.spl[level]
			if s.getNext(spl.prev, level) != spl.next {
				// One or more nodes have been inserted between the splice at this
				// level. Try the next level up.
				continue
			}
			if spl.prev != s.head && !s.keyIsAfterNode(spl.prev, key) {
				// Key lies before splice. Fall back to a full search from the
				// head at the top level.
				level = int(listHeight)
				break
			}
			if spl.next != s.tail && s.keyIsAfterNode(spl.next, key) {
				// Key lies after splice. Fall back to a full search from the
				// head at the top level.
				level = int(listHeight)
				break
			}
			// The splice brackets the key! Levels at and above this one are
			// kept as cached; the search below starts from spl.prev.
			prev = spl.prev
			break
		}
	}

	// Recompute the splice for every level below the starting level, walking
	// down and carrying prev from one level to the next.
	for level = level - 1; level >= 0; level-- {
		var next *node
		prev, next, found = s.findSpliceForLevel(key, level, prev)
		if next == nil {
			next = s.tail
		}
		ins.spl[level].init(prev, next)
	}

	return
}
     398             : 
     399             : func (s *Skiplist) findSpliceForLevel(
     400             :         key base.InternalKey, level int, start *node,
     401           2 : ) (prev, next *node, found bool) {
     402           2 :         prev = start
     403           2 : 
     404           2 :         for {
     405           2 :                 // Assume prev.key < key.
     406           2 :                 next = s.getNext(prev, level)
     407           2 :                 if next == s.tail {
     408           2 :                         // Tail node, so done.
     409           2 :                         break
     410             :                 }
     411             : 
     412           2 :                 offset, size := next.keyOffset, next.keySize
     413           2 :                 nextKey := s.arena.buf[offset : offset+size]
     414           2 :                 cmp := s.cmp(key.UserKey, nextKey)
     415           2 :                 if cmp < 0 {
     416           2 :                         // We are done for this level, since prev.key < key < next.key.
     417           2 :                         break
     418             :                 }
     419           2 :                 if cmp == 0 {
     420           2 :                         // User-key equality.
     421           2 :                         if key.Trailer == next.keyTrailer {
     422           1 :                                 // Internal key equality.
     423           1 :                                 found = true
     424           1 :                                 break
     425             :                         }
     426           2 :                         if key.Trailer > next.keyTrailer {
     427           2 :                                 // We are done for this level, since prev.key < key < next.key.
     428           2 :                                 break
     429             :                         }
     430             :                 }
     431             : 
     432             :                 // Keep moving right on this level.
     433           2 :                 prev = next
     434             :         }
     435             : 
     436           2 :         return
     437             : }
     438             : 
     439           2 : func (s *Skiplist) keyIsAfterNode(nd *node, key base.InternalKey) bool {
     440           2 :         ndKey := s.arena.buf[nd.keyOffset : nd.keyOffset+nd.keySize]
     441           2 :         cmp := s.cmp(ndKey, key.UserKey)
     442           2 :         if cmp < 0 {
     443           2 :                 return true
     444           2 :         }
     445           2 :         if cmp > 0 {
     446           2 :                 return false
     447           2 :         }
     448             :         // User-key equality.
     449           1 :         if key.Trailer == nd.keyTrailer {
     450           0 :                 // Internal key equality.
     451           0 :                 return false
     452           0 :         }
     453           1 :         return key.Trailer < nd.keyTrailer
     454             : }
     455             : 
     456           2 : func (s *Skiplist) getNext(nd *node, h int) *node {
     457           2 :         offset := nd.tower[h].nextOffset.Load()
     458           2 :         return (*node)(s.arena.getPointer(offset))
     459           2 : }
     460             : 
     461           2 : func (s *Skiplist) getPrev(nd *node, h int) *node {
     462           2 :         offset := nd.tower[h].prevOffset.Load()
     463           2 :         return (*node)(s.arena.getPointer(offset))
     464           2 : }

Generated by: LCOV version 1.14