package metamorphic

import (
	"bytes"
	"cmp"
	"fmt"
	"slices"
	"strings"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/testkeys"
	"github.com/stretchr/testify/require"
)

// objKey is a tuple of (objID, key). This struct is used primarily as a map
// key for keyManager. Only writer objTags can occur here, i.e., dbTag and
// batchTag, since this is used for tracking the keys in a writer.
type objKey struct {
	id  objID
	key []byte
}

// makeObjKey returns a new objKey given an id and key.
func makeObjKey(id objID, key []byte) objKey {
	if id.tag() != dbTag && id.tag() != batchTag {
		panic(fmt.Sprintf("unexpected non-writer tag %v", id.tag()))
	}
	return objKey{id, key}
}

// String implements fmt.Stringer, returning a stable string representation of
// the objKey. This string is used as a map key.
func (o objKey) String() string {
	return fmt.Sprintf("%s:%s", o.id, o.key)
}

// keyMeta is metadata associated with an (objID, key) pair, where objID is
// a writer containing the key.
type keyMeta struct {
	objKey
	// history provides the history of writer operations applied against this
	// key on this object. history is always ordered by non-decreasing
	// metaTimestamp.
	history keyHistory
}

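// clear resets the key's history to empty, as though the key had never been
// written.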
func (m *keyMeta) clear() {
	m.history = m.history[:0]
}

// mergeInto merges this key's metadata into dst, appending all of its
// individual operations to dst's history at the provided timestamp.
func (m *keyMeta) mergeInto(dst *keyMeta, ts int) {
	for _, op := range m.history {
		// If the key is being merged into a database object and the operation
		// is a delete, we can clear the destination history. Database objects
		// are end points in the merging of keys and won't be the source of a
		// future merge. Deletions cause all other operations to behave as
		// though the key was never written to the database at all, so we don't
		// need to consider the key for maintaining single delete invariants.
		//
		// NB: There's a subtlety here in that isDelete() will return true if
		// opType is a writerSingleDelete, but single deletes are capable of
		// leaking information about the history of writes. However, that's
		// okay, because as long as we're properly generating single deletes
		// according to the W1 invariant described in keyManager's comment, a
		// single delete is equivalent to a delete for the current history.
		if dst.objKey.id.tag() == dbTag && op.opType.isDelete() {
			dst.clear()
			continue
		}
		dst.history = append(dst.history, keyHistoryItem{
			opType:        op.opType,
			metaTimestamp: ts,
		})
	}
}

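// bounds tracks the smallest and largest user keys written to an object.
// largestExcl records whether largest is an exclusive bound (as it is for
// range deletion end keys).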
type bounds struct {
	smallest    []byte
	largest     []byte
	largestExcl bool // is largest exclusive?
}

func (b *bounds) String() string {
	if b.largestExcl {
		return fmt.Sprintf("[%q,%q)", b.smallest, b.largest)
	}
	return fmt.Sprintf("[%q,%q]", b.smallest, b.largest)
}

// overlaps returns true iff the bounds intersect.
func (b *bounds) overlaps(cmp base.Compare, other *bounds) bool {
	// Is b strictly before other?
	if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) {
		return false
	}
	// Is b strictly after other?
	if v := cmp(b.smallest, other.largest); v > 0 || (v == 0 && other.largestExcl) {
		return false
	}
	return true
}

// mergeInto merges the receiver bounds into other, mutating other.
func (b bounds) mergeInto(cmp base.Compare, other *bounds) {
	if cmp(other.smallest, b.smallest) > 0 {
		other.smallest = b.smallest
	}
	if v := cmp(other.largest, b.largest); v < 0 || (v == 0 && other.largestExcl) {
		other.largest = b.largest
		other.largestExcl = b.largestExcl
	}
}

// keyManager tracks the write operations performed on keys in the generation
// phase of the metamorphic test. It maintains histories of operations performed
// against every unique user key on every writer object. These histories inform
// operation generation in order to maintain invariants that Pebble requires of
// end users, mostly around single deletions.
//
// A single deletion has a subtle requirement of the writer:
//
//	W1: The writer may only single delete a key `k` if `k` has been Set once
//	    (and never Merged) since the last delete.
//
// When a SINGLEDEL key deletes a SET key within a compaction, both the SET and
// the SINGLEDEL keys are elided. If multiple SETs of the key exist within the
// LSM, the SINGLEDEL reveals the lower SET. This behavior is dependent on the
// internal LSM state and is nondeterministic. To ensure determinism, the end
// user must satisfy W1 and use single delete only when they can guarantee that
// the key has been set at most once since the last delete, preventing this
// rollback to a previous value.
//
// This W1 invariant requires a delicate dance during operation generation,
// because independent batches may be independently built and committed. With
// multi-instance variants of the metamorphic tests, keys in batches may
// ultimately be committed to any of several DB instances. To satisfy these
// requirements, the key manager tracks the history of every key on every
// writable object. When generating a new single deletion operation, the
// generator asks the key manager for a set of keys for which a single delete
// maintains the W1 invariant within the object itself. This object-local W1
// invariant (OLW1) is equivalent to W1 if one only ever performs write
// operations directly against individual DB objects.
//
// However, with the existence of batches that receive writes independent of DB
// objects, W1 may be violated by appending the histories of two objects that
// independently satisfy OLW1. Consider a sequence such as:
//
//	1. db1.Set("foo")
//	2. batch1.Set("foo")
//	3. batch1.SingleDelete("foo")
//	4. db1.Apply(batch1)
//
// Both db1 and batch1 satisfy the object-local invariant OLW1. However the
// composition of the histories created by appending batch1's operations to db1
// creates a history that now violates W1 on db1. To detect this violation,
// batch applications/commits and ingestions examine the tail of the destination
// object's history and the head of the source batch's history. When a violation
// is detected, these operations insert additional Delete operations to clear
// the conflicting keys before proceeding with the conflicting operation. These
// deletes reset the key history.
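//
// For example, the violating sequence above can be made deterministic by
// inserting such a delete before the apply (step 4 below is the generated
// repair):
//
//	1. db1.Set("foo")
//	2. batch1.Set("foo")
//	3. batch1.SingleDelete("foo")
//	4. db1.Delete("foo") // inserted to reset foo's history on db1
//	5. db1.Apply(batch1)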
//
// Note that this generation-time key tracking requires that operations be
// infallible, because a runtime failure would cause the key manager's state to
// diverge from the runtime object state. Ingestion operations pose an obstacle,
// because the generator may generate ingestions that fail due to overlapping
// sstables. Today, this complication is sidestepped by avoiding ingestion of
// multiple batches containing deletes or single deletes, since the loss of
// those specific operations on a key is what we cannot tolerate (doing
// SingleDelete on a key that has not been written to because the Set was lost
// is harmless).
//
// TODO(jackson): Instead, compute smallest and largest bounds of batches so
// that we know at generation-time whether or not an ingestion operation will
// fail and can avoid updating key state.
type keyManager struct {
	comparer *base.Comparer

	// metaTimestamp is used to provide an ordering over certain operations,
	// like iterator creation and updates to keys. Keeping track of the
	// timestamp allows us to make determinations such as whether a key will
	// be visible to an iterator.
	metaTimestamp int

	// byObjKey tracks the state for each (writer, key) pair. It refers to the
	// same *keyMeta as in the byObj slices. Using a map allows for fast state
	// lookups when changing the state based on a writer operation on the key.
	byObjKey map[string]*keyMeta
	// byObj holds the list of keys written to each writer, along with what
	// has happened to each key in that writer. Entries are transferred to the
	// destination object when a batch is applied, committed or ingested.
	byObj map[objID][]*keyMeta
	// boundsByObj holds user key bounds encompassing all the keys set within
	// an object. It's updated within `update` when a new op is generated.
	// It's used when determining whether an ingestion should succeed or not.
	boundsByObj map[objID]*bounds

	// globalKeys represents all the keys that have been generated so far. Not
	// all these keys have been written to. globalKeys is sorted.
	globalKeys [][]byte
	// globalKeysMap contains the same keys as globalKeys but in a map. It
	// ensures no duplication.
	globalKeysMap map[string]bool
	// globalKeyPrefixes contains all the key prefixes (as defined by the
	// comparer's Split) generated so far. globalKeyPrefixes is sorted.
	globalKeyPrefixes [][]byte
	// globalKeyPrefixesMap contains the same keys as globalKeyPrefixes. It
	// ensures no duplication.
	globalKeyPrefixesMap map[string]struct{}
}

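// nextMetaTimestamp returns the current meta timestamp and advances the
// internal counter.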
func (k *keyManager) nextMetaTimestamp() int {
	ret := k.metaTimestamp
	k.metaTimestamp++
	return ret
}

// newKeyManager returns a pointer to a new keyManager. Callers should
// interact with this using the addNewKey, knownKeys and update methods only.
func newKeyManager(numInstances int) *keyManager {
	m := &keyManager{
		comparer:             testkeys.Comparer,
		byObjKey:             make(map[string]*keyMeta),
		byObj:                make(map[objID][]*keyMeta),
		boundsByObj:          make(map[objID]*bounds),
		globalKeysMap:        make(map[string]bool),
		globalKeyPrefixesMap: make(map[string]struct{}),
	}
	for i := 1; i <= max(numInstances, 1); i++ {
		m.byObj[makeObjID(dbTag, uint32(i))] = []*keyMeta{}
	}
	return m
}

// addNewKey adds the given key to the key manager for global key tracking.
// Returns false iff this is not a new key.
func (k *keyManager) addNewKey(key []byte) bool {
	if k.globalKeysMap[string(key)] {
		return false
	}
	insertSorted(k.comparer.Compare, &k.globalKeys, key)
	k.globalKeysMap[string(key)] = true

	prefixLen := k.comparer.Split(key)
	if _, ok := k.globalKeyPrefixesMap[string(key[:prefixLen])]; !ok {
		insertSorted(k.comparer.Compare, &k.globalKeyPrefixes, key[:prefixLen])
		k.globalKeyPrefixesMap[string(key[:prefixLen])] = struct{}{}
	}
	return true
}

// getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
// allocates, initializes and returns a new value.
func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
	o := makeObjKey(id, key)
	m, ok := k.byObjKey[o.String()]
	if ok {
		return m
	}
	m = &keyMeta{objKey: makeObjKey(id, key)}
	// Initialize the key-to-meta index.
	k.byObjKey[o.String()] = m
	// Add to the id-to-metas slice.
	k.byObj[o.id] = append(k.byObj[o.id], m)

	// Expand the object's bounds to contain this key if they don't already.
	k.expandBounds(id, bounds{
		smallest: key,
		largest:  key,
	})
	return m
}

// mergeKeysInto merges all metadata for all keys associated with the "from" ID
// with the metadata for keys associated with the "to" ID.
func (k *keyManager) mergeKeysInto(from, to objID, mergeFunc func(src, dst *keyMeta, ts int)) {
	msFrom, ok := k.byObj[from]
	if !ok {
		msFrom = []*keyMeta{}
		k.byObj[from] = msFrom
	}
	msTo, ok := k.byObj[to]
	if !ok {
		msTo = []*keyMeta{}
		k.byObj[to] = msTo
	}

	// Sort to facilitate a merge.
	slices.SortFunc(msFrom, func(a, b *keyMeta) int {
		return bytes.Compare(a.key, b.key)
	})
	slices.SortFunc(msTo, func(a, b *keyMeta) int {
		return bytes.Compare(a.key, b.key)
	})

	ts := k.nextMetaTimestamp()
	var msNew []*keyMeta
	var iTo int
	for _, m := range msFrom {
		// Move the cursor on msTo forward.
		for iTo < len(msTo) && bytes.Compare(msTo[iTo].key, m.key) < 0 {
			msNew = append(msNew, msTo[iTo])
			iTo++
		}

		var mTo *keyMeta
		if iTo < len(msTo) && bytes.Equal(msTo[iTo].key, m.key) {
			mTo = msTo[iTo]
			iTo++
		} else {
			mTo = &keyMeta{objKey: makeObjKey(to, m.key)}
			k.byObjKey[mTo.String()] = mTo
		}

		mergeFunc(m, mTo, ts)
		msNew = append(msNew, mTo)

		delete(k.byObjKey, m.String()) // Unlink "from".
	}

	// Add any remaining items from the "to" set.
	for iTo < len(msTo) {
		msNew = append(msNew, msTo[iTo])
		iTo++
	}

	// All the keys in `from` have been merged into `to`. Expand `to`'s bounds
	// to be at least as wide as `from`'s.
	if fromBounds := k.boundsByObj[from]; fromBounds != nil {
		k.expandBounds(to, *fromBounds)
	}
	k.byObj[to] = msNew         // Update "to" obj.
	delete(k.byObj, from)       // Unlink "from" obj.
	delete(k.boundsByObj, from) // Unlink "from" bounds.
}

// expandBounds expands the incrementally maintained bounds of o to be at least
// as wide as `b`.
func (k *keyManager) expandBounds(o objID, b bounds) {
	existing, ok := k.boundsByObj[o]
	if !ok {
		existing = new(bounds)
		*existing = b
		k.boundsByObj[o] = existing
		return
	}
	b.mergeInto(k.comparer.Compare, existing)
}

// doObjectBoundsOverlap returns true iff any of the named objects have key
// bounds that overlap any other named object.
func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool {
	for i := range objIDs {
		ib, iok := k.boundsByObj[objIDs[i]]
		if !iok {
			continue
		}
		for j := i + 1; j < len(objIDs); j++ {
			jb, jok := k.boundsByObj[objIDs[j]]
			if !jok {
				continue
			}
			if ib.overlaps(k.comparer.Compare, jb) {
				return true
			}
		}
	}
	return false
}

// checkForSingleDelConflicts examines all the keys written to srcObj, and
// determines whether any of the contained single deletes would be
// nondeterministic if applied to dstObj in dstObj's current state. It returns
// a slice of all the keys that are found to conflict. To preserve determinism,
// the caller must delete each conflicting key from the destination before
// writing src's mutations to dst.
//
// It takes a `srcCollapsed` parameter that determines whether the source
// history should be "collapsed" (see keyHistory.collapsed) before determining
// whether the applied state will conflict. This is required to facilitate
// ingestOps, which are NOT equivalent to committing the batch, because they
// can only commit 1 internal point key at each unique user key.
func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte {
	var conflicts [][]byte
	for _, src := range k.byObj[srcObj] {
		// Single delete generation logic already ensures that both srcObj and
		// dstObj's single deletes are deterministic within the context of
		// their existing writes. However, applying srcObj on top of dstObj
		// may violate the invariants. Consider:
		//
		//	src: a.SET; a.SINGLEDEL;
		//	dst: a.SET;
		//
		// The merged view is:
		//
		//	a.SET; a.SET; a.SINGLEDEL;
		//
		// This is invalid, because there is more than 1 value mutation of the
		// key before the single delete.
		//
		// We walk the source object's history in chronological order, looking
		// for a single delete that was written before a DEL/RANGEDEL. (NB: We
		// don't need to look beyond a DEL/RANGEDEL, because these deletes
		// bound any subsequently-written single deletes to applying to the
		// keys within src's history between the two tombstones. We already
		// know from per-object history invariants that any such single delete
		// must be deterministic with respect to src's keys.)
		var srcHasUnboundedSingleDelete bool
		var srcValuesBeforeSingleDelete int

		// When the srcObj is being ingested (srcCollapsed=t), the semantics
		// change. We must first "collapse" the key's history to represent the
		// ingestion semantics.
		srcHistory := src.history
		if srcCollapsed {
			srcHistory = src.history.collapsed()
		}

	srcloop:
		for _, item := range srcHistory {
			switch item.opType {
			case OpWriterDelete, OpWriterDeleteRange:
				// We found a DEL or RANGEDEL before any single delete. If src
				// contains additional single deletes, their effects are
				// limited to applying to later keys. Combining the two object
				// histories doesn't pose any determinism risk.
				break srcloop
			case OpWriterSingleDelete:
				// We found a single delete. Since we found this single delete
				// before a DEL or RANGEDEL, this delete has the potential to
				// affect the visibility of keys in `dstObj`. We'll need to
				// look for potential conflicts down below.
				srcHasUnboundedSingleDelete = true
				if srcValuesBeforeSingleDelete > 1 {
					panic(errors.AssertionFailedf("unexpectedly found %d sets/merges within %s before single del",
						srcValuesBeforeSingleDelete, srcObj))
				}
				break srcloop
			case OpWriterSet, OpWriterMerge:
				// We found a SET or MERGE operation for this key. If there's
				// a subsequent single delete, we'll need to make sure there's
				// not a SET or MERGE in the dst too.
				srcValuesBeforeSingleDelete++
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", item.opType))
			}
		}
		if !srcHasUnboundedSingleDelete {
			continue
		}

		dst, ok := k.byObjKey[makeObjKey(dstObj, src.key).String()]
		// If the destination writer has no record of the key, the combined
		// key history is simply the src object's key history, which is valid
		// due to per-object single deletion invariants.
		if !ok {
			continue
		}

		// We need to examine the trailing key history on dst.
		consecutiveValues := srcValuesBeforeSingleDelete
	dstloop:
		for i := len(dst.history) - 1; i >= 0; i-- {
			switch dst.history[i].opType {
			case OpWriterSet, OpWriterMerge:
				// A SET/MERGE may conflict if there's more than 1 consecutive
				// SET/MERGEs.
				consecutiveValues++
				if consecutiveValues > 1 {
					conflicts = append(conflicts, src.key)
					break dstloop
				}
			case OpWriterDelete, OpWriterSingleDelete, OpWriterDeleteRange:
				// Dels clear the history, enabling use of single delete.
				break dstloop
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", dst.history[i].opType))
			}
		}
	}
	return conflicts
}

// update updates the internal state of the keyManager according to the given
// op.
func (k *keyManager) update(o op) {
	switch s := o.(type) {
	case *setOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSet,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *mergeOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterMerge,
			metaTimestamp: k.nextMetaTimestamp(),
		})
	case *deleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		if meta.objKey.id.tag() == dbTag {
			meta.clear()
		} else {
			meta.history = append(meta.history, keyHistoryItem{
				opType:        OpWriterDelete,
				metaTimestamp: k.nextMetaTimestamp(),
			})
		}
	case *deleteRangeOp:
		// We track the history of discrete point keys, but a range deletion
		// applies over a continuous key span of infinite keys. However, the
		// key manager knows all keys that have been used in all operations,
		// so we can discretize the range tombstone by adding it to every
		// known key within the range.
		ts := k.nextMetaTimestamp()
		keyRange := pebble.KeyRange{Start: s.start, End: s.end}
		for _, key := range k.knownKeysInRange(keyRange) {
			meta := k.getOrInit(s.writerID, key)
			if meta.objKey.id.tag() == dbTag {
				meta.clear()
			} else {
				meta.history = append(meta.history, keyHistoryItem{
					opType:        OpWriterDeleteRange,
					metaTimestamp: ts,
				})
			}
		}
		k.expandBounds(s.writerID, bounds{
			smallest:    s.start,
			largest:     s.end,
			largestExcl: true,
		})
	case *singleDeleteOp:
		meta := k.getOrInit(s.writerID, s.key)
		meta.history = append(meta.history, keyHistoryItem{
			opType:        OpWriterSingleDelete,
			metaTimestamp: k.nextMetaTimestamp(),
		})

	case *ingestOp:
		// Some ingestion operations may attempt to ingest overlapping
		// sstables, which is prohibited. We know at generation time whether
		// these ingestions will be successful. If they won't be successful,
		// we should not update the key state because both the batch(es) and
		// target DB will be left unmodified.
		if k.doObjectBoundsOverlap(s.batchIDs) {
			// This ingestion will fail.
			return
		}

		// For each batch, merge the keys into the DB. We can't call
		// keyMeta.mergeInto directly to merge, because ingest operations
		// first "flatten" the batch (because you can't set the same key twice
		// at a single sequence number). Instead we compute the collapsed
		// history and merge that.
		for _, batchID := range s.batchIDs {
			k.mergeKeysInto(batchID, s.dbID, func(src, dst *keyMeta, ts int) {
				collapsedSrc := keyMeta{
					objKey:  src.objKey,
					history: src.history.collapsed(),
				}
				collapsedSrc.mergeInto(dst, ts)
			})
		}
		// TODO(bilal): Handle ingestAndExciseOp and replicateOp here.
	case *applyOp:
		// Merge the keys from this batch into the parent writer.
		k.mergeKeysInto(s.batchID, s.writerID, (*keyMeta).mergeInto)
	case *batchCommitOp:
		// Merge the keys from the batch with the keys from the DB.
		k.mergeKeysInto(s.batchID, s.dbID, (*keyMeta).mergeInto)
	}
}

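// knownKeys returns all keys that have been generated so far, in sorted
// order. The returned slice is owned by the keyManager and must not be
// retained or mutated.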
func (k *keyManager) knownKeys() (keys [][]byte) {
	return k.globalKeys
}

// knownKeysInRange returns all eligible read keys within the range
// [start,end). The returned slice is owned by the keyManager and must not be
// retained.
func (k *keyManager) knownKeysInRange(kr pebble.KeyRange) (keys [][]byte) {
	s, _ := slices.BinarySearchFunc(k.globalKeys, kr.Start, k.comparer.Compare)
	e, _ := slices.BinarySearchFunc(k.globalKeys, kr.End, k.comparer.Compare)
	if s >= e {
		return nil
	}
	return k.globalKeys[s:e]
}

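// prefixes returns all key prefixes (as defined by the comparer's Split)
// generated so far, in sorted order.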
func (k *keyManager) prefixes() (prefixes [][]byte) {
	return k.globalKeyPrefixes
}

// prefixExists returns true if a key has been generated with the provided
// prefix before.
func (k *keyManager) prefixExists(prefix []byte) bool {
	_, exists := k.globalKeyPrefixesMap[string(prefix)]
	return exists
}

// eligibleSingleDeleteKeys returns a slice of keys that can be safely single
// deleted, given the writer id. Restricting single delete keys through this
// method is used to ensure the OLW1 guarantee (see the keyManager comment) for
// the provided object ID.
func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) {
	// Creating a slice of keys is wasteful given that the caller will pick
	// one, but makes it simpler for unit testing.
	for _, key := range k.globalKeys {
		objKey := makeObjKey(o, key)
		meta, ok := k.byObjKey[objKey.String()]
		if !ok {
			keys = append(keys, key)
			continue
		}
		// Examine the history within this object.
		if meta.history.canSingleDelete() {
			keys = append(keys, key)
		}
	}
	return keys
}

// keyHistoryItem describes an individual operation performed on a key.
type keyHistoryItem struct {
	// opType may be OpWriterSet, OpWriterDelete, OpWriterSingleDelete,
	// OpWriterDeleteRange or OpWriterMerge only. No other opTypes may appear
	// here.
	opType        OpType
	metaTimestamp int
}

// keyHistory captures the history of mutations to a key in chronological order.
type keyHistory []keyHistoryItem

// before returns the subslice of the key history that happened strictly before
// the provided meta timestamp.
func (h keyHistory) before(metaTimestamp int) keyHistory {
	i, _ := slices.BinarySearchFunc(h, metaTimestamp, func(a keyHistoryItem, ts int) int {
		return cmp.Compare(a.metaTimestamp, ts)
	})
	return h[:i]
}

// canSingleDelete examines the tail of the history and returns true if a
// single delete appended to this history would satisfy the single delete
// invariants.
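//
// For example (in keyHistory.String notation), the history "set(1)" permits a
// single delete, while "set(1), set(2)" does not.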
func (h keyHistory) canSingleDelete() bool {
	if len(h) == 0 {
		return true
	}
	switch o := h[len(h)-1].opType; o {
	case OpWriterDelete, OpWriterDeleteRange, OpWriterSingleDelete:
		return true
	case OpWriterSet, OpWriterMerge:
		if len(h) == 1 {
			return true
		}
		return h[len(h)-2].opType.isDelete()
	default:
		panic(errors.AssertionFailedf("unexpected writer op %v", o))
	}
}

func (h keyHistory) String() string {
	var sb strings.Builder
	for i, it := range h {
		if i > 0 {
			fmt.Fprint(&sb, ", ")
		}
		switch it.opType {
		case OpWriterDelete:
			fmt.Fprint(&sb, "del")
		case OpWriterDeleteRange:
			fmt.Fprint(&sb, "delrange")
		case OpWriterSingleDelete:
			fmt.Fprint(&sb, "singledel")
		case OpWriterSet:
			fmt.Fprint(&sb, "set")
		case OpWriterMerge:
			fmt.Fprint(&sb, "merge")
		default:
			fmt.Fprintf(&sb, "optype[v=%d]", it.opType)
		}
		fmt.Fprintf(&sb, "(%d)", it.metaTimestamp)
	}
	return sb.String()
}

// hasVisibleValue examines the tail of the history and returns true if the
// history should end in a visible value for this key.
func (h keyHistory) hasVisibleValue() bool {
	if len(h) == 0 {
		return false
	}
	return !h[len(h)-1].opType.isDelete()
}

// collapsed returns a new key history that's equivalent to the history created
// by an ingestOp that "collapses" a batch's keys. See ingestOp.build.
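//
// For example (in keyHistory.String notation), the history
//
//	set(1), delrange(2), merge(3), set(4)
//
// collapses to
//
//	delrange(2), set(4)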
func (h keyHistory) collapsed() keyHistory {
	var ret keyHistory
	// When collapsing a batch, any range deletes are semantically applied
	// first. Look for any range deletes and apply them.
	for _, op := range h {
		if op.opType == OpWriterDeleteRange {
			ret = append(ret, op)
			break
		}
	}
	// Among point keys, the most recently written key wins.
	for i := len(h) - 1; i >= 0; i-- {
		if h[i].opType != OpWriterDeleteRange {
			ret = append(ret, h[i])
			break
		}
	}
	return ret
}

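// opWrittenKeys returns the user keys written by the provided operation, if
// any. For ranged operations, it returns the start and end keys.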
func opWrittenKeys(untypedOp op) [][]byte {
	switch t := untypedOp.(type) {
	case *applyOp:
	case *batchCommitOp:
	case *checkpointOp:
	case *closeOp:
	case *compactOp:
	case *dbRestartOp:
	case *deleteOp:
		return [][]byte{t.key}
	case *deleteRangeOp:
		return [][]byte{t.start, t.end}
	case *flushOp:
	case *getOp:
	case *ingestOp:
	case *initOp:
	case *iterFirstOp:
	case *iterLastOp:
	case *iterNextOp:
	case *iterNextPrefixOp:
	case *iterCanSingleDelOp:
	case *iterPrevOp:
	case *iterSeekGEOp:
	case *iterSeekLTOp:
	case *iterSeekPrefixGEOp:
	case *iterSetBoundsOp:
	case *iterSetOptionsOp:
	case *mergeOp:
		return [][]byte{t.key}
	case *newBatchOp:
	case *newIndexedBatchOp:
	case *newIterOp:
	case *newIterUsingCloneOp:
	case *newSnapshotOp:
	case *rangeKeyDeleteOp:
	case *rangeKeySetOp:
	case *rangeKeyUnsetOp:
	case *setOp:
		return [][]byte{t.key}
	case *singleDeleteOp:
		return [][]byte{t.key}
	case *replicateOp:
		return [][]byte{t.start, t.end}
	}
	return nil
}

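// loadPrecedingKeys seeds the key manager (and the write-suffix distribution
// in cfg) with the keys written by the provided, previously generated
// operations.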
func loadPrecedingKeys(t TestingT, ops []op, cfg *OpConfig, m *keyManager) {
	for _, op := range ops {
		// Pretend we're generating all the operation's keys as potential new
		// keys, so that we update the key manager's keys and prefix sets.
		for _, k := range opWrittenKeys(op) {
			m.addNewKey(k)

			// If the key has a suffix, ratchet up the suffix distribution if
			// necessary.
			if s := m.comparer.Split(k); s < len(k) {
				suffix, err := testkeys.ParseSuffix(k[s:])
				require.NoError(t, err)
				if uint64(suffix) > cfg.writeSuffixDist.Max() {
					diff := int(uint64(suffix) - cfg.writeSuffixDist.Max())
					cfg.writeSuffixDist.IncMax(diff)
				}
			}
		}

		// Update key tracking state.
		m.update(op)
	}
}

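// insertSorted inserts k into the sorted slice pointed to by dst, maintaining
// sorted order according to cmp.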
func insertSorted(cmp base.Compare, dst *[][]byte, k []byte) {
	s := *dst
	i, _ := slices.BinarySearchFunc(s, k, cmp)
	*dst = slices.Insert(s, i, k)
}