Line data Source code
1 : /*
2 : * Copyright 2017 Dgraph Labs, Inc. and Contributors
3 : * Modifications copyright (C) 2017 Andy Kimball and Contributors
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : /*
19 : Adapted from RocksDB inline skiplist.
20 :
21 : Key differences:
22 : - No optimization for sequential inserts (no "prev").
23 : - No custom comparator.
24 : - Support overwrites. This requires care when inserting a key that already exists.
25 : For RocksDB or LevelDB, overwrites are implemented as a newer sequence number in the key, so
26 : there is no need for values. We don't intend to support versioning. In-place updates of values
27 : would be more efficient.
28 : - We discard all non-concurrent code.
29 : - We do not support Splices. This simplifies the code a lot.
30 : - No AllocateNode or other pointer arithmetic.
31 : - We combine the findLessThan, findGreaterOrEqual, etc. into one function.
32 : */
33 :
34 : /*
35 : Further adapted from Badger: https://github.com/dgraph-io/badger.
36 :
37 : Key differences:
38 : - Support for previous pointers - doubly linked lists. Note that it's up to higher
39 : level code to deal with the intermediate state that occurs during insertion,
40 : where node A is linked to node B, but node B is not yet linked back to node A.
41 : - Iterator includes mutator functions.
42 : */
43 :
44 : /*
45 : Further adapted from arenaskl: https://github.com/andy-kimball/arenaskl
46 :
47 : Key differences:
48 : - Removed support for deletion.
49 : - Removed support for concurrency.
50 : - External storage of keys.
51 : - Node storage grows to an arbitrary size.
52 : */
53 :
54 : package batchskl // import "github.com/cockroachdb/pebble/internal/batchskl"
55 :
56 : import (
57 : "bytes"
58 : "encoding/binary"
59 : "fmt"
60 : "math"
61 : "time"
62 : "unsafe"
63 :
64 : "github.com/cockroachdb/errors"
65 : "github.com/cockroachdb/pebble/internal/base"
66 : "github.com/cockroachdb/pebble/internal/constants"
67 : "golang.org/x/exp/rand"
68 : )
69 :
70 : const (
71 : maxHeight = 20
72 : maxNodeSize = uint64(unsafe.Sizeof(node{}))
73 : linksSize = uint64(unsafe.Sizeof(links{}))
74 : maxNodesSize = constants.MaxUint32OrInt
75 : )
76 :
77 : var (
78 : // ErrExists indicates that a duplicate record was inserted. This should never
79 : // happen for normal usage of batchskl as every key should have a unique
80 : // sequence number.
81 : ErrExists = errors.New("record with this key already exists")
82 :
83 : // ErrTooManyRecords is a sentinel error returned when the size of the raw
84 : // nodes slice exceeds the maximum allowed size (currently 1 << 32 - 1). This
85 : // corresponds to ~117 M skiplist entries.
86 : ErrTooManyRecords = errors.New("too many records")
87 : )
88 :
89 : type links struct {
90 : next uint32
91 : prev uint32
92 : }
93 :
94 : type node struct {
95 : // The offset of the start of the record in the storage.
96 : offset uint32
97 : // The offset of the start and end of the key in storage.
98 : keyStart uint32
99 : keyEnd uint32
100 : // A fixed 8-byte abbreviation of the key, used to avoid retrieval of the key
101 : // during seek operations. Key retrieval can be expensive purely due to cache
102 : // misses, while the abbreviatedKey stored here sits in the same cache line as
103 : // the key offsets and the links, making accessing and comparing against it
104 : // almost free.
105 : abbreviatedKey uint64
106 : // Most nodes do not need to use the full height of the link tower, since the
107 : // probability of each successive level decreases exponentially. Because
108 : // these elements are never accessed, they do not need to be allocated.
109 : // Therefore, when a node is allocated, its memory footprint is deliberately
110 : // truncated to not include unneeded link elements.
111 : links [maxHeight]links
112 : }
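// A minimal sketch of an abbreviated-key function (illustrative, not part of
// this package), assuming keys order bytewise per the base.AbbreviatedKey
// contract: pack the first 8 key bytes into a big-endian uint64, zero-padding
// short keys. Whenever two abbreviations differ, their uint64 ordering agrees
// with the bytewise key ordering; equal abbreviations fall back to a full
// comparison.
//
//	func abbreviatedKey(key []byte) uint64 {
//		var v uint64
//		for i := 0; i < 8; i++ {
//			v <<= 8
//			if i < len(key) {
//				v |= uint64(key[i]) // missing bytes stay zero
//			}
//		}
//		return v
//	}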
113 :
114 : // Skiplist is a fast, non-concurrent skiplist implementation that supports
115 : // forward and backward iteration. See arenaskl.Skiplist for a concurrent
116 : // skiplist. Keys and values are stored externally from the skiplist via the
117 : // Storage interface. Deletion is not supported. Instead, higher-level code is
118 : // expected to perform deletion via tombstones and needs to process those
119 : // tombstones appropriately during retrieval operations.
120 : type Skiplist struct {
121 : storage *[]byte
122 : cmp base.Compare
123 : abbreviatedKey base.AbbreviatedKey
124 : nodes []byte
125 : head uint32
126 : tail uint32
127 : height uint32 // Current height: 1 <= height <= maxHeight
128 : rand rand.PCGSource
129 : }
130 :
131 : var (
132 : probabilities [maxHeight]uint32
133 : )
134 :
135 1 : func init() {
136 1 : const pValue = 1 / math.E
137 1 :
138 1 : // Precompute the skiplist probabilities so that only a single random number
139 1 : // needs to be generated and so that the optimal p-value can be used (inverse
140 1 : // of Euler's number).
141 1 : p := float64(1.0)
142 1 : for i := 0; i < maxHeight; i++ {
143 1 : probabilities[i] = uint32(float64(math.MaxUint32) * p)
144 1 : p *= pValue
145 1 : }
146 : }
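// A worked example of the table above: probabilities[0] == math.MaxUint32 and
// probabilities[k] ≈ e^-k * math.MaxUint32, so randomHeight (below) returns a
// height of at least k+1 with probability (1/e)^k. The expected tower height
// is therefore the sum over k >= 0 of e^-k = 1/(1-1/e) ≈ 1.58 links per node.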
147 :
148 : // NewSkiplist constructs and initializes a new, empty skiplist.
149 1 : func NewSkiplist(storage *[]byte, cmp base.Compare, abbreviatedKey base.AbbreviatedKey) *Skiplist {
150 1 : s := &Skiplist{}
151 1 : s.Init(storage, cmp, abbreviatedKey)
152 1 : return s
153 1 : }
154 :
155 : // Reset the fields in the skiplist for reuse.
156 0 : func (s *Skiplist) Reset() {
157 0 : *s = Skiplist{
158 0 : nodes: s.nodes[:0],
159 0 : height: 1,
160 0 : }
161 0 : const batchMaxRetainedSize = 1 << 20 // 1 MB
162 0 : if cap(s.nodes) > batchMaxRetainedSize {
163 0 : s.nodes = nil
164 0 : }
165 : }
166 :
167 : // Init resets the skiplist to empty and re-initializes it for use.
168 1 : func (s *Skiplist) Init(storage *[]byte, cmp base.Compare, abbreviatedKey base.AbbreviatedKey) {
169 1 : *s = Skiplist{
170 1 : storage: storage,
171 1 : cmp: cmp,
172 1 : abbreviatedKey: abbreviatedKey,
173 1 : nodes: s.nodes[:0],
174 1 : height: 1,
175 1 : }
176 1 : s.rand.Seed(uint64(time.Now().UnixNano()))
177 1 :
178 1 : const initBufSize = 256
179 1 : if cap(s.nodes) < initBufSize {
180 1 : s.nodes = make([]byte, 0, initBufSize)
181 1 : }
182 :
183 : // Allocate head and tail nodes. While allocating a new node can fail, in the
184 : // context of initializing the skiplist we consider it unrecoverable.
185 1 : var err error
186 1 : s.head, err = s.newNode(maxHeight, 0, 0, 0, 0)
187 1 : if err != nil {
188 0 : panic(err)
189 : }
190 1 : s.tail, err = s.newNode(maxHeight, 0, 0, 0, 0)
191 1 : if err != nil {
192 0 : panic(err)
193 : }
194 :
195 : // Link all head/tail levels together.
196 1 : headNode := s.node(s.head)
197 1 : tailNode := s.node(s.tail)
198 1 : for i := uint32(0); i < maxHeight; i++ {
199 1 : headNode.links[i].next = s.tail
200 1 : tailNode.links[i].prev = s.head
201 1 : }
202 : }
203 :
204 : // Add adds a new key to the skiplist if it does not yet exist. If the record
205 : // already exists, then Add returns ErrExists.
206 1 : func (s *Skiplist) Add(keyOffset uint32) error {
207 1 : data := (*s.storage)[keyOffset+1:]
208 1 : v, n := binary.Uvarint(data)
209 1 : if n <= 0 {
210 0 : return errors.Errorf("corrupted batch entry: %d", errors.Safe(keyOffset))
211 0 : }
212 1 : data = data[n:]
213 1 : if v > uint64(len(data)) {
214 0 : return errors.Errorf("corrupted batch entry: %d", errors.Safe(keyOffset))
215 0 : }
216 1 : keyStart := 1 + keyOffset + uint32(n)
217 1 : keyEnd := keyStart + uint32(v)
218 1 : key := data[:v]
219 1 : abbreviatedKey := s.abbreviatedKey(key)
220 1 :
221 1 : // spl holds the list of next and previous links for each level in the
222 1 : // skiplist indicating where the new node will be inserted.
223 1 : var spl [maxHeight]splice
224 1 :
225 1 : // Fast-path for in-order insertion of keys: compare the new key against the
226 1 : // last key.
227 1 : prev := s.getPrev(s.tail, 0)
228 1 : if prevNode := s.node(prev); prev == s.head ||
229 1 : abbreviatedKey > prevNode.abbreviatedKey ||
230 1 : (abbreviatedKey == prevNode.abbreviatedKey &&
231 1 : s.cmp(key, (*s.storage)[prevNode.keyStart:prevNode.keyEnd]) > 0) {
232 1 : for level := uint32(0); level < s.height; level++ {
233 1 : spl[level].prev = s.getPrev(s.tail, level)
234 1 : spl[level].next = s.tail
235 1 : }
236 1 : } else {
237 1 : s.findSplice(key, abbreviatedKey, &spl)
238 1 : }
239 :
240 1 : height := s.randomHeight()
241 1 : // Increase s.height as necessary.
242 1 : for ; s.height < height; s.height++ {
243 1 : spl[s.height].next = s.tail
244 1 : spl[s.height].prev = s.head
245 1 : }
246 :
247 : // We always insert from the base level and up. After a node is added at the
248 : // base level, we cannot create it at a higher level first, because a search
249 : // at that level would have already discovered the node at the base level.
250 1 : nd, err := s.newNode(height, keyOffset, keyStart, keyEnd, abbreviatedKey)
251 1 : if err != nil {
252 0 : return err
253 0 : }
254 1 : newNode := s.node(nd)
255 1 : for level := uint32(0); level < height; level++ {
256 1 : next := spl[level].next
257 1 : prev := spl[level].prev
258 1 : newNode.links[level].next = next
259 1 : newNode.links[level].prev = prev
260 1 : s.node(next).links[level].prev = nd
261 1 : s.node(prev).links[level].next = nd
262 1 : }
263 :
264 1 : return nil
265 : }
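// A minimal usage sketch, assuming bytewise key ordering. The record encoding
// (one kind byte, a uvarint key length, then the key bytes) mirrors what Add
// parses above; exampleAdd itself is illustrative and not part of this
// package. Note that the storage is passed as a *[]byte precisely so that
// appends which reallocate the slice remain visible to the skiplist.
//
//	func exampleAdd() {
//		var storage []byte
//		s := NewSkiplist(&storage, bytes.Compare, base.DefaultComparer.AbbreviatedKey)
//		for _, k := range []string{"b", "a", "c"} {
//			offset := uint32(len(storage))
//			storage = append(storage, byte(base.InternalKeyKindSet))
//			storage = binary.AppendUvarint(storage, uint64(len(k))) // Go 1.19+
//			storage = append(storage, k...)
//			if err := s.Add(offset); err != nil {
//				panic(err)
//			}
//		}
//		it := s.NewIter(nil, nil) // iterates "a", "b", "c" in order
//		_ = it
//	}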
266 :
267 : // NewIter returns a new Iterator object. The lower and upper bound parameters
268 : // control the range of keys the iterator will return. Specifying nil for the
269 : // lower or upper bound disables the check for that boundary. Note that the
270 : // lower bound is not checked on {SeekGE,First} and the upper bound is not
271 : // checked on {SeekLT,Last}; the user is expected to perform those checks. It is
272 : // safe for an iterator to be copied by value.
273 1 : func (s *Skiplist) NewIter(lower, upper []byte) Iterator {
274 1 : return Iterator{list: s, lower: lower, upper: upper}
275 1 : }
276 :
277 : func (s *Skiplist) newNode(
278 : height,
279 : offset, keyStart, keyEnd uint32, abbreviatedKey uint64,
280 1 : ) (uint32, error) {
281 1 : if height < 1 || height > maxHeight {
282 0 : panic("height cannot be less than one or greater than the max height")
283 : }
284 :
285 1 : unusedSize := uint64(maxHeight-int(height)) * linksSize
286 1 : nodeOffset, err := s.alloc(uint32(maxNodeSize - unusedSize))
287 1 : if err != nil {
288 0 : return 0, err
289 0 : }
290 1 : nd := s.node(nodeOffset)
291 1 :
292 1 : nd.offset = offset
293 1 : nd.keyStart = keyStart
294 1 : nd.keyEnd = keyEnd
295 1 : nd.abbreviatedKey = abbreviatedKey
296 1 : return nodeOffset, nil
297 : }
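// A worked example of the truncation above: a height-1 node needs only one
// links element, so newNode allocates maxNodeSize - 19*linksSize bytes for it.
// With typical 64-bit field alignment that is 184 - 19*8 = 32 bytes, versus
// 184 bytes for a full-height tower.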
298 :
299 1 : func (s *Skiplist) alloc(size uint32) (uint32, error) {
300 1 : offset := uint64(len(s.nodes))
301 1 :
302 1 : // We only need memory up to offset + size, but we never want to allocate
303 1 : // a node whose tail points into unallocated memory: node() casts a full maxNodeSize-sized *node over the bytes at an offset, even when the tower is truncated.
304 1 : minAllocSize := offset + maxNodeSize
305 1 : if uint64(cap(s.nodes)) < minAllocSize {
306 1 : allocSize := uint64(cap(s.nodes)) * 2
307 1 : if allocSize < minAllocSize {
308 0 : allocSize = minAllocSize
309 0 : }
310 : // Cap the allocation at the max allowed size to avoid wasted capacity.
311 1 : if allocSize > maxNodesSize {
312 0 : // The new record may still not fit within the allocation, in which case
313 0 : // we return early with an error. This avoids the panic below when we
314 0 : // resize the slice. It also avoids the allocation and copy.
315 0 : if uint64(offset)+uint64(size) > maxNodesSize {
316 0 : return 0, errors.Wrapf(ErrTooManyRecords,
317 0 : "alloc of new record (size=%d) would overflow uint32 (current size=%d)",
318 0 : uint64(offset)+uint64(size), offset,
319 0 : )
320 0 : }
321 0 : allocSize = maxNodesSize
322 : }
323 1 : tmp := make([]byte, len(s.nodes), allocSize)
324 1 : copy(tmp, s.nodes)
325 1 : s.nodes = tmp
326 : }
327 :
328 1 : newSize := uint32(offset) + size
329 1 : s.nodes = s.nodes[:newSize]
330 1 : return uint32(offset), nil
331 : }
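// For example, starting from the 256-byte buffer allocated in Init, capacity
// grows by doubling (256 -> 512 -> 1024 -> ...) until the doubled size would
// exceed maxNodesSize, at which point the allocation is clamped to that limit.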
332 :
333 1 : func (s *Skiplist) node(offset uint32) *node {
334 1 : return (*node)(unsafe.Pointer(&s.nodes[offset]))
335 1 : }
336 :
337 1 : func (s *Skiplist) randomHeight() uint32 {
338 1 : rnd := uint32(s.rand.Uint64())
339 1 : h := uint32(1)
340 1 : for h < maxHeight && rnd <= probabilities[h] {
341 1 : h++
342 1 : }
343 1 : return h
344 : }
345 :
346 1 : func (s *Skiplist) findSplice(key []byte, abbreviatedKey uint64, spl *[maxHeight]splice) {
347 1 : prev := s.head
348 1 :
349 1 : for level := s.height - 1; ; level-- {
350 1 : // The code in this loop is the same as findSpliceForLevel(). For some
351 1 : // reason, calling findSpliceForLevel() here is much slower than the
352 1 : // inlined code below. The excess time also shows up in the final return
353 1 : // statement, which makes little sense. Revisit in Go 1.14 or later to see
354 1 : // whether inlining has improved.
355 1 :
356 1 : next := s.getNext(prev, level)
357 1 : for next != s.tail {
358 1 : // Assume prev.key < key.
359 1 : nextNode := s.node(next)
360 1 : nextAbbreviatedKey := nextNode.abbreviatedKey
361 1 : if abbreviatedKey < nextAbbreviatedKey {
362 1 : // We are done for this level, since prev.key < key < next.key.
363 1 : break
364 : }
365 1 : if abbreviatedKey == nextAbbreviatedKey {
366 1 : if s.cmp(key, (*s.storage)[nextNode.keyStart:nextNode.keyEnd]) <= 0 {
367 1 : // We are done for this level, since prev.key < key <= next.key.
368 1 : break
369 : }
370 : }
371 :
372 : // Keep moving right on this level.
373 1 : prev = next
374 1 : next = nextNode.links[level].next
375 : }
376 :
377 1 : spl[level].prev = prev
378 1 : spl[level].next = next
379 1 : if level == 0 {
380 1 : break
381 : }
382 : }
383 : }
384 :
385 : func (s *Skiplist) findSpliceForLevel(
386 : key []byte, abbreviatedKey uint64, level, start uint32,
387 1 : ) (prev, next uint32) {
388 1 : prev = start
389 1 : next = s.getNext(prev, level)
390 1 :
391 1 : for next != s.tail {
392 1 : // Assume prev.key < key.
393 1 : nextNode := s.node(next)
394 1 : nextAbbreviatedKey := nextNode.abbreviatedKey
395 1 : if abbreviatedKey < nextAbbreviatedKey {
396 1 : // We are done for this level, since prev.key < key < next.key.
397 1 : break
398 : }
399 1 : if abbreviatedKey == nextAbbreviatedKey {
400 1 : if s.cmp(key, (*s.storage)[nextNode.keyStart:nextNode.keyEnd]) <= 0 {
401 1 : // We are done for this level, since prev.key < key <= next.key.
402 1 : break
403 : }
404 : }
405 :
406 : // Keep moving right on this level.
407 1 : prev = next
408 1 : next = nextNode.links[level].next
409 : }
410 :
411 1 : return
412 : }
413 :
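// getKey reconstructs the internal key for a node. The record's offset within
// the batch storage doubles as its sequence number (offsets increase in
// insertion order), and the InternalKeySeqNumBatch bit marks the key as
// belonging to a batch rather than to committed data.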
414 1 : func (s *Skiplist) getKey(nd uint32) base.InternalKey {
415 1 : n := s.node(nd)
416 1 : kind := base.InternalKeyKind((*s.storage)[n.offset])
417 1 : key := (*s.storage)[n.keyStart:n.keyEnd]
418 1 : return base.MakeInternalKey(key, uint64(n.offset)|base.InternalKeySeqNumBatch, kind)
419 1 : }
420 :
421 1 : func (s *Skiplist) getNext(nd, h uint32) uint32 {
422 1 : return s.node(nd).links[h].next
423 1 : }
424 :
425 1 : func (s *Skiplist) getPrev(nd, h uint32) uint32 {
426 1 : return s.node(nd).links[h].prev
427 1 : }
428 :
429 0 : func (s *Skiplist) debug() string {
430 0 : var buf bytes.Buffer
431 0 : for level := uint32(0); level < s.height; level++ {
432 0 : var count int
433 0 : for nd := s.head; nd != s.tail; nd = s.getNext(nd, level) {
434 0 : count++
435 0 : }
436 0 : fmt.Fprintf(&buf, "%d: %d\n", level, count)
437 : }
438 0 : return buf.String()
439 : }
440 :
441 : // Silence unused warning.
442 : var _ = (*Skiplist).debug