// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package metamorphic

import (
	"fmt"
	"slices"
	"strings"
	"unsafe"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/testkeys"
	"github.com/stretchr/testify/require"
)

// keyMeta is metadata associated with an (objID, key) pair, where objID is
// a writer containing the key.
type keyMeta struct {
	objID objID
	key   []byte
	// history provides the history of writer operations applied against this
	// key on this object. history is always ordered chronologically.
	history keyHistory
}

func (m *keyMeta) clear() {
	m.history = m.history[:0]
}

// mergeInto merges this metadata into dst, appending all of this key's
// individual operations to dst's history.
func (m *keyMeta) mergeInto(dst *keyMeta) {
	for _, op := range m.history {
		// If the key is being merged into a database object and the operation
		// is a delete, we can clear the destination history. Database objects
		// are end points in the merging of keys and won't be the source of a
		// future merge. Deletions cause all other operations to behave as
		// though the key was never written to the database at all, so we don't
		// need to consider the earlier history when maintaining single delete
		// invariants.
		//
		// NB: There's a subtlety here in that isDelete() will return true if
		// opType is a writerSingleDelete, but single deletes are capable of
		// leaking information about the history of writes. However, that's
		// okay, because as long as we're properly generating single deletes
		// according to the W1 invariant described in keyManager's comment, a
		// single delete is equivalent to a delete for the current history.
		if dst.objID.tag() == dbTag && op.opType.isDelete() {
			dst.clear()
			continue
		}
		dst.history = append(dst.history, keyHistoryItem{
			opType: op.opType,
		})
	}
}
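
// For illustration (hedged; batchID, dbID and k below are placeholders):
// merging a batch's history for a key into a DB object clears the DB-side
// history at each delete, since DB objects are merge end points:
//
//	src := &keyMeta{objID: batchID, key: k, history: keyHistory{
//		{opType: OpWriterSet}, {opType: OpWriterDelete},
//	}}
//	dst := &keyMeta{objID: dbID, key: k, history: keyHistory{
//		{opType: OpWriterSet}, {opType: OpWriterSet},
//	}}
//	src.mergeInto(dst) // dst.history is now empty: the DEL cleared it.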

type bounds struct {
	smallest    []byte
	largest     []byte
	largestExcl bool // is largest exclusive?
}

func (b bounds) checkValid(cmp base.Compare) {
	if c := cmp(b.smallest, b.largest); c > 0 {
		panic(fmt.Sprintf("invalid bound [%q, %q]", b.smallest, b.largest))
	} else if c == 0 && b.largestExcl {
		panic(fmt.Sprintf("invalid bound [%q, %q)", b.smallest, b.largest))
	}
}

func (b bounds) String() string {
	if b.largestExcl {
		return fmt.Sprintf("[%q,%q)", b.smallest, b.largest)
	}
	return fmt.Sprintf("[%q,%q]", b.smallest, b.largest)
}

// Overlaps returns true iff the bounds intersect.
func (b *bounds) Overlaps(cmp base.Compare, other bounds) bool {
	if b.IsUnset() || other.IsUnset() {
		return false
	}
	// Is b strictly before other?
	if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) {
		return false
	}
	// Is b strictly after other?
	if v := cmp(b.smallest, other.largest); v > 0 || (v == 0 && other.largestExcl) {
		return false
	}
	return true
}
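
// For illustration (hedged; cmp stands for any bytewise base.Compare): [a,c)
// and [c,e] do not overlap because the first bound excludes its largest key,
// while [a,c] and [c,e] share the key c:
//
//	x := bounds{smallest: []byte("a"), largest: []byte("c"), largestExcl: true}
//	y := bounds{smallest: []byte("c"), largest: []byte("e")}
//	_ = x.Overlaps(cmp, y) // false: x ends exclusively at c.
//	x.largestExcl = false
//	_ = x.Overlaps(cmp, y) // true: both bounds contain c.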

// IsUnset returns true if the bounds haven't been set.
func (b bounds) IsUnset() bool {
	return b.smallest == nil && b.largest == nil
}

// Expand potentially expands the receiver bounds to include the other given
// bounds. If the receiver is unset, the other bounds are copied.
func (b *bounds) Expand(cmp base.Compare, other bounds) {
	if other.IsUnset() {
		return
	}
	other.checkValid(cmp)
	if b.IsUnset() {
		*b = other
		return
	}
	if cmp(b.smallest, other.smallest) > 0 {
		b.smallest = other.smallest
	}
	if v := cmp(b.largest, other.largest); v < 0 || (v == 0 && b.largestExcl) {
		b.largest = other.largest
		b.largestExcl = other.largestExcl
	}
}
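
// For illustration (hedged; cmp stands for any bytewise base.Compare):
// expanding [b,d) by [a,c] ratchets the smallest key down; expanding the
// result by [d,d] upgrades the largest bound from exclusive to inclusive:
//
//	b := bounds{smallest: []byte("b"), largest: []byte("d"), largestExcl: true}
//	b.Expand(cmp, bounds{smallest: []byte("a"), largest: []byte("c")})
//	// b is now [a,d).
//	b.Expand(cmp, bounds{smallest: []byte("d"), largest: []byte("d")})
//	// b is now [a,d].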

// keyManager tracks the write operations performed on keys in the generation
// phase of the metamorphic test. It maintains histories of operations performed
// against every unique user key on every writer object. These histories inform
// operation generation in order to maintain invariants that Pebble requires of
// end users, mostly around single deletions.
//
// A single deletion has a subtle requirement of the writer:
//
//	W1: The writer may only single delete a key `k` if `k` has been Set once
//	    (and never Merged) since the last delete.
//
// When a SINGLEDEL key deletes a SET key within a compaction, both the SET and
// the SINGLEDEL keys are elided. If multiple SETs of the key exist within the
// LSM, the SINGLEDEL reveals the lower SET. This behavior is dependent on the
// internal LSM state and nondeterministic. To ensure determinism, the end user
// must satisfy W1 and use single delete only when they can guarantee that the
// key has been set at most once since the last delete, preventing this rollback
// to a previous value.
//
// The W1 invariant requires a delicate dance during operation generation,
// because independent batches may be independently built and committed. With
// multi-instance variants of the metamorphic tests, keys in batches may
// ultimately be committed to any of several DB instances. To satisfy these
// requirements, the key manager tracks the history of every key on every
// writable object. When generating a new single deletion operation, the
// generator asks the key manager for a set of keys for which a single delete
// maintains the W1 invariant within the object itself. This object-local W1
// invariant (OLW1) is equivalent to W1 if one only ever performs write
// operations directly against individual DB objects.
//
// However, with the existence of batches that receive writes independent of DB
// objects, W1 may be violated by appending the histories of two objects that
// independently satisfy OLW1. Consider a sequence such as:
//
//	1. db1.Set("foo")
//	2. batch1.Set("foo")
//	3. batch1.SingleDelete("foo")
//	4. db1.Apply(batch1)
//
// Both db1 and batch1 satisfy the object-local invariant OLW1. However the
// composition of the histories created by appending batch1's operations to db1
// creates a history that now violates W1 on db1. To detect this violation,
// batch applications/commits and ingestions examine the tail of the destination
// object's history and the head of the source batch's history. When a violation
// is detected, these operations insert additional Delete operations to clear
// the conflicting keys before proceeding with the conflicting operation. These
// deletes reset the key history.
//
// Note that this generation-time key tracking requires that operations be
// infallible, because a runtime failure would cause the key manager's state to
// diverge from the runtime object state. Ingestion operations pose an obstacle,
// because the generator may generate ingestions that fail due to overlapping
// sstables. Today, this complication is sidestepped by avoiding ingestion of
// multiple batches containing deletes or single deletes, since the loss of
// those specific operations on a key is what we cannot tolerate (doing a
// SingleDelete on a key that has not been written to because the Set was lost
// is harmless).
//
// TODO(jackson): Instead, compute smallest and largest bounds of batches so
// that we know at generation-time whether or not an ingestion operation will
// fail and can avoid updating key state.
type keyManager struct {
	comparer *base.Comparer

	byObj map[objID]*objKeyMeta
	// globalKeys represents all the keys that have been generated so far. Not
	// all of these keys have been written to. globalKeys is sorted.
	globalKeys [][]byte
	// globalKeysMap contains the same keys as globalKeys but in a map. It
	// ensures no duplication.
	globalKeysMap map[string]bool
	// globalKeyPrefixes contains all the key prefixes (as defined by the
	// comparer's Split) generated so far. globalKeyPrefixes is sorted.
	globalKeyPrefixes [][]byte
	// globalKeyPrefixesMap contains the same keys as globalKeyPrefixes. It
	// ensures no duplication.
	globalKeyPrefixesMap map[string]struct{}
}

type objKeyMeta struct {
	id objID
	// keys tracks, for each key written to this object, what has happened to
	// it. It is transferred to another object's metadata when needed.
	keys map[string]*keyMeta
	// bounds holds user key bounds encompassing all the keys set within an
	// object. It's updated within `update` when a new op is generated.
	bounds bounds
	// These flags are true if the object has had range del or range key
	// operations.
	hasRangeDels     bool
	hasRangeKeys     bool
	hasRangeKeyUnset bool
	// rangeKeySets is the list of RangeKeySets for this object. Used to check
	// for overlapping RangeKeySets in external files.
	rangeKeySets []pebble.KeyRange
}

// MergeKey adds the given key, merging the histories as needed.
func (okm *objKeyMeta) MergeKey(k *keyMeta) {
	meta, ok := okm.keys[string(k.key)]
	if !ok {
		meta = &keyMeta{
			objID: okm.id,
			key:   k.key,
		}
		okm.keys[string(k.key)] = meta
	}
	k.mergeInto(meta)
}

// CollapseKeys collapses the history of all keys. Used with ingestion
// operations, which only use the last value of any given key.
func (okm *objKeyMeta) CollapseKeys() {
	for _, keyMeta := range okm.keys {
		keyMeta.history = keyMeta.history.collapsed()
	}
}

// MergeFrom merges the `from` metadata into this one, appending all of its
// individual operations to the corresponding key histories.
func (okm *objKeyMeta) MergeFrom(from *objKeyMeta, cmp base.Compare) {
	// The result should be the same regardless of the ordering of the keys.
	for _, k := range from.keys {
		okm.MergeKey(k)
	}
	okm.bounds.Expand(cmp, from.bounds)
	okm.hasRangeDels = okm.hasRangeDels || from.hasRangeDels
	okm.hasRangeKeys = okm.hasRangeKeys || from.hasRangeKeys
	okm.hasRangeKeyUnset = okm.hasRangeKeyUnset || from.hasRangeKeyUnset
	okm.rangeKeySets = append(okm.rangeKeySets, from.rangeKeySets...)
}

// objKeyMeta looks up the objKeyMeta for a given object, creating it if
// necessary.
func (k *keyManager) objKeyMeta(o objID) *objKeyMeta {
	m, ok := k.byObj[o]
	if !ok {
		m = &objKeyMeta{
			id:   o,
			keys: make(map[string]*keyMeta),
		}
		k.byObj[o] = m
	}
	return m
}

// SortedKeysForObj returns all the entries in objKeyMeta(o).keys, in sorted
// order.
func (k *keyManager) SortedKeysForObj(o objID) []keyMeta {
	okm := k.objKeyMeta(o)
	res := make([]keyMeta, 0, len(okm.keys))
	for _, m := range okm.keys {
		res = append(res, *m)
	}
	slices.SortFunc(res, func(a, b keyMeta) int {
		cmp := k.comparer.Compare(a.key, b.key)
		if cmp == 0 {
			panic(fmt.Sprintf("distinct keys %q and %q compared as equal", a.key, b.key))
		}
		return cmp
	})
	return res
}

// InRangeKeysForObj returns all keys in the range [lower, upper) associated
// with the given object, in sorted order. If either of the bounds is nil, it
// is ignored.
func (k *keyManager) InRangeKeysForObj(o objID, lower, upper []byte) []keyMeta {
	var inRangeKeys []keyMeta
	for _, km := range k.SortedKeysForObj(o) {
		if (lower == nil || k.comparer.Compare(km.key, lower) >= 0) &&
			(upper == nil || k.comparer.Compare(km.key, upper) < 0) {
			inRangeKeys = append(inRangeKeys, km)
		}
	}
	return inRangeKeys
}

// KeysForExternalIngest returns the keys that will be ingested with an external
// object (taking into consideration the bounds, synthetic suffix, etc).
func (k *keyManager) KeysForExternalIngest(obj externalObjWithBounds) []keyMeta {
	var res []keyMeta
	var lastPrefix []byte
	for _, km := range k.SortedKeysForObj(obj.externalObjID) {
		// Apply prefix and suffix changes, then check the bounds.
		if obj.syntheticPrefix.IsSet() {
			km.key = obj.syntheticPrefix.Apply(km.key)
		}
		if obj.syntheticSuffix.IsSet() {
			n := k.comparer.Split(km.key)
			km.key = append(km.key[:n:n], obj.syntheticSuffix...)
		}
		if lastPrefix != nil && k.comparer.Equal(lastPrefix, km.key[:k.comparer.Split(km.key)]) {
			// We only keep the first of every unique prefix for external ingests.
			// See the use of uniquePrefixes in newExternalObjOp.
			continue
		}
		lastPrefix = append(lastPrefix[:0], km.key[:k.comparer.Split(km.key)]...)
		if k.comparer.Compare(km.key, obj.bounds.Start) >= 0 && k.comparer.Compare(km.key, obj.bounds.End) < 0 {
			res = append(res, km)
		}
	}
	// Check for duplicate resulting keys.
	for i := 1; i < len(res); i++ {
		if k.comparer.Compare(res[i].key, res[i-1].key) == 0 {
			panic(fmt.Sprintf("duplicate external ingest key %q", res[i].key))
		}
	}
	return res
}
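
// For illustration (hedged; the keys below are testkeys-style placeholders):
// an external object holding a@2 and b@3, ingested with syntheticPrefix "x"
// and syntheticSuffix "@5", contributes xa@5 and xb@5. Only the first key of
// each unique prefix is kept, and only keys falling within
// [obj.bounds.Start, obj.bounds.End) survive.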

func (k *keyManager) ExternalObjectHasOverlappingRangeKeySets(externalObjID objID) bool {
	meta := k.objKeyMeta(externalObjID)
	if len(meta.rangeKeySets) == 0 {
		return false
	}
	ranges := meta.rangeKeySets
	// Sort by start key.
	slices.SortFunc(ranges, func(a, b pebble.KeyRange) int {
		return k.comparer.Compare(a.Start, b.Start)
	})
	// Check overlap between adjacent ranges.
	for i := 0; i < len(ranges)-1; i++ {
		if ranges[i].OverlapsKeyRange(k.comparer.Compare, ranges[i+1]) {
			return true
		}
	}
	return false
}

// getSetOfVisibleKeys returns a sorted slice of keys that are visible in the
// provided reader object under the object's current history.
func (k *keyManager) getSetOfVisibleKeys(readerID objID) [][]byte {
	okm := k.objKeyMeta(readerID)
	keys := make([][]byte, 0, len(okm.keys))
	for k, km := range okm.keys {
		if km.history.hasVisibleValue() {
			keys = append(keys, unsafe.Slice(unsafe.StringData(k), len(k)))
		}
	}
	slices.SortFunc(keys, testkeys.Comparer.Compare)
	return keys
}

// newKeyManager returns a pointer to a new keyManager. Callers should interact
// with it using the addNewKey, knownKeys, and update methods only.
func newKeyManager(numInstances int) *keyManager {
	m := &keyManager{
		comparer:             testkeys.Comparer,
		byObj:                make(map[objID]*objKeyMeta),
		globalKeysMap:        make(map[string]bool),
		globalKeyPrefixesMap: make(map[string]struct{}),
	}
	for i := 1; i <= max(numInstances, 1); i++ {
		m.objKeyMeta(makeObjID(dbTag, uint32(i)))
	}
	return m
}

// addNewKey adds the given key to the key manager for global key tracking.
// Returns false iff this is not a new key.
func (k *keyManager) addNewKey(key []byte) bool {
	if k.globalKeysMap[string(key)] {
		return false
	}
	insertSorted(k.comparer.Compare, &k.globalKeys, key)
	k.globalKeysMap[string(key)] = true

	prefixLen := k.comparer.Split(key)
	if prefixLen == 0 {
		panic(fmt.Sprintf("key %q has zero length prefix", key))
	}
	if _, ok := k.globalKeyPrefixesMap[string(key[:prefixLen])]; !ok {
		insertSorted(k.comparer.Compare, &k.globalKeyPrefixes, key[:prefixLen])
		k.globalKeyPrefixesMap[string(key[:prefixLen])] = struct{}{}
	}
	return true
}

// getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
// allocates, initializes and returns a new value.
func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
	objKeys := k.objKeyMeta(id)
	m, ok := objKeys.keys[string(key)]
	if ok {
		return m
	}
	m = &keyMeta{
		objID: id,
		key:   key,
	}
	// Initialize the key-to-meta index.
	objKeys.keys[string(key)] = m
	// Expand the object's bounds to contain this key if they don't already.
	objKeys.bounds.Expand(k.comparer.Compare, k.makeSingleKeyBounds(key))
	return m
}

// mergeObjectInto merges obj key metadata from one object into another and
// deletes the metadata for the source object (which must not be used again).
func (k *keyManager) mergeObjectInto(from, to objID) {
	toMeta := k.objKeyMeta(to)
	toMeta.MergeFrom(k.objKeyMeta(from), k.comparer.Compare)

	delete(k.byObj, from)
}

// expandBounds expands the incrementally maintained bounds of o to be at least
// as wide as `b`.
func (k *keyManager) expandBounds(o objID, b bounds) {
	k.objKeyMeta(o).bounds.Expand(k.comparer.Compare, b)
}

// doObjectBoundsOverlap returns true iff any of the named objects have key
// bounds that overlap those of any other named object.
func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool {
	for i := range objIDs {
		ib, iok := k.byObj[objIDs[i]]
		if !iok {
			continue
		}
		for j := i + 1; j < len(objIDs); j++ {
			jb, jok := k.byObj[objIDs[j]]
			if !jok {
				continue
			}
			if ib.bounds.Overlaps(k.comparer.Compare, jb.bounds) {
				return true
			}
		}
	}
	return false
}

// checkForSingleDelConflicts examines all the keys written to srcObj and
// determines whether any of the contained single deletes would be
// nondeterministic if applied to dstObj in dstObj's current state. It returns
// a slice of all the keys that are found to conflict. To preserve determinism,
// the caller must delete each conflicting key from the destination before
// writing src's mutations to dst.
//
// It takes a `srcCollapsed` parameter that determines whether the source
// history should be "collapsed" (see keyHistory.collapsed) before determining
// whether the applied state will conflict. This is required to facilitate
// ingestOps, which are NOT equivalent to committing the batch because they can
// only commit 1 internal point key at each unique user key.
func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte {
	dstKeys := k.objKeyMeta(dstObj)
	var conflicts [][]byte
	for _, src := range k.SortedKeysForObj(srcObj) {
		if srcCollapsed {
			src.history = src.history.collapsed()
		}
		if k.checkForSingleDelConflict(src, dstKeys) {
			conflicts = append(conflicts, src.key)
		}
	}
	return conflicts
}

// checkForSingleDelConflict returns true if applying the history of the source
// key on top of the given object results in possible SingleDel
// nondeterminism. See checkForSingleDelConflicts.
func (k *keyManager) checkForSingleDelConflict(src keyMeta, dstObjKeyMeta *objKeyMeta) bool {
	// Single delete generation logic already ensures that both the source
	// object's and the destination object's single deletes are deterministic
	// within the context of their existing writes. However, applying the
	// source keys on top of the destination object may violate the
	// invariants. Consider:
	//
	//	src: a.SET; a.SINGLEDEL;
	//	dst: a.SET;
	//
	// The merged view is:
	//
	//	a.SET; a.SET; a.SINGLEDEL;
	//
	// This is invalid, because there is more than 1 value mutation of the
	// key before the single delete.
	//
	// We walk the source object's history in chronological order, looking
	// for a single delete that was written before a DEL/RANGEDEL. (NB: We
	// don't need to look beyond a DEL/RANGEDEL, because these deletes bound
	// any subsequently-written single deletes to applying to the keys
	// within src's history between the two tombstones. We already know from
	// per-object history invariants that any such single delete must be
	// deterministic with respect to src's keys.)
	var srcHasUnboundedSingleDelete bool
	var srcValuesBeforeSingleDelete int

	// When the srcObj is being ingested (srcCollapsed=true), the semantics
	// change. We must first "collapse" the key's history to represent the
	// ingestion semantics.
	srcHistory := src.history

srcloop:
	for _, item := range srcHistory {
		switch item.opType {
		case OpWriterDelete, OpWriterDeleteRange:
			// We found a DEL or RANGEDEL before any single delete. If src
			// contains additional single deletes, their effects are limited
			// to applying to later keys. Combining the two object histories
			// doesn't pose any determinism risk.
			return false

		case OpWriterSingleDelete:
			// We found a single delete. Since we found this single delete
			// before a DEL or RANGEDEL, this delete has the potential to
			// affect the visibility of keys in `dstObj`. We'll need to look
			// for potential conflicts down below.
			srcHasUnboundedSingleDelete = true
			if srcValuesBeforeSingleDelete > 1 {
				panic(errors.AssertionFailedf("unexpectedly found %d sets/merges before single del",
					srcValuesBeforeSingleDelete))
			}
			break srcloop

		case OpWriterSet, OpWriterMerge:
			// We found a SET or MERGE operation for this key. If there's a
			// subsequent single delete, we'll need to make sure there's not
			// a SET or MERGE in the dst too.
			srcValuesBeforeSingleDelete++

		default:
			panic(errors.AssertionFailedf("unexpected optype %d", item.opType))
		}
	}
	if !srcHasUnboundedSingleDelete {
		return false
	}

	dst, ok := dstObjKeyMeta.keys[string(src.key)]
	// If the destination writer has no record of the key, the combined key
	// history is simply the src object's key history, which is valid due to
	// per-object single deletion invariants.
	if !ok {
		return false
	}

	// We need to examine the trailing key history on dst.
	consecutiveValues := srcValuesBeforeSingleDelete
	for i := len(dst.history) - 1; i >= 0; i-- {
		switch dst.history[i].opType {
		case OpWriterSet, OpWriterMerge:
			// A SET/MERGE may conflict if there's more than 1 consecutive
			// SET/MERGE.
			consecutiveValues++
			if consecutiveValues > 1 {
				return true
			}
		case OpWriterDelete, OpWriterSingleDelete, OpWriterDeleteRange:
			// Dels clear the history, enabling use of single delete.
			return false

		default:
			panic(errors.AssertionFailedf("unexpected optype %d", dst.history[i].opType))
		}
	}
	return false
}
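
// For illustration (hedged; km, batchID and dbID below are placeholders): if
// a batch holds SET("a"); SINGLEDEL("a") and the target DB already holds
// SET("a"), the merged history would contain two SETs before the SINGLEDEL,
// so the key is reported as a conflict and the generator must emit a
// Delete("a") on the DB before applying the batch:
//
//	conflicts := km.checkForSingleDelConflicts(batchID, dbID, false /* srcCollapsed */)
//	// conflicts == [][]byte{[]byte("a")}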

// update updates the internal state of the keyManager according to the given
// op.
func (k *keyManager) update(o op) {
	switch s := o.(type) {
	case *setOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType: OpWriterSet,
		})
	case *mergeOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType: OpWriterMerge,
		})
	case *deleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		if meta.objID.tag() == dbTag {
			meta.clear()
		} else {
			meta.history = append(meta.history, keyHistoryItem{
				opType: OpWriterDelete,
			})
		}
	case *deleteRangeOp:
		// We track the history of discrete point keys, but a range deletion
		// applies over a continuous key span of infinite keys. However, the
		// key manager knows all keys that have been used in all operations,
		// so we can discretize the range tombstone by adding it to every
		// known key within the range.
		//
		// TODO(jackson): If s.writerID is a batch, we may not know the set of
		// all keys that WILL exist by the time the batch is committed. This
		// means the delete range history is incomplete. Fix this.
		keyRange := pebble.KeyRange{Start: s.start, End: s.end}
		for _, key := range k.knownKeysInRange(keyRange) {
			meta := k.getOrInit(s.writerID, key)
			if meta.objID.tag() == dbTag {
				meta.clear()
			} else {
				meta.history = append(meta.history, keyHistoryItem{
					opType: OpWriterDeleteRange,
				})
			}
		}
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeDels = true

	case *singleDeleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType: OpWriterSingleDelete,
		})
	case *rangeKeyDeleteOp:
		// Range key operations on their own don't determine singledel
		// eligibility. However, range key operations could be part of a batch
		// that contains other operations that do affect it. If those batches
		// were to get ingested, we'd need to know the bounds of the sstables
		// generated out of those batches, as that determines whether the
		// ingestion will succeed or not.
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		k.objKeyMeta(s.writerID).hasRangeKeys = true
	case *rangeKeySetOp:
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		meta := k.objKeyMeta(s.writerID)
		meta.hasRangeKeys = true
		meta.rangeKeySets = append(meta.rangeKeySets, pebble.KeyRange{
			Start: s.start,
			End:   s.end,
		})
	case *rangeKeyUnsetOp:
		k.expandBounds(s.writerID, k.makeEndExclusiveBounds(s.start, s.end))
		meta := k.objKeyMeta(s.writerID)
		meta.hasRangeKeys = true
		meta.hasRangeKeyUnset = true
	case *ingestOp:
		// Some ingestion operations may attempt to ingest overlapping
		// sstables, which is prohibited. We know at generation time whether
		// these ingestions will be successful. If they won't be successful,
		// we should not update the key state because both the batch(es) and
		// the target DB will be left unmodified.
		if k.doObjectBoundsOverlap(s.batchIDs) {
			// This ingestion will fail.
			return
		}

		// For each batch, merge the keys into the DB. We can't call
		// keyMeta.mergeInto directly to merge, because ingest operations first
		// "flatten" the batch (because you can't set the same key twice at a
		// single sequence number). Instead we compute the collapsed history and
		// merge that.
		for _, batchID := range s.batchIDs {
			k.objKeyMeta(batchID).CollapseKeys()
			k.mergeObjectInto(batchID, s.dbID)
		}
	case *ingestAndExciseOp:
		// IngestAndExcise does not ingest multiple batches, so we will not see
		// a failure due to overlapping sstables. However we do need to merge
		// the singular batch into the key manager.
		//
		// Remove all keys from the key manager within the excise span before
		// merging the batch into the db.
		for _, key := range k.InRangeKeysForObj(s.dbID, s.exciseStart, s.exciseEnd) {
			m := k.getOrInit(s.dbID, key.key)
			m.clear()
		}
		k.objKeyMeta(s.batchID).CollapseKeys()
		k.mergeObjectInto(s.batchID, s.dbID)
		// TODO(bilal): Handle replicateOp here. We currently disable SingleDelete
		// when these operations are enabled (see multiInstanceConfig).
	case *newExternalObjOp:
		// Collapse and transfer the keys from the batch to the external object.
		k.objKeyMeta(s.batchID).CollapseKeys()
		k.mergeObjectInto(s.batchID, s.externalObjID)
	case *ingestExternalFilesOp:
		// Merge the keys from the external objects (within the restricted
		// bounds) into the database.
		dbMeta := k.objKeyMeta(s.dbID)
		for _, obj := range s.objs {
			for _, keyMeta := range k.KeysForExternalIngest(obj) {
				dbMeta.MergeKey(&keyMeta)
			}
			dbMeta.bounds.Expand(k.comparer.Compare, k.makeEndExclusiveBounds(obj.bounds.Start, obj.bounds.End))
		}
	case *applyOp:
		// Merge the keys from this batch into the parent writer.
		k.mergeObjectInto(s.batchID, s.writerID)
	case *batchCommitOp:
		// Merge the keys from the batch with the keys from the DB.
		k.mergeObjectInto(s.batchID, s.dbID)
	}
}
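
// For illustration (hedged; keys are testkeys-style placeholders): a
// DeleteRange("a", "c") written to a batch appends an OpWriterDeleteRange
// history item to every already-generated key within ["a", "c"), say a@1
// and b@7. Keys generated later are missed, which is the gap the
// TODO(jackson) comment above describes.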

func (k *keyManager) knownKeys() (keys [][]byte) {
	return k.globalKeys
}

// knownKeysInRange returns all eligible read keys within the range
// [start, end). The returned slice is owned by the keyManager and must not be
// retained.
func (k *keyManager) knownKeysInRange(kr pebble.KeyRange) (keys [][]byte) {
	s, _ := slices.BinarySearchFunc(k.globalKeys, kr.Start, k.comparer.Compare)
	e, _ := slices.BinarySearchFunc(k.globalKeys, kr.End, k.comparer.Compare)
	if s >= e {
		return nil
	}
	return k.globalKeys[s:e]
}

func (k *keyManager) prefixes() (prefixes [][]byte) {
	return k.globalKeyPrefixes
}

// prefixExists returns true if a key has been generated with the provided
// prefix before.
func (k *keyManager) prefixExists(prefix []byte) bool {
	_, exists := k.globalKeyPrefixesMap[string(prefix)]
	return exists
}

// eligibleSingleDeleteKeys returns a slice of keys that can be safely single
// deleted, given the writer id. Restricting single delete keys through this
// method is used to ensure the OLW1 guarantee (see the keyManager comment) for
// the provided object ID.
func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) {
	// Creating a slice of keys is wasteful given that the caller will pick
	// one, but it makes unit testing simpler.
	objKeys := k.objKeyMeta(o)
	for _, key := range k.globalKeys {
		meta, ok := objKeys.keys[string(key)]
		if !ok {
			keys = append(keys, key)
			continue
		}
		// Examine the history within this object.
		if meta.history.canSingleDelete() {
			keys = append(keys, key)
		}
	}
	return keys
}

// makeSingleKeyBounds creates a [key, key] bound.
func (k *keyManager) makeSingleKeyBounds(key []byte) bounds {
	return bounds{
		smallest:    key,
		largest:     key,
		largestExcl: false,
	}
}

// makeEndExclusiveBounds creates a [smallest, largest) bound.
func (k *keyManager) makeEndExclusiveBounds(smallest, largest []byte) bounds {
	b := bounds{
		smallest:    smallest,
		largest:     largest,
		largestExcl: true,
	}
	b.checkValid(k.comparer.Compare)
	return b
}

// keyHistoryItem describes an individual operation performed on a key.
type keyHistoryItem struct {
	// opType may be writerSet, writerDelete, writerSingleDelete,
	// writerDeleteRange or writerMerge only. No other opTypes may appear here.
	opType OpType
}

// keyHistory captures the history of mutations to a key in chronological
// order.
type keyHistory []keyHistoryItem

// canSingleDelete examines the tail of the history and returns true if a
// single delete appended to this history would satisfy the single delete
// invariants.
func (h keyHistory) canSingleDelete() bool {
	if len(h) == 0 {
		return true
	}
	switch o := h[len(h)-1].opType; o {
	case OpWriterDelete, OpWriterDeleteRange, OpWriterSingleDelete:
		return true
	case OpWriterSet, OpWriterMerge:
		if len(h) == 1 {
			return true
		}
		return h[len(h)-2].opType.isDelete()
	default:
		panic(errors.AssertionFailedf("unexpected writer op %v", o))
	}
}
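
// For illustration (hedged): per OLW1, a key with a lone SET can be single
// deleted, as can a SET immediately preceded by a delete, but two
// consecutive value mutations cannot:
//
//	keyHistory{{opType: OpWriterSet}}.canSingleDelete()                           // true
//	keyHistory{{opType: OpWriterDelete}, {opType: OpWriterSet}}.canSingleDelete() // true
//	keyHistory{{opType: OpWriterSet}, {opType: OpWriterMerge}}.canSingleDelete()  // false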

func (h keyHistory) String() string {
	var sb strings.Builder
	for i, it := range h {
		if i > 0 {
			fmt.Fprint(&sb, ", ")
		}
		switch it.opType {
		case OpWriterDelete:
			fmt.Fprint(&sb, "del")
		case OpWriterDeleteRange:
			fmt.Fprint(&sb, "delrange")
		case OpWriterSingleDelete:
			fmt.Fprint(&sb, "singledel")
		case OpWriterSet:
			fmt.Fprint(&sb, "set")
		case OpWriterMerge:
			fmt.Fprint(&sb, "merge")
		default:
			fmt.Fprintf(&sb, "optype[v=%d]", it.opType)
		}
	}
	return sb.String()
}

// hasVisibleValue examines the tail of the history and returns true if the
// history should end in a visible value for this key.
func (h keyHistory) hasVisibleValue() bool {
	if len(h) == 0 {
		return false
	}
	return !h[len(h)-1].opType.isDelete()
}
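
// For illustration (hedged): a history ending in a SET or MERGE leaves a
// visible value, while one ending in any flavor of delete does not:
//
//	keyHistory{{opType: OpWriterDelete}, {opType: OpWriterSet}}.hasVisibleValue() // true
//	keyHistory{{opType: OpWriterSet}, {opType: OpWriterDelete}}.hasVisibleValue() // false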

// collapsed returns a new key history that's equivalent to the history created
// by an ingestOp that "collapses" a batch's keys. See ingestOp.build.
func (h keyHistory) collapsed() keyHistory {
	var ret keyHistory
	// When collapsing a batch, any range deletes are semantically applied
	// first. Look for any range deletes and apply them.
	for _, op := range h {
		if op.opType == OpWriterDeleteRange {
			ret = append(ret, op)
			break
		}
	}
	// Among point keys, the most recently written key wins.
	for i := len(h) - 1; i >= 0; i-- {
		if h[i].opType != OpWriterDeleteRange {
			ret = append(ret, h[i])
			break
		}
	}
	return ret
}
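
// For illustration (hedged): collapsing SET; RANGEDEL; MERGE yields
// RANGEDEL; MERGE, because the range deletion is applied first and then only
// the most recent point operation survives:
//
//	h := keyHistory{{opType: OpWriterSet}, {opType: OpWriterDeleteRange}, {opType: OpWriterMerge}}
//	_ = h.collapsed() // [delrange, merge]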

func opWrittenKeys(untypedOp op) [][]byte {
	switch t := untypedOp.(type) {
	case *applyOp:
	case *batchCommitOp:
	case *checkpointOp:
	case *closeOp:
	case *compactOp:
	case *dbRestartOp:
	case *deleteOp:
		return [][]byte{t.key}
	case *deleteRangeOp:
		return [][]byte{t.start, t.end}
	case *flushOp:
	case *getOp:
	case *ingestOp:
	case *initOp:
	case *iterFirstOp:
	case *iterLastOp:
	case *iterNextOp:
	case *iterNextPrefixOp:
	case *iterCanSingleDelOp:
	case *iterPrevOp:
	case *iterSeekGEOp:
	case *iterSeekLTOp:
	case *iterSeekPrefixGEOp:
	case *iterSetBoundsOp:
	case *iterSetOptionsOp:
	case *mergeOp:
		return [][]byte{t.key}
	case *newBatchOp:
	case *newIndexedBatchOp:
	case *newIterOp:
	case *newIterUsingCloneOp:
	case *newSnapshotOp:
	case *rangeKeyDeleteOp:
	case *rangeKeySetOp:
	case *rangeKeyUnsetOp:
	case *setOp:
		return [][]byte{t.key}
	case *singleDeleteOp:
		return [][]byte{t.key}
	case *replicateOp:
		return [][]byte{t.start, t.end}
	}
	return nil
}

func loadPrecedingKeys(t TestingT, ops []op, cfg *OpConfig, m *keyManager) {
	for _, op := range ops {
		// Pretend we're generating all the operation's keys as potential new
		// keys, so that we update the key manager's keys and prefix sets.
		for _, k := range opWrittenKeys(op) {
			m.addNewKey(k)

			// If the key has a suffix, ratchet up the suffix distribution if
			// necessary.
			if s := m.comparer.Split(k); s < len(k) {
				suffix, err := testkeys.ParseSuffix(k[s:])
				require.NoError(t, err)
				if uint64(suffix) > cfg.writeSuffixDist.Max() {
					diff := int(uint64(suffix) - cfg.writeSuffixDist.Max())
					cfg.writeSuffixDist.IncMax(diff)
				}
			}
		}

		// Update key tracking state.
		m.update(op)
	}
}

func insertSorted(cmp base.Compare, dst *[][]byte, k []byte) {
	s := *dst
	i, _ := slices.BinarySearchFunc(s, k, cmp)
	*dst = slices.Insert(s, i, k)
}