package metamorphic

import (
	"cmp"
	"fmt"
	"slices"
	"strings"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/testkeys"
	"github.com/stretchr/testify/require"
)

// keyMeta is metadata associated with an (objID, key) pair, where objID is
// a writer containing the key.
type keyMeta struct {
	objID objID
	key   []byte
	// history provides the history of writer operations applied against this
	// key on this object. history is always ordered by non-decreasing
	// metaTimestamp.
	history keyHistory
}

func (m *keyMeta) clear() {
	m.history = m.history[:0]
}
// mergeInto merges this metadata into dst's metadata, appending all of its
// individual operations to dst's history at the provided timestamp.
func (m *keyMeta) mergeInto(dst *keyMeta, ts int) {
	for _, op := range m.history {
		// If the key is being merged into a database object and the operation
		// is a delete, we can clear the destination history. Database objects
		// are end points in the merging of keys and won't be the source of a
		// future merge. Deletions cause all other operations to behave as
		// though the key was never written to the database at all, so we don't
		// need to consider the key for maintaining single delete invariants.
		//
		// NB: There's a subtlety here in that isDelete() will return true if
		// opType is a writerSingleDelete, but single deletes are capable of
		// leaking information about the history of writes. However, that's
		// okay, because as long as we're properly generating single deletes
		// according to the W1 invariant described in keyManager's comment, a
		// single delete is equivalent to a delete for the current history.
		if dst.objID.tag() == dbTag && op.opType.isDelete() {
			dst.clear()
			continue
		}
		dst.history = append(dst.history, keyHistoryItem{
			opType:        op.opType,
			metaTimestamp: ts,
		})
	}
}
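
// What follows is an illustrative sketch, not part of the original source: it
// demonstrates mergeInto clearing a database object's history when the source
// history contains a delete. The batchTag/dbTag object IDs are constructed
// with makeObjID purely for illustration.
func sketchMergeIntoClears() {
	src := &keyMeta{
		objID:   makeObjID(batchTag, 1),
		key:     []byte("k"),
		history: keyHistory{{opType: OpWriterDelete, metaTimestamp: 1}},
	}
	dst := &keyMeta{
		objID:   makeObjID(dbTag, 1),
		key:     []byte("k"),
		history: keyHistory{{opType: OpWriterSet, metaTimestamp: 0}},
	}
	src.mergeInto(dst, 2)
	// dst.history is now empty: because dst is a database object, the delete
	// cleared the earlier SET rather than being appended after it.
}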

type bounds struct {
	smallest    []byte
	largest     []byte
	largestExcl bool // is largest exclusive?
}

func (b bounds) String() string {
	if b.largestExcl {
		return fmt.Sprintf("[%q,%q)", b.smallest, b.largest)
	}
	return fmt.Sprintf("[%q,%q]", b.smallest, b.largest)
}

// Overlaps returns true iff the bounds intersect.
func (b *bounds) Overlaps(cmp base.Compare, other bounds) bool {
	if b.IsUnset() || other.IsUnset() {
		return false
	}
	// Is b strictly before other?
	if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) {
		return false
	}
	// Is b strictly after other?
	if v := cmp(b.smallest, other.largest); v > 0 || (v == 0 && other.largestExcl) {
		return false
	}
	return true
}

// IsUnset returns true if the bounds haven't been set.
func (b bounds) IsUnset() bool {
	return b.smallest == nil && b.largest == nil
}

// Expand potentially expands the receiver bounds to include the other given
// bounds. If the receiver is unset, the other bounds are copied.
func (b *bounds) Expand(cmp base.Compare, other bounds) {
	if other.IsUnset() {
		return
	}
	if b.IsUnset() {
		*b = other
		return
	}
	if cmp(b.smallest, other.smallest) > 0 {
		b.smallest = other.smallest
	}
	if v := cmp(b.largest, other.largest); v < 0 || (v == 0 && b.largestExcl) {
		b.largest = other.largest
		b.largestExcl = other.largestExcl
	}
}
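
// Illustrative sketch, not part of the original source: it shows how an
// exclusive upper bound interacts with Overlaps and Expand, using the same
// testkeys.Comparer that newKeyManager installs.
func sketchBoundsUsage() {
	cmp := testkeys.Comparer.Compare
	a := bounds{smallest: []byte("a"), largest: []byte("c"), largestExcl: true} // [a,c)
	b := bounds{smallest: []byte("c"), largest: []byte("e")}                    // [c,e]
	// a ends exclusively at "c", so it does not overlap b, which begins at "c".
	_ = a.Overlaps(cmp, b) // false
	// Expanding a to include b widens a to [a,e].
	a.Expand(cmp, b)
}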

// keyManager tracks the write operations performed on keys in the generation
// phase of the metamorphic test. It maintains histories of operations performed
// against every unique user key on every writer object. These histories inform
// operation generation in order to maintain invariants that Pebble requires of
// end users, mostly around single deletions.
//
// A single deletion has a subtle requirement of the writer:
//
//	W1: The writer may only single delete a key `k` if `k` has been Set once
//	    (and never Merged) since the last delete.
//
// When a SINGLEDEL key deletes a SET key within a compaction, both the SET and
// the SINGLEDEL keys are elided. If multiple SETs of the key exist within the
// LSM, the SINGLEDEL reveals the lower SET. This behavior depends on the
// internal LSM state and is nondeterministic. To ensure determinism, the end
// user must satisfy W1 and use single delete only when they can guarantee that
// the key has been set at most once since the last delete, preventing this
// rollback to a previous value.
//
// This W1 invariant requires a delicate dance during operation generation,
// because independent batches may be independently built and committed. With
// multi-instance variants of the metamorphic tests, keys in batches may
// ultimately be committed to any of several DB instances. To satisfy these
// requirements, the key manager tracks the history of every key on every
// writable object. When generating a new single deletion operation, the
// generator asks the key manager for a set of keys for which a single delete
// maintains the W1 invariant within the object itself. This object-local W1
// invariant (OLW1) is equivalent to W1 if one only ever performs write
// operations directly against individual DB objects.
//
// However, with the existence of batches that receive writes independent of DB
// objects, W1 may be violated by appending the histories of two objects that
// independently satisfy OLW1. Consider a sequence such as:
//
//	1. db1.Set("foo")
//	2. batch1.Set("foo")
//	3. batch1.SingleDelete("foo")
//	4. db1.Apply(batch1)
//
// Both db1 and batch1 satisfy the object-local invariant OLW1. However the
// composition of the histories created by appending batch1's operations to db1
// creates a history that now violates W1 on db1. To detect this violation,
// batch applications/commits and ingestions examine the tail of the destination
// object's history and the head of the source batch's history. When a violation
// is detected, these operations insert additional Delete operations to clear
// the conflicting keys before proceeding with the conflicting operation. These
// deletes reset the key history.
//
// Note that this generation-time key tracking requires that operations be
// infallible, because a runtime failure would cause the key manager's state to
// diverge from the runtime object state. Ingestion operations pose an obstacle,
// because the generator may generate ingestions that fail due to overlapping
// sstables. Today, this complication is sidestepped by avoiding ingestion of
// multiple batches containing deletes or single deletes, since the loss of
// those specific operations on a key is what we cannot tolerate (doing
// SingleDelete on a key that has not been written to because the Set was lost
// is harmless).
//
// TODO(jackson): Instead, compute smallest and largest bounds of batches so
// that we know at generation-time whether or not an ingestion operation will
// fail and can avoid updating key state.
type keyManager struct {
	comparer *base.Comparer

	// metaTimestamp is used to provide an ordering over certain operations,
	// like iterator creation and updates to keys. Keeping track of the
	// timestamp allows us to make determinations such as whether a key will
	// be visible to an iterator.
	metaTimestamp int

	byObj map[objID]*objKeyMeta
	// globalKeys represents all the keys that have been generated so far. Not
	// all these keys have been written to. globalKeys is sorted.
	globalKeys [][]byte
	// globalKeysMap contains the same keys as globalKeys but in a map. It
	// ensures no duplication.
	globalKeysMap map[string]bool
	// globalKeyPrefixes contains all the key prefixes (as defined by the
	// comparer's Split) generated so far. globalKeyPrefixes is sorted.
	globalKeyPrefixes [][]byte
	// globalKeyPrefixesMap contains the same keys as globalKeyPrefixes. It
	// ensures no duplication.
	globalKeyPrefixesMap map[string]struct{}
}

type objKeyMeta struct {
	id objID
	// keys maps each key written to this object to what has happened to it in
	// this object. Entries are transferred to another objKeyMeta when needed.
	keys map[string]*keyMeta
	// bounds holds user key bounds encompassing all the keys set within an
	// object. It's updated within `update` when a new op is generated.
	bounds bounds
}

// MergeKey adds the given key at the given meta timestamp, merging the
// histories as needed.
func (okm *objKeyMeta) MergeKey(k *keyMeta, ts int) {
	meta, ok := okm.keys[string(k.key)]
	if !ok {
		meta = &keyMeta{
			objID: okm.id,
			key:   k.key,
		}
		okm.keys[string(k.key)] = meta
	}
	k.mergeInto(meta, ts)
}
// CollapseKeys collapses the history of all keys. Used with ingestion
// operations, which only use the last value of any given key.
func (okm *objKeyMeta) CollapseKeys() {
	for _, keyMeta := range okm.keys {
		keyMeta.history = keyMeta.history.collapsed()
	}
}
// objKeyMeta looks up the objKeyMeta for a given object, creating it if
// necessary.
func (k *keyManager) objKeyMeta(o objID) *objKeyMeta {
	m, ok := k.byObj[o]
	if !ok {
		m = &objKeyMeta{
			id:   o,
			keys: make(map[string]*keyMeta),
		}
		k.byObj[o] = m
	}
	return m
}

// sortedKeysForObj returns all the values in objKeyMeta(o).keys, in sorted
// order.
func (k *keyManager) sortedKeysForObj(o objID) []*keyMeta {
	okm := k.objKeyMeta(o)
	res := make([]*keyMeta, 0, len(okm.keys))
	for _, m := range okm.keys {
		res = append(res, m)
	}
	slices.SortFunc(res, func(a, b *keyMeta) int {
		cmp := k.comparer.Compare(a.key, b.key)
		if cmp == 0 {
			panic(fmt.Sprintf("distinct keys %q and %q compared as equal", a.key, b.key))
		}
		return cmp
	})
	return res
}

func (k *keyManager) nextMetaTimestamp() int {
	ret := k.metaTimestamp
	k.metaTimestamp++
	return ret
}
// newKeyManager returns a pointer to a new keyManager. Callers should
// interact with this using the addNewKey, knownKeys, and update methods only.
func newKeyManager(numInstances int) *keyManager {
	m := &keyManager{
		comparer:             testkeys.Comparer,
		byObj:                make(map[objID]*objKeyMeta),
		globalKeysMap:        make(map[string]bool),
		globalKeyPrefixesMap: make(map[string]struct{}),
	}
	for i := 1; i <= max(numInstances, 1); i++ {
		m.objKeyMeta(makeObjID(dbTag, uint32(i)))
	}
	return m
}

// addNewKey adds the given key to the key manager for global key tracking.
// Returns false iff this is not a new key.
func (k *keyManager) addNewKey(key []byte) bool {
	if k.globalKeysMap[string(key)] {
		return false
	}
	insertSorted(k.comparer.Compare, &k.globalKeys, key)
	k.globalKeysMap[string(key)] = true

	prefixLen := k.comparer.Split(key)
	if _, ok := k.globalKeyPrefixesMap[string(key[:prefixLen])]; !ok {
		insertSorted(k.comparer.Compare, &k.globalKeyPrefixes, key[:prefixLen])
		k.globalKeyPrefixesMap[string(key[:prefixLen])] = struct{}{}
	}
	return true
}

// getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
// allocates, initializes and returns a new value.
func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
	objKeys := k.objKeyMeta(id)
	m, ok := objKeys.keys[string(key)]
	if ok {
		return m
	}
	m = &keyMeta{
		objID: id,
		key:   key,
	}
	// Initialize the key-to-meta index.
	objKeys.keys[string(key)] = m
	// Expand the object's bounds to contain this key if they don't already.
	objKeys.bounds.Expand(k.comparer.Compare, bounds{
		smallest: key,
		largest:  key,
	})
	return m
}

// mergeKeysInto merges the keys and bounds from an object into another. Assumes
// the source object will not be used again.
func (k *keyManager) mergeKeysInto(from, to objID) {
	fromMeta := k.objKeyMeta(from)
	toMeta := k.objKeyMeta(to)
	ts := k.nextMetaTimestamp()
	// The result should be the same, regardless of the ordering of the keys.
	for _, keyMeta := range fromMeta.keys {
		toMeta.MergeKey(keyMeta, ts)
	}
	// All the keys in `from` have been merged into `to`. Expand `to`'s bounds
	// to be at least as wide as `from`'s.
	toMeta.bounds.Expand(k.comparer.Compare, fromMeta.bounds)

	delete(k.byObj, from) // Unlink "from" obj.
}
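
// Illustrative sketch, not part of the original source: committing a batch is
// modeled by merging its per-key histories into the destination DB object and
// dropping the batch's own tracking state. Field names follow the ops defined
// elsewhere in this package.
func sketchMergeKeysInto() {
	km := newKeyManager(1 /* numInstances */)
	db, batch := makeObjID(dbTag, 1), makeObjID(batchTag, 1)
	km.update(&setOp{writerID: batch, key: []byte("k")})
	km.mergeKeysInto(batch, db)
	// The batch is now unlinked from km.byObj, and db's history for "k"
	// contains the SET at a fresh metaTimestamp.
}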

// expandBounds expands the incrementally maintained bounds of o to be at least
// as wide as `b`.
func (k *keyManager) expandBounds(o objID, b bounds) {
	k.objKeyMeta(o).bounds.Expand(k.comparer.Compare, b)
}

// doObjectBoundsOverlap returns true iff any of the named objects have key
// bounds that overlap any other named object.
func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool {
	for i := range objIDs {
		ib, iok := k.byObj[objIDs[i]]
		if !iok {
			continue
		}
		for j := i + 1; j < len(objIDs); j++ {
			jb, jok := k.byObj[objIDs[j]]
			if !jok {
				continue
			}
			if ib.bounds.Overlaps(k.comparer.Compare, jb.bounds) {
				return true
			}
		}
	}
	return false
}
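
// Illustrative sketch, not part of the original source: two batches whose
// bounds cover distinct keys do not overlap, so an ingest containing both is
// expected to succeed.
func sketchIngestOverlapCheck() {
	km := newKeyManager(1)
	b1, b2 := makeObjID(batchTag, 1), makeObjID(batchTag, 2)
	km.update(&setOp{writerID: b1, key: []byte("a")})
	km.update(&setOp{writerID: b2, key: []byte("b")})
	_ = km.doObjectBoundsOverlap([]objID{b1, b2}) // false: bounds are [a,a] and [b,b]
}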

// checkForSingleDelConflicts examines all the keys written to srcObj, and
// determines whether any of the contained single deletes would be
// nondeterministic if applied to dstObj in dstObj's current state. It returns
// a slice of all the keys found to conflict. To preserve determinism, the
// caller must delete each conflicting key from the destination before writing
// src's mutations to dst.
//
// It takes a `srcCollapsed` parameter that determines whether the source
// history should be "collapsed" (see keyHistory.collapsed) before determining
// whether the applied state will conflict. This is required to facilitate
// ingestOps, which are NOT equivalent to committing the batch, because they
// can only commit 1 internal point key at each unique user key.
func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte {
	dstKeys := k.objKeyMeta(dstObj)
	var conflicts [][]byte
	for _, src := range k.sortedKeysForObj(srcObj) {
		// Single delete generation logic already ensures that both srcObj and
		// dstObj's single deletes are deterministic within the context of
		// their existing writes. However, applying srcObj on top of dstObj may
		// violate the invariants. Consider:
		//
		//	src: a.SET; a.SINGLEDEL;
		//	dst: a.SET;
		//
		// The merged view is:
		//
		//	a.SET; a.SET; a.SINGLEDEL;
		//
		// This is invalid, because there is more than 1 value mutation of the
		// key before the single delete.
		//
		// We walk the source object's history in chronological order, looking
		// for a single delete that was written before a DEL/RANGEDEL. (NB: We
		// don't need to look beyond a DEL/RANGEDEL, because these deletes
		// bound any subsequently-written single deletes to applying to the
		// keys within src's history between the two tombstones. We already
		// know from per-object history invariants that any such single delete
		// must be deterministic with respect to src's keys.)
		var srcHasUnboundedSingleDelete bool
		var srcValuesBeforeSingleDelete int

		// When the srcObj is being ingested (srcCollapsed=t), the semantics
		// change. We must first "collapse" the key's history to represent the
		// ingestion semantics.
		srcHistory := src.history
		if srcCollapsed {
			srcHistory = src.history.collapsed()
		}

	srcloop:
		for _, item := range srcHistory {
			switch item.opType {
			case OpWriterDelete, OpWriterDeleteRange:
				// We found a DEL or RANGEDEL before any single delete. If src
				// contains additional single deletes, their effects are
				// limited to applying to later keys. Combining the two object
				// histories doesn't pose any determinism risk.
				break srcloop
			case OpWriterSingleDelete:
				// We found a single delete. Since we found this single delete
				// before a DEL or RANGEDEL, this delete has the potential to
				// affect the visibility of keys in `dstObj`. We'll need to
				// look for potential conflicts down below.
				srcHasUnboundedSingleDelete = true
				if srcValuesBeforeSingleDelete > 1 {
					panic(errors.AssertionFailedf("unexpectedly found %d sets/merges within %s before single del",
						srcValuesBeforeSingleDelete, srcObj))
				}
				break srcloop
			case OpWriterSet, OpWriterMerge:
				// We found a SET or MERGE operation for this key. If there's
				// a subsequent single delete, we'll need to make sure there's
				// not a SET or MERGE in the dst too.
				srcValuesBeforeSingleDelete++
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", item.opType))
			}
		}
		if !srcHasUnboundedSingleDelete {
			continue
		}

		dst, ok := dstKeys.keys[string(src.key)]
		// If the destination writer has no record of the key, the combined
		// key history is simply the src object's key history, which is valid
		// due to per-object single deletion invariants.
		if !ok {
			continue
		}

		// We need to examine the trailing key history on dst.
		consecutiveValues := srcValuesBeforeSingleDelete
	dstloop:
		for i := len(dst.history) - 1; i >= 0; i-- {
			switch dst.history[i].opType {
			case OpWriterSet, OpWriterMerge:
				// A SET/MERGE may conflict if there's more than 1 consecutive
				// SET/MERGE.
				consecutiveValues++
				if consecutiveValues > 1 {
					conflicts = append(conflicts, src.key)
					break dstloop
				}
			case OpWriterDelete, OpWriterSingleDelete, OpWriterDeleteRange:
				// Deletes clear the history, enabling use of single delete.
				break dstloop
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", dst.history[i].opType))
			}
		}
	}
	return conflicts
}
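
// Illustrative sketch, not part of the original source: it reproduces the
// SET-above-SET conflict from the comment above using the key manager
// directly.
func sketchSingleDelConflict() {
	km := newKeyManager(1)
	db, batch := makeObjID(dbTag, 1), makeObjID(batchTag, 1)
	km.update(&setOp{writerID: db, key: []byte("foo")})
	km.update(&setOp{writerID: batch, key: []byte("foo")})
	km.update(&singleDeleteOp{writerID: batch, key: []byte("foo")})
	// Applying batch to db would produce SET; SET; SINGLEDEL for "foo", so
	// "foo" is reported as a conflict the generator must delete first.
	_ = km.checkForSingleDelConflicts(batch, db, false /* srcCollapsed */) // ["foo"]
}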

// update updates the internal state of the keyManager according to the given
// op.
func (k *keyManager) update(o op) {
	switch s := o.(type) {
	case *setOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSet,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *mergeOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterMerge,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *deleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		if meta.objID.tag() == dbTag {
			meta.clear()
		} else {
			meta.history = append(meta.history, keyHistoryItem{
				opType:        OpWriterDelete,
				metaTimestamp: k.nextMetaTimestamp(),
			})
		}
	case *deleteRangeOp:
		// We track the history of discrete point keys, but a range deletion
		// applies over a continuous key span of infinite keys. However, the
		// key manager knows all keys that have been used in all operations,
		// so we can discretize the range tombstone by adding it to every
		// known key within the range.
		ts := k.nextMetaTimestamp()
		keyRange := pebble.KeyRange{Start: s.start, End: s.end}
		for _, key := range k.knownKeysInRange(keyRange) {
			meta := k.getOrInit(s.writerID, key)
			if meta.objID.tag() == dbTag {
				meta.clear()
			} else {
				meta.history = append(meta.history, keyHistoryItem{
					opType:        OpWriterDeleteRange,
					metaTimestamp: ts,
				})
			}
		}
		k.expandBounds(s.writerID, bounds{
			smallest:    s.start,
			largest:     s.end,
			largestExcl: true,
		})
	case *singleDeleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSingleDelete,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *rangeKeyDeleteOp:
		// Range key operations on their own don't determine singledel
		// eligibility. However, range key operations could be part of a batch
		// which contains other operations that do affect it. If those batches
		// were to get ingested, we'd need to know what the bounds of sstables
		// generated out of those batches are, as that determines whether that
		// ingestion will succeed or not.
		k.expandBounds(s.writerID, bounds{
			smallest:    s.start,
			largest:     s.end,
			largestExcl: true,
		})
	case *rangeKeySetOp:
		k.expandBounds(s.writerID, bounds{
			smallest:    s.start,
			largest:     s.end,
			largestExcl: true,
		})
	case *rangeKeyUnsetOp:
		k.expandBounds(s.writerID, bounds{
			smallest:    s.start,
			largest:     s.end,
			largestExcl: true,
		})
	case *ingestOp:
		// Some ingestion operations may attempt to ingest overlapping
		// sstables, which is prohibited. We know at generation time whether
		// these ingestions will be successful. If they won't be successful,
		// we should not update the key state, because both the batch(es) and
		// target DB will be left unmodified.
		if k.doObjectBoundsOverlap(s.batchIDs) {
			// This ingestion will fail.
			return
		}

		// For each batch, merge the keys into the DB. We can't call
		// keyMeta.mergeInto directly to merge, because ingest operations
		// first "flatten" the batch (because you can't set the same key twice
		// at a single sequence number). Instead we compute the collapsed
		// history and merge that.
		for _, batchID := range s.batchIDs {
			k.objKeyMeta(batchID).CollapseKeys()
			k.mergeKeysInto(batchID, s.dbID)
		}
		// TODO(bilal): Handle ingestAndExciseOp and replicateOp here. We
		// currently disable SingleDelete when these operations are enabled
		// (see multiInstanceConfig).
	case *applyOp:
		// Merge the keys from this writer into the parent writer.
		k.mergeKeysInto(s.batchID, s.writerID)
	case *batchCommitOp:
		// Merge the keys from the batch with the keys from the DB.
		k.mergeKeysInto(s.batchID, s.dbID)
	}
}
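
// Illustrative sketch, not part of the original source: a range deletion is
// discretized over the currently known keys, and the exclusive end key is not
// touched.
func sketchRangeDelDiscretized() {
	km := newKeyManager(1)
	b := makeObjID(batchTag, 1)
	for _, key := range [][]byte{[]byte("a"), []byte("b"), []byte("c")} {
		km.addNewKey(key)
	}
	km.update(&deleteRangeOp{writerID: b, start: []byte("a"), end: []byte("c")})
	// "a" and "b" now carry an OpWriterDeleteRange entry in the batch's
	// histories; "c" is unaffected because the end bound is exclusive.
}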

func (k *keyManager) knownKeys() (keys [][]byte) {
	return k.globalKeys
}

// knownKeysInRange returns all eligible read keys within the range
// [start,end). The returned slice is owned by the keyManager and must not be
// retained.
func (k *keyManager) knownKeysInRange(kr pebble.KeyRange) (keys [][]byte) {
	s, _ := slices.BinarySearchFunc(k.globalKeys, kr.Start, k.comparer.Compare)
	e, _ := slices.BinarySearchFunc(k.globalKeys, kr.End, k.comparer.Compare)
	if s >= e {
		return nil
	}
	return k.globalKeys[s:e]
}

func (k *keyManager) prefixes() (prefixes [][]byte) {
	return k.globalKeyPrefixes
}

// prefixExists returns true if a key has been generated with the provided
// prefix before.
func (k *keyManager) prefixExists(prefix []byte) bool {
	_, exists := k.globalKeyPrefixesMap[string(prefix)]
	return exists
}

// eligibleSingleDeleteKeys returns a slice of keys that can be safely single
// deleted, given the writer id. Restricting single delete keys through this
// method ensures the OLW1 guarantee (see the keyManager comment) for the
// provided object ID.
func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) {
	// Creating a slice of keys is wasteful given that the caller will pick
	// one, but it makes unit testing simpler.
	objKeys := k.objKeyMeta(o)
	for _, key := range k.globalKeys {
		meta, ok := objKeys.keys[string(key)]
		if !ok {
			keys = append(keys, key)
			continue
		}
		// Examine the history within this object.
		if meta.history.canSingleDelete() {
			keys = append(keys, key)
		}
	}
	return keys
}

// A keyHistoryItem describes an individual operation performed on a key.
type keyHistoryItem struct {
	// opType may be OpWriterSet, OpWriterDelete, OpWriterSingleDelete,
	// OpWriterDeleteRange or OpWriterMerge only. No other opTypes may appear
	// here.
	opType        OpType
	metaTimestamp int
}

// keyHistory captures the history of mutations to a key in chronological
// order.
type keyHistory []keyHistoryItem

// before returns the subslice of the key history that happened strictly before
// the provided meta timestamp.
func (h keyHistory) before(metaTimestamp int) keyHistory {
	i, _ := slices.BinarySearchFunc(h, metaTimestamp, func(a keyHistoryItem, ts int) int {
		return cmp.Compare(a.metaTimestamp, ts)
	})
	return h[:i]
}

// canSingleDelete examines the tail of the history and returns true if a
// single delete appended to this history would satisfy the single delete
// invariants.
func (h keyHistory) canSingleDelete() bool {
	if len(h) == 0 {
		return true
	}
	switch o := h[len(h)-1].opType; o {
	case OpWriterDelete, OpWriterDeleteRange, OpWriterSingleDelete:
		return true
	case OpWriterSet, OpWriterMerge:
		if len(h) == 1 {
			return true
		}
		return h[len(h)-2].opType.isDelete()
	default:
		panic(errors.AssertionFailedf("unexpected writer op %v", o))
	}
}
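
// Illustrative sketch, not part of the original source: histories that do and
// do not admit a single delete under the object-local invariant.
func sketchCanSingleDelete() {
	// A single SET with nothing before it: single delete is safe.
	_ = keyHistory{{opType: OpWriterSet}}.canSingleDelete() // true
	// A SET on top of an earlier SET: a single delete could reveal the older
	// value, so it is not safe.
	_ = keyHistory{{opType: OpWriterSet}, {opType: OpWriterSet}}.canSingleDelete() // false
	// A SET preceded by a DEL: safe again, since the delete reset the history.
	_ = keyHistory{{opType: OpWriterDelete}, {opType: OpWriterSet}}.canSingleDelete() // true
}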

func (h keyHistory) String() string {
	var sb strings.Builder
	for i, it := range h {
		if i > 0 {
			fmt.Fprint(&sb, ", ")
		}
		switch it.opType {
		case OpWriterDelete:
			fmt.Fprint(&sb, "del")
		case OpWriterDeleteRange:
			fmt.Fprint(&sb, "delrange")
		case OpWriterSingleDelete:
			fmt.Fprint(&sb, "singledel")
		case OpWriterSet:
			fmt.Fprint(&sb, "set")
		case OpWriterMerge:
			fmt.Fprint(&sb, "merge")
		default:
			fmt.Fprintf(&sb, "optype[v=%d]", it.opType)
		}
		fmt.Fprintf(&sb, "(%d)", it.metaTimestamp)
	}
	return sb.String()
}

// hasVisibleValue examines the tail of the history and returns true if the
// history should end in a visible value for this key.
func (h keyHistory) hasVisibleValue() bool {
	if len(h) == 0 {
		return false
	}
	return !h[len(h)-1].opType.isDelete()
}

// collapsed returns a new key history that's equivalent to the history created
// by an ingestOp that "collapses" a batch's keys. See ingestOp.build.
func (h keyHistory) collapsed() keyHistory {
	var ret keyHistory
	// When collapsing a batch, any range deletes are semantically applied
	// first. Look for any range deletes and apply them.
	for _, op := range h {
		if op.opType == OpWriterDeleteRange {
			ret = append(ret, op)
			break
		}
	}
	// Among point keys, the most recently written key wins.
	for i := len(h) - 1; i >= 0; i-- {
		if h[i].opType != OpWriterDeleteRange {
			ret = append(ret, h[i])
			break
		}
	}
	return ret
}
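
// Illustrative sketch, not part of the original source: collapsing a history
// the way an ingest does. Range deletes apply first; among point operations
// only the most recent survives.
func sketchCollapsed() {
	h := keyHistory{
		{opType: OpWriterSet, metaTimestamp: 1},
		{opType: OpWriterDeleteRange, metaTimestamp: 2},
		{opType: OpWriterSet, metaTimestamp: 3},
	}
	_ = h.collapsed() // [delrange(2), set(3)]
}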

func opWrittenKeys(untypedOp op) [][]byte {
	switch t := untypedOp.(type) {
	case *applyOp:
	case *batchCommitOp:
	case *checkpointOp:
	case *closeOp:
	case *compactOp:
	case *dbRestartOp:
	case *deleteOp:
		return [][]byte{t.key}
	case *deleteRangeOp:
		return [][]byte{t.start, t.end}
	case *flushOp:
	case *getOp:
	case *ingestOp:
	case *initOp:
	case *iterFirstOp:
	case *iterLastOp:
	case *iterNextOp:
	case *iterNextPrefixOp:
	case *iterCanSingleDelOp:
	case *iterPrevOp:
	case *iterSeekGEOp:
	case *iterSeekLTOp:
	case *iterSeekPrefixGEOp:
	case *iterSetBoundsOp:
	case *iterSetOptionsOp:
	case *mergeOp:
		return [][]byte{t.key}
	case *newBatchOp:
	case *newIndexedBatchOp:
	case *newIterOp:
	case *newIterUsingCloneOp:
	case *newSnapshotOp:
	case *rangeKeyDeleteOp:
	case *rangeKeySetOp:
	case *rangeKeyUnsetOp:
	case *setOp:
		return [][]byte{t.key}
	case *singleDeleteOp:
		return [][]byte{t.key}
	case *replicateOp:
		return [][]byte{t.start, t.end}
	}
	return nil
}

func loadPrecedingKeys(t TestingT, ops []op, cfg *OpConfig, m *keyManager) {
	for _, op := range ops {
		// Pretend we're generating all the operation's keys as potential new
		// keys, so that we update the key manager's keys and prefix sets.
		for _, k := range opWrittenKeys(op) {
			m.addNewKey(k)

			// If the key has a suffix, ratchet up the suffix distribution if
			// necessary.
			if s := m.comparer.Split(k); s < len(k) {
				suffix, err := testkeys.ParseSuffix(k[s:])
				require.NoError(t, err)
				if uint64(suffix) > cfg.writeSuffixDist.Max() {
					diff := int(uint64(suffix) - cfg.writeSuffixDist.Max())
					cfg.writeSuffixDist.IncMax(diff)
				}
			}
		}

		// Update key tracking state.
		m.update(op)
	}
}

func insertSorted(cmp base.Compare, dst *[][]byte, k []byte) {
	s := *dst
	i, _ := slices.BinarySearchFunc(s, k, cmp)
	*dst = slices.Insert(s, i, k)
}