// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package metamorphic

import (
	"cmp"
	"fmt"
	"slices"
	"strings"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/testkeys"
	"github.com/stretchr/testify/require"
)

// keyMeta is metadata associated with an (objID, key) pair, where objID is
// a writer containing the key.
type keyMeta struct {
	objID objID
	key   []byte
	// history provides the history of writer operations applied against this
	// key on this object. history is always ordered by non-decreasing
	// metaTimestamp.
	history keyHistory
}

func (m *keyMeta) clear() {
	m.history = m.history[:0]
}

// mergeInto merges this metadata into dst's metadata, appending all of its
// individual operations to dst at the provided timestamp.
func (m *keyMeta) mergeInto(dst *keyMeta, ts int) {
	for _, op := range m.history {
		// If the key is being merged into a database object and the operation
		// is a delete, we can clear the destination history. Database objects
		// are end points in the merging of keys and won't be the source of a
		// future merge. Deletions cause all other operations to behave as
		// though the key was never written to the database at all, so we don't
		// need to consider the prior history when maintaining single delete
		// invariants.
		//
		// NB: There's a subtlety here in that isDelete() will return true if
		// opType is a writerSingleDelete, but single deletes are capable of
		// leaking information about the history of writes. However, that's
		// okay, because as long as we're properly generating single deletes
		// according to the W1 invariant described in keyManager's comment, a
		// single delete is equivalent to a delete for the current history.
		if dst.objID.tag() == dbTag && op.opType.isDelete() {
			dst.clear()
			continue
		}
		dst.history = append(dst.history, keyHistoryItem{
			opType:        op.opType,
			metaTimestamp: ts,
		})
	}
}

type bounds struct {
	smallest    []byte
	largest     []byte
	largestExcl bool // is largest exclusive?
}

func (b bounds) checkValid(cmp base.Compare) {
	if c := cmp(b.smallest, b.largest); c > 0 {
		panic(fmt.Sprintf("invalid bound [%q, %q]", b.smallest, b.largest))
	} else if c == 0 && b.largestExcl {
		panic(fmt.Sprintf("invalid bound [%q, %q)", b.smallest, b.largest))
	}
}

func (b bounds) String() string {
	if b.largestExcl {
		return fmt.Sprintf("[%q,%q)", b.smallest, b.largest)
	}
	return fmt.Sprintf("[%q,%q]", b.smallest, b.largest)
}

// Overlaps returns true iff the bounds intersect.
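// For example, with a bytewise comparator, ["a","c") and ["c","e"] do not
// overlap because the first bound excludes "c", whereas ["a","c"] and
// ["c","e"] do.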
func (b *bounds) Overlaps(cmp base.Compare, other bounds) bool {
	if b.IsUnset() || other.IsUnset() {
		return false
	}
	// Is b strictly before other?
	if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) {
		return false
	}
	// Is b strictly after other?
	if v := cmp(b.smallest, other.largest); v > 0 || (v == 0 && other.largestExcl) {
		return false
	}
	return true
}

// IsUnset returns true if the bounds haven't been set.
func (b bounds) IsUnset() bool {
	return b.smallest == nil && b.largest == nil
}

// Expand potentially expands the receiver bounds to include the other given
// bounds. If the receiver is unset, the other bounds are copied.
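// For example, expanding ["b","d"] to include ["a","c") widens the receiver
// to ["a","d"].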
func (b *bounds) Expand(cmp base.Compare, other bounds) {
	if other.IsUnset() {
		return
	}
	other.checkValid(cmp)
	if b.IsUnset() {
		*b = other
		return
	}
	if cmp(b.smallest, other.smallest) > 0 {
		b.smallest = other.smallest
	}
	if v := cmp(b.largest, other.largest); v < 0 || (v == 0 && b.largestExcl) {
		b.largest = other.largest
		b.largestExcl = other.largestExcl
	}
}

// keyManager tracks the write operations performed on keys in the generation
// phase of the metamorphic test. It maintains histories of operations performed
// against every unique user key on every writer object. These histories inform
// operation generation in order to maintain invariants that Pebble requires of
// end users, mostly around single deletions.
//
// A single deletion has a subtle requirement of the writer:
//
//	W1: The writer may only single delete a key `k` if `k` has been Set once
//	    (and never Merged) since the last delete.
//
// When a SINGLEDEL key deletes a SET key within a compaction, both the SET and
// the SINGLEDEL keys are elided. If multiple SETs of the key exist within the
// LSM, the SINGLEDEL reveals the lower SET. This behavior depends on the
// internal LSM state and is nondeterministic. To ensure determinism, the end
// user must satisfy W1 and use single delete only when they can guarantee that
// the key has been set at most once since the last delete, preventing this
// rollback to a previous value.
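//
// For example, the sequence Set(k), Delete(k), Set(k), SingleDelete(k)
// satisfies W1, whereas Set(k), Set(k), SingleDelete(k) does not: the single
// delete could expose the older Set.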
//
// This W1 invariant requires a delicate dance during operation generation,
// because independent batches may be independently built and committed. With
// multi-instance variants of the metamorphic tests, keys in batches may
// ultimately be committed to any of several DB instances. To satisfy these
// requirements, the key manager tracks the history of every key on every
// writable object. When generating a new single deletion operation, the
// generator asks the key manager for a set of keys for which a single delete
// maintains the W1 invariant within the object itself. This object-local W1
// invariant (OLW1) is equivalent to W1 if one only ever performs write
// operations directly against individual DB objects.
//
// However, with the existence of batches that receive writes independent of DB
// objects, W1 may be violated by appending the histories of two objects that
// independently satisfy OLW1. Consider a sequence such as:
//
//	1. db1.Set("foo")
//	2. batch1.Set("foo")
//	3. batch1.SingleDelete("foo")
//	4. db1.Apply(batch1)
//
// Both db1 and batch1 satisfy the object-local invariant OLW1. However the
// composition of the histories created by appending batch1's operations to db1
// creates a history that now violates W1 on db1. To detect this violation,
// batch applications/commits and ingestions examine the tail of the destination
// object's history and the head of the source batch's history. When a violation
// is detected, these operations insert additional Delete operations to clear
// the conflicting keys before proceeding with the conflicting operation. These
// deletes reset the key history.
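//
// In the sequence above, the generated db1.Apply(batch1) would be preceded by
// a db1.Delete("foo"), clearing "foo"'s history on db1 so that the appended
// batch1 operations cannot violate W1.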
//
// Note that this generation-time key tracking requires that operations be
// infallible, because a runtime failure would cause the key manager's state to
// diverge from the runtime object state. Ingestion operations pose an obstacle,
// because the generator may generate ingestions that fail due to overlapping
// sstables. Today, this complication is sidestepped by avoiding ingestion of
// multiple batches containing deletes or single deletes, since the loss of
// those specific operations on a key is what we cannot tolerate (doing a
// SingleDelete on a key that has not been written to because the Set was lost
// is harmless).
//
// TODO(jackson): Instead, compute smallest and largest bounds of batches so
// that we know at generation-time whether or not an ingestion operation will
// fail and can avoid updating key state.
type keyManager struct {
	comparer *base.Comparer

	// metaTimestamp is used to provide an ordering over certain operations,
	// like iterator creation and updates to keys. Keeping track of the
	// timestamp allows us to make determinations such as whether a key will
	// be visible to an iterator.
	metaTimestamp int

	byObj map[objID]*objKeyMeta
	// globalKeys represents all the keys that have been generated so far. Not
	// all these keys have been written to. globalKeys is sorted.
	globalKeys [][]byte
	// globalKeysMap contains the same keys as globalKeys but in a map. It
	// ensures no duplication.
	globalKeysMap map[string]bool
	// globalKeyPrefixes contains all the key prefixes (as defined by the
	// comparer's Split) generated so far. globalKeyPrefixes is sorted.
	globalKeyPrefixes [][]byte
	// globalKeyPrefixesMap contains the same keys as globalKeyPrefixes. It
	// ensures no duplication.
	globalKeyPrefixesMap map[string]struct{}
}

type objKeyMeta struct {
	id objID
	// List of keys, and what has happened to each in this object.
	// Will be transferred when needed.
	keys map[string]*keyMeta
	// bounds holds user key bounds encompassing all the keys set within an
	// object. It's updated within `update` when a new op is generated.
	bounds bounds
	// These flags are true if the object has had range del or range key operations.
	hasRangeDels     bool
	hasRangeKeys     bool
	hasRangeKeyUnset bool
}

// MergeKey adds the given key at the given meta timestamp, merging the histories as needed.
func (okm *objKeyMeta) MergeKey(k *keyMeta, ts int) {
	meta, ok := okm.keys[string(k.key)]
	if !ok {
		meta = &keyMeta{
			objID: okm.id,
			key:   k.key,
		}
		okm.keys[string(k.key)] = meta
	}
	k.mergeInto(meta, ts)
}

// CollapseKeys collapses the history of all keys. Used for ingestion
// operations, which only use the last value of any given key.
func (okm *objKeyMeta) CollapseKeys() {
	for _, keyMeta := range okm.keys {
		keyMeta.history = keyMeta.history.collapsed()
	}
}

// MergeFrom merges the `from` metadata into this one, appending all of its
// individual operations at the provided timestamp.
func (okm *objKeyMeta) MergeFrom(from *objKeyMeta, metaTimestamp int, cmp base.Compare) {
	// The result should be the same regardless of the ordering of the keys.
	for _, k := range from.keys {
		okm.MergeKey(k, metaTimestamp)
	}
	okm.bounds.Expand(cmp, from.bounds)
	okm.hasRangeDels = okm.hasRangeDels || from.hasRangeDels
	okm.hasRangeKeys = okm.hasRangeKeys || from.hasRangeKeys
	okm.hasRangeKeyUnset = okm.hasRangeKeyUnset || from.hasRangeKeyUnset
}

// objKeyMeta looks up the objKeyMeta for a given object, creating it if necessary.
func (k *keyManager) objKeyMeta(o objID) *objKeyMeta {
	m, ok := k.byObj[o]
	if !ok {
		m = &objKeyMeta{
			id:   o,
			keys: make(map[string]*keyMeta),
		}
		k.byObj[o] = m
	}
	return m
}

// SortedKeysForObj returns all the entries in objKeyMeta(o).keys, in sorted
// order.
func (k *keyManager) SortedKeysForObj(o objID) []keyMeta {
	okm := k.objKeyMeta(o)
	res := make([]keyMeta, 0, len(okm.keys))
	for _, m := range okm.keys {
		res = append(res, *m)
	}
	slices.SortFunc(res, func(a, b keyMeta) int {
		cmp := k.comparer.Compare(a.key, b.key)
		if cmp == 0 {
			panic(fmt.Sprintf("distinct keys %q and %q compared as equal", a.key, b.key))
		}
		return cmp
	})
	return res
}

// InRangeKeysForObj returns all keys in the range [lower, upper) associated with the
// given object, in sorted order. If either of the bounds is nil, it is ignored.
func (k *keyManager) InRangeKeysForObj(o objID, lower, upper []byte) []keyMeta {
	var inRangeKeys []keyMeta
	for _, km := range k.SortedKeysForObj(o) {
		if (lower == nil || k.comparer.Compare(km.key, lower) >= 0) &&
			(upper == nil || k.comparer.Compare(km.key, upper) < 0) {
			inRangeKeys = append(inRangeKeys, km)
		}
	}
	return inRangeKeys
}

// KeysForExternalIngest returns the keys that will be ingested with an external
// object (taking into consideration the bounds, synthetic suffix, etc).
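// For example, with testkeys-style keys, a stored key "bar@3" combined with a
// synthetic prefix "p_" and a synthetic suffix "@7" would be ingested as
// "p_bar@7", assuming the resulting key falls within the object's bounds.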
func (k *keyManager) KeysForExternalIngest(obj externalObjWithBounds) []keyMeta {
	var res []keyMeta
	for _, km := range k.SortedKeysForObj(obj.externalObjID) {
		// Apply prefix and suffix changes, then check the bounds.
		if obj.syntheticPrefix.IsSet() {
			km.key = obj.syntheticPrefix.Apply(km.key)
		}
		if obj.syntheticSuffix.IsSet() {
			n := k.comparer.Split(km.key)
			km.key = append(km.key[:n:n], obj.syntheticSuffix...)
		}
		if k.comparer.Compare(km.key, obj.bounds.Start) >= 0 && k.comparer.Compare(km.key, obj.bounds.End) < 0 {
			res = append(res, km)
		}
	}
	// Check for duplicate resulting keys.
	for i := 1; i < len(res); i++ {
		if k.comparer.Compare(res[i].key, res[i-1].key) == 0 {
			panic(fmt.Sprintf("duplicate external ingest key %q", res[i].key))
		}
	}
	return res
}

func (k *keyManager) nextMetaTimestamp() int {
	ret := k.metaTimestamp
	k.metaTimestamp++
	return ret
}

// newKeyManager returns a pointer to a new keyManager. Callers should
// interact with this using the addNewKey, knownKeys, and update methods only.
func newKeyManager(numInstances int) *keyManager {
	m := &keyManager{
		comparer:             testkeys.Comparer,
		byObj:                make(map[objID]*objKeyMeta),
		globalKeysMap:        make(map[string]bool),
		globalKeyPrefixesMap: make(map[string]struct{}),
	}
	for i := 1; i <= max(numInstances, 1); i++ {
		m.objKeyMeta(makeObjID(dbTag, uint32(i)))
	}
	return m
}

// addNewKey adds the given key to the key manager for global key tracking.
// Returns false iff this is not a new key.
func (k *keyManager) addNewKey(key []byte) bool {
	if k.globalKeysMap[string(key)] {
		return false
	}
	insertSorted(k.comparer.Compare, &k.globalKeys, key)
	k.globalKeysMap[string(key)] = true

	prefixLen := k.comparer.Split(key)
	if prefixLen == 0 {
		panic(fmt.Sprintf("key %q has zero length prefix", key))
	}
	if _, ok := k.globalKeyPrefixesMap[string(key[:prefixLen])]; !ok {
		insertSorted(k.comparer.Compare, &k.globalKeyPrefixes, key[:prefixLen])
		k.globalKeyPrefixesMap[string(key[:prefixLen])] = struct{}{}
	}
	return true
}

// getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
// allocates, initializes and returns a new value.
func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
	objKeys := k.objKeyMeta(id)
	m, ok := objKeys.keys[string(key)]
	if ok {
		return m
	}
	m = &keyMeta{
		objID: id,
		key:   key,
	}
	// Initialize the key-to-meta index.
	objKeys.keys[string(key)] = m
	// Expand the object's bounds to contain this key if they don't already.
	objKeys.bounds.Expand(k.comparer.Compare, k.makeSingleKeyBounds(key))
	return m
}

// mergeObjectInto merges the key metadata from one object into another and
// deletes the metadata for the source object (which must not be used again).
func (k *keyManager) mergeObjectInto(from, to objID) {
	toMeta := k.objKeyMeta(to)
	ts := k.nextMetaTimestamp()
	toMeta.MergeFrom(k.objKeyMeta(from), ts, k.comparer.Compare)

	delete(k.byObj, from)
}

// expandBounds expands the incrementally maintained bounds of o to be at least
// as wide as `b`.
func (k *keyManager) expandBounds(o objID, b bounds) {
	k.objKeyMeta(o).bounds.Expand(k.comparer.Compare, b)
}

// doObjectBoundsOverlap returns true iff any of the named objects have key
// bounds that overlap any other named object.
func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool {
	for i := range objIDs {
		ib, iok := k.byObj[objIDs[i]]
		if !iok {
			continue
		}
		for j := i + 1; j < len(objIDs); j++ {
			jb, jok := k.byObj[objIDs[j]]
			if !jok {
				continue
			}
			if ib.bounds.Overlaps(k.comparer.Compare, jb.bounds) {
				return true
			}
		}
	}
	return false
}

// checkForSingleDelConflicts examines all the keys written to srcObj, and
// determines whether any of the contained single deletes would be
// nondeterministic if applied to dstObj in dstObj's current state. It returns a
// slice of all the keys that are found to conflict. To preserve determinism,
// the caller must delete each conflicting key from the destination before
// writing src's mutations to dst.
//
// It takes a `srcCollapsed` parameter that determines whether the source
// history should be "collapsed" (see keyHistory.collapsed) before determining
// whether the applied state will conflict. This is required to facilitate
// ingestOps, which are NOT equivalent to committing the batch, because they
// can only commit one internal point key at each unique user key.
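//
// For example, a batch history SET(k); SINGLEDEL(k) collapses to just
// SINGLEDEL(k) under ingestion. Applied on top of a destination whose history
// ends in a single SET(k), the collapsed form is deterministic, whereas
// committing the uncollapsed batch would produce SET, SET, SINGLEDEL and a
// conflict.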
func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte {
	dstKeys := k.objKeyMeta(dstObj)
	var conflicts [][]byte
	for _, src := range k.SortedKeysForObj(srcObj) {
		if srcCollapsed {
			src.history = src.history.collapsed()
		}
		if k.checkForSingleDelConflict(src, dstKeys) {
			conflicts = append(conflicts, src.key)
		}
	}
	return conflicts
}

// checkForSingleDelConflict returns true if applying the history of the source
// key on top of the given object results in a possible SingleDel
// nondeterminism. See checkForSingleDelConflicts.
func (k *keyManager) checkForSingleDelConflict(src keyMeta, dstObjKeyMeta *objKeyMeta) bool {
	// Single delete generation logic already ensures that both the source
	// object and the destination object's single deletes are deterministic
	// within the context of their existing writes. However, applying the
	// source keys on top of the destination object may violate the
	// invariants. Consider:
	//
	//	src: a.SET; a.SINGLEDEL;
	//	dst: a.SET;
	//
	// The merged view is:
	//
	//	a.SET; a.SET; a.SINGLEDEL;
	//
	// This is invalid, because there is more than one value mutation of the
	// key before the single delete.
	//
	// We walk the source object's history in chronological order, looking
	// for a single delete that was written before a DEL/RANGEDEL. (NB: We
	// don't need to look beyond a DEL/RANGEDEL, because these deletes bound
	// any subsequently-written single deletes to applying to the keys
	// within src's history between the two tombstones. We already know from
	// per-object history invariants that any such single delete must be
	// deterministic with respect to src's keys.)
	var srcHasUnboundedSingleDelete bool
	var srcValuesBeforeSingleDelete int

	// When the srcObj is being ingested (srcCollapsed=true), the semantics
	// change, and the caller has already "collapsed" the key's history to
	// represent the ingestion semantics.
	srcHistory := src.history

srcloop:
	for _, item := range srcHistory {
		switch item.opType {
		case OpWriterDelete, OpWriterDeleteRange:
			// We found a DEL or RANGEDEL before any single delete. If src
			// contains additional single deletes, their effects are limited
			// to applying to later keys. Combining the two object histories
			// doesn't pose any determinism risk.
			return false

		case OpWriterSingleDelete:
			// We found a single delete. Since we found this single delete
			// before a DEL or RANGEDEL, this delete has the potential to
			// affect the visibility of keys in `dstObj`. We'll need to look
			// for potential conflicts down below.
			srcHasUnboundedSingleDelete = true
			if srcValuesBeforeSingleDelete > 1 {
				panic(errors.AssertionFailedf("unexpectedly found %d sets/merges before single del",
					srcValuesBeforeSingleDelete))
			}
			break srcloop

		case OpWriterSet, OpWriterMerge:
			// We found a SET or MERGE operation for this key. If there's a
			// subsequent single delete, we'll need to make sure there's not
			// a SET or MERGE in the dst too.
			srcValuesBeforeSingleDelete++

		default:
			panic(errors.AssertionFailedf("unexpected optype %d", item.opType))
		}
	}
	if !srcHasUnboundedSingleDelete {
		return false
	}

	dst, ok := dstObjKeyMeta.keys[string(src.key)]
	// If the destination writer has no record of the key, the combined key
	// history is simply the src object's key history, which is valid due to
	// per-object single deletion invariants.
	if !ok {
		return false
	}

	// We need to examine the trailing key history on dst.
	consecutiveValues := srcValuesBeforeSingleDelete
	for i := len(dst.history) - 1; i >= 0; i-- {
		switch dst.history[i].opType {
		case OpWriterSet, OpWriterMerge:
			// A SET/MERGE may conflict if there's more than one consecutive
			// SET/MERGE.
			consecutiveValues++
			if consecutiveValues > 1 {
				return true
			}
		case OpWriterDelete, OpWriterSingleDelete, OpWriterDeleteRange:
			// Deletes clear the history, enabling use of single delete.
			return false

		default:
			panic(errors.AssertionFailedf("unexpected optype %d", dst.history[i].opType))
		}
	}
	return false
}

// update updates the internal state of the keyManager according to the given
// op.
func (k *keyManager) update(o op) {
	switch s := o.(type) {
	case *setOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSet,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *mergeOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterMerge,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *deleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		if meta.objID.tag() == dbTag {
			meta.clear()
		} else {
			meta.history = append(meta.history, keyHistoryItem{
				opType:        OpWriterDelete,
				metaTimestamp: k.nextMetaTimestamp(),
			})
		}
	case *deleteRangeOp:
		// We track the history of discrete point keys, but a range deletion
		// applies over a continuous key span of infinite keys. However, the key
		// manager knows all keys that have been used in all operations, so we
		// can discretize the range tombstone by adding it to every known key
		// within the range.
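		// For example, a range deletion over ["c", "f") with known global keys
		// "c", "d", and "g" records a RANGEDEL in the histories of "c" and "d"
		// only.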
		ts := k.nextMetaTimestamp()
		keyRange := pebble.KeyRange{Start: s.start, End: s.end}
		for _, key := range k.knownKeysInRange(keyRange) {
			meta := k.getOrInit(s.writerID, key)
			if meta.objID.tag() == dbTag {
				meta.clear()
			} else {
				meta.history = append(meta.history, keyHistoryItem{
					opType:        OpWriterDeleteRange,
					metaTimestamp: ts,
				})
			}
		}
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeDels = true

	case *singleDeleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSingleDelete,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *rangeKeyDeleteOp:
		// Range key operations on their own don't determine singledel
		// eligibility; however, range key operations could be part of a batch
		// which contains other operations that do affect it. If those batches
		// were to get ingested, we'd need to know what the bounds of sstables
		// generated out of those batches are, as that determines whether that
		// ingestion will succeed or not.
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeKeys = true
	case *rangeKeySetOp:
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeKeys = true
	case *rangeKeyUnsetOp:
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeKeys = true
		k.objKeyMeta(s.writerID).hasRangeKeyUnset = true
	case *ingestOp:
		// Some ingestion operations may attempt to ingest overlapping
		// sstables, which is prohibited. We know at generation time whether
		// these ingestions will be successful. If they won't be successful, we
		// should not update the key state because both the batch(es) and
		// target DB will be left unmodified.
		if k.doObjectBoundsOverlap(s.batchIDs) {
			// This ingestion will fail.
			return
		}

		// For each batch, merge the keys into the DB. We can't call
		// keyMeta.mergeInto directly to merge, because ingest operations first
		// "flatten" the batch (because you can't set the same key twice at a
		// single sequence number). Instead we compute the collapsed history and
		// merge that.
		for _, batchID := range s.batchIDs {
			k.objKeyMeta(batchID).CollapseKeys()
			k.mergeObjectInto(batchID, s.dbID)
		}
	case *ingestAndExciseOp:
		// IngestAndExcise does not ingest multiple batches, so we will not see
		// a failure due to overlapping sstables. However, we do need to merge
		// the singular batch into the key manager.
		//
		// Remove all keys from the key manager within the excise span before
		// merging the batch into the db.
		for _, key := range k.InRangeKeysForObj(s.dbID, s.exciseStart, s.exciseEnd) {
			m := k.getOrInit(s.dbID, key.key)
			m.clear()
		}
		k.objKeyMeta(s.batchID).CollapseKeys()
		k.mergeObjectInto(s.batchID, s.dbID)
	// TODO(bilal): Handle replicateOp here. We currently disable SingleDelete
	// when these operations are enabled (see multiInstanceConfig).
	case *newExternalObjOp:
		// Collapse and transfer the keys from the batch to the external object.
		k.objKeyMeta(s.batchID).CollapseKeys()
		k.mergeObjectInto(s.batchID, s.externalObjID)
	case *ingestExternalFilesOp:
		// Merge the keys from the external objects (within the restricted
		// bounds) into the database.
		ts := k.nextMetaTimestamp()
		dbMeta := k.objKeyMeta(s.dbID)
		for _, obj := range s.objs {
			for _, keyMeta := range k.KeysForExternalIngest(obj) {
				dbMeta.MergeKey(&keyMeta, ts)
			}
			dbMeta.bounds.Expand(k.comparer.Compare, k.makeEndExclusiveBounds(obj.bounds.Start, obj.bounds.End))
		}
	case *applyOp:
		// Merge the keys from this batch into the parent writer.
		k.mergeObjectInto(s.batchID, s.writerID)
	case *batchCommitOp:
		// Merge the keys from the batch with the keys from the DB.
		k.mergeObjectInto(s.batchID, s.dbID)
	}
}

func (k *keyManager) knownKeys() (keys [][]byte) {
	return k.globalKeys
}

// knownKeysInRange returns all eligible read keys within the range
// [start,end). The returned slice is owned by the keyManager and must not be
// retained.
func (k *keyManager) knownKeysInRange(kr pebble.KeyRange) (keys [][]byte) {
	s, _ := slices.BinarySearchFunc(k.globalKeys, kr.Start, k.comparer.Compare)
	e, _ := slices.BinarySearchFunc(k.globalKeys, kr.End, k.comparer.Compare)
	if s >= e {
		return nil
	}
	return k.globalKeys[s:e]
}

func (k *keyManager) prefixes() (prefixes [][]byte) {
	return k.globalKeyPrefixes
}

// prefixExists returns true if a key has been generated with the provided
// prefix before.
func (k *keyManager) prefixExists(prefix []byte) bool {
	_, exists := k.globalKeyPrefixesMap[string(prefix)]
	return exists
}

// eligibleSingleDeleteKeys returns a slice of keys that can be safely single
// deleted, given the writer id. Restricting single delete keys through this
// method is used to ensure the OLW1 guarantee (see the keyManager comment) for
// the provided object ID.
func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) {
	// Creating a slice of keys is wasteful given that the caller will pick
	// one, but makes it simpler for unit testing.
	objKeys := k.objKeyMeta(o)
	for _, key := range k.globalKeys {
		meta, ok := objKeys.keys[string(key)]
		if !ok {
			keys = append(keys, key)
			continue
		}
		// Examine the history within this object.
		if meta.history.canSingleDelete() {
			keys = append(keys, key)
		}
	}
	return keys
}

// makeSingleKeyBounds creates a [key, key] bound.
func (k *keyManager) makeSingleKeyBounds(key []byte) bounds {
	return bounds{
		smallest:    key,
		largest:     key,
		largestExcl: false,
	}
}

// makeEndExclusiveBounds creates a [smallest, largest) bound.
func (k *keyManager) makeEndExclusiveBounds(smallest, largest []byte) bounds {
	b := bounds{
		smallest:    smallest,
		largest:     largest,
		largestExcl: true,
	}
	b.checkValid(k.comparer.Compare)
	return b
}

// A keyHistoryItem describes an individual operation performed on a key.
type keyHistoryItem struct {
	// opType may be writerSet, writerDelete, writerSingleDelete,
	// writerDeleteRange or writerMerge only. No other opTypes may appear here.
	opType        OpType
	metaTimestamp int
}

// keyHistory captures the history of mutations to a key in chronological order.
type keyHistory []keyHistoryItem

// before returns the subslice of the key history that happened strictly before
// the provided meta timestamp.
func (h keyHistory) before(metaTimestamp int) keyHistory {
	i, _ := slices.BinarySearchFunc(h, metaTimestamp, func(a keyHistoryItem, ts int) int {
		return cmp.Compare(a.metaTimestamp, ts)
	})
	return h[:i]
}

// canSingleDelete examines the tail of the history and returns true if a single
// delete appended to this history would satisfy the single delete invariants.
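// For example, the history del(1), set(2) permits a single delete, whereas
// set(1), set(2) does not.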
func (h keyHistory) canSingleDelete() bool {
	if len(h) == 0 {
		return true
	}
	switch o := h[len(h)-1].opType; o {
	case OpWriterDelete, OpWriterDeleteRange, OpWriterSingleDelete:
		return true
	case OpWriterSet, OpWriterMerge:
		if len(h) == 1 {
			return true
		}
		return h[len(h)-2].opType.isDelete()
	default:
		panic(errors.AssertionFailedf("unexpected writer op %v", o))
	}
}

func (h keyHistory) String() string {
	var sb strings.Builder
	for i, it := range h {
		if i > 0 {
			fmt.Fprint(&sb, ", ")
		}
		switch it.opType {
		case OpWriterDelete:
			fmt.Fprint(&sb, "del")
		case OpWriterDeleteRange:
			fmt.Fprint(&sb, "delrange")
		case OpWriterSingleDelete:
			fmt.Fprint(&sb, "singledel")
		case OpWriterSet:
			fmt.Fprint(&sb, "set")
		case OpWriterMerge:
			fmt.Fprint(&sb, "merge")
		default:
			fmt.Fprintf(&sb, "optype[v=%d]", it.opType)
		}
		fmt.Fprintf(&sb, "(%d)", it.metaTimestamp)
	}
	return sb.String()
}

// hasVisibleValue examines the tail of the history and returns true if the
// history should end in a visible value for this key.
func (h keyHistory) hasVisibleValue() bool {
	if len(h) == 0 {
		return false
	}
	return !h[len(h)-1].opType.isDelete()
}

// collapsed returns a new key history that's equivalent to the history created
// by an ingestOp that "collapses" a batch's keys. See ingestOp.build.
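// For example, the history set(1), delrange(2), merge(3), set(4) collapses to
// delrange(2), set(4): the range deletion applies first, and among point
// operations only the most recent survives.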
func (h keyHistory) collapsed() keyHistory {
	var ret keyHistory
	// When collapsing a batch, any range deletes are semantically applied
	// first. Look for any range deletes and apply them.
	for _, op := range h {
		if op.opType == OpWriterDeleteRange {
			ret = append(ret, op)
			break
		}
	}
	// Among point keys, the most recently written key wins.
	for i := len(h) - 1; i >= 0; i-- {
		if h[i].opType != OpWriterDeleteRange {
			ret = append(ret, h[i])
			break
		}
	}
	return ret
}

func opWrittenKeys(untypedOp op) [][]byte {
	switch t := untypedOp.(type) {
	case *applyOp:
	case *batchCommitOp:
	case *checkpointOp:
	case *closeOp:
	case *compactOp:
	case *dbRestartOp:
	case *deleteOp:
		return [][]byte{t.key}
	case *deleteRangeOp:
		return [][]byte{t.start, t.end}
	case *flushOp:
	case *getOp:
	case *ingestOp:
	case *initOp:
	case *iterFirstOp:
	case *iterLastOp:
	case *iterNextOp:
	case *iterNextPrefixOp:
	case *iterCanSingleDelOp:
	case *iterPrevOp:
	case *iterSeekGEOp:
	case *iterSeekLTOp:
	case *iterSeekPrefixGEOp:
	case *iterSetBoundsOp:
	case *iterSetOptionsOp:
	case *mergeOp:
		return [][]byte{t.key}
	case *newBatchOp:
	case *newIndexedBatchOp:
	case *newIterOp:
	case *newIterUsingCloneOp:
	case *newSnapshotOp:
	case *rangeKeyDeleteOp:
	case *rangeKeySetOp:
	case *rangeKeyUnsetOp:
	case *setOp:
		return [][]byte{t.key}
	case *singleDeleteOp:
		return [][]byte{t.key}
	case *replicateOp:
		return [][]byte{t.start, t.end}
	}
	return nil
}

func loadPrecedingKeys(t TestingT, ops []op, cfg *OpConfig, m *keyManager) {
	for _, op := range ops {
		// Pretend we're generating all the operation's keys as potential new
		// keys, so that we update the key manager's keys and prefix sets.
		for _, k := range opWrittenKeys(op) {
			m.addNewKey(k)

			// If the key has a suffix, ratchet up the suffix distribution if
			// necessary.
			if s := m.comparer.Split(k); s < len(k) {
				suffix, err := testkeys.ParseSuffix(k[s:])
				require.NoError(t, err)
				if uint64(suffix) > cfg.writeSuffixDist.Max() {
					diff := int(uint64(suffix) - cfg.writeSuffixDist.Max())
					cfg.writeSuffixDist.IncMax(diff)
				}
			}
		}

		// Update key tracking state.
		m.update(op)
	}
}

func insertSorted(cmp base.Compare, dst *[][]byte, k []byte) {
	s := *dst
	i, _ := slices.BinarySearchFunc(s, k, cmp)
	*dst = slices.Insert(s, i, k)
}