LCOV - code coverage report
Current view: top level - pebble/metamorphic - key_manager.go (source / functions) Hit Total Coverage
Test: 2024-02-04 08:15Z 087fd92a - tests only.lcov Lines: 462 507 91.1 %
Date: 2024-02-04 08:15:58 Functions: 0 0 -

          Line data    Source code
       1             : package metamorphic
       2             : 
       3             : import (
       4             :         "cmp"
       5             :         "fmt"
       6             :         "slices"
       7             :         "strings"
       8             : 
       9             :         "github.com/cockroachdb/errors"
      10             :         "github.com/cockroachdb/pebble"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/testkeys"
      13             :         "github.com/stretchr/testify/require"
      14             : )
      15             : 
      16             : // keyMeta is metadata associated with an (objID, key) pair, where objID is
      17             : // a writer containing the key.
      18             : type keyMeta struct {
      19             :         objID objID
      20             :         key   []byte
      21             :         // history provides the history of writer operations applied against this
      22             :         // key on this object. history is always ordered by non-decreasing
      23             :         // metaTimestamp.
      24             :         history keyHistory
      25             : }
      26             : 
// clear discards the key's accumulated write history, as happens when a
// delete is applied against a DB object. The slice is truncated rather than
// nil'd so its capacity is reused by subsequent appends.
func (m *keyMeta) clear() {
	m.history = m.history[:0]
}
      30             : 
      31             : // mergeInto merges this metadata into the metadata for other, appending all of
      32             : // its individual operations to dst at the provided timestamp.
      33           1 : func (m *keyMeta) mergeInto(dst *keyMeta, ts int) {
      34           1 :         for _, op := range m.history {
      35           1 :                 // If the key is being merged into a database object and the operation
      36           1 :                 // is a delete, we can clear the destination history. Database objects
      37           1 :                 // are end points in the merging of keys and won't be the source of a
      38           1 :                 // future merge. Deletions cause all other operations to behave as
      39           1 :                 // though the key was never written to the database at all, so we don't
      40           1 :                 // need to consider it for maintaining single delete invariants.
      41           1 :                 //
      42           1 :                 // NB: There's a subtlety here in that isDelete() will return true if
      43           1 :                 // opType is a writerSingleDelete, but single deletes are capable of
      44           1 :                 // leaking information about the history of writes. However, that's
      45           1 :                 // okay, because as long as we're properly generating single deletes
      46           1 :                 // according to the W1 invariant described in keyManager's comment, a
      47           1 :                 // single delete is equivalent to delete for the current history.
      48           1 :                 if dst.objID.tag() == dbTag && op.opType.isDelete() {
      49           1 :                         dst.clear()
      50           1 :                         continue
      51             :                 }
      52           1 :                 dst.history = append(dst.history, keyHistoryItem{
      53           1 :                         opType:        op.opType,
      54           1 :                         metaTimestamp: ts,
      55           1 :                 })
      56             :         }
      57             : }
      58             : 
// bounds describes a user key interval, closed on the smallest end and either
// closed or open on the largest end.
type bounds struct {
	smallest    []byte
	largest     []byte
	largestExcl bool // is largest exclusive?
}
      64             : 
      65           1 : func (b bounds) String() string {
      66           1 :         if b.largestExcl {
      67           1 :                 return fmt.Sprintf("[%q,%q)", b.smallest, b.largest)
      68           1 :         }
      69           1 :         return fmt.Sprintf("[%q,%q]", b.smallest, b.largest)
      70             : }
      71             : 
      72             : // Overlaps returns true iff the bounds intersect.
      73           1 : func (b *bounds) Overlaps(cmp base.Compare, other bounds) bool {
      74           1 :         if b.IsUnset() || other.IsUnset() {
      75           1 :                 return false
      76           1 :         }
      77             :         // Is b strictly before other?
      78           1 :         if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) {
      79           0 :                 return false
      80           0 :         }
      81             :         // Is b strictly after other?
      82           1 :         if v := cmp(b.smallest, other.largest); v > 0 || (v == 0 && other.largestExcl) {
      83           1 :                 return false
      84           1 :         }
      85           1 :         return true
      86             : }
      87             : 
// IsUnset returns true if the bounds haven't been set.
// The zero value of bounds (both endpoints nil) is "unset".
func (b bounds) IsUnset() bool {
	return b.smallest == nil && b.largest == nil
}
      92             : 
      93             : // Expand potentially expands the receiver bounds to include the other given
      94             : // bounds. If the receiver is unset, the other bounds are copied.
      95           1 : func (b *bounds) Expand(cmp base.Compare, other bounds) {
      96           1 :         if other.IsUnset() {
      97           1 :                 return
      98           1 :         }
      99           1 :         if b.IsUnset() {
     100           1 :                 *b = other
     101           1 :                 return
     102           1 :         }
     103           1 :         if cmp(b.smallest, other.smallest) > 0 {
     104           1 :                 b.smallest = other.smallest
     105           1 :         }
     106           1 :         if v := cmp(b.largest, other.largest); v < 0 || (v == 0 && b.largestExcl) {
     107           1 :                 b.largest = other.largest
     108           1 :                 b.largestExcl = other.largestExcl
     109           1 :         }
     110             : }
     111             : 
     112             : // keyManager tracks the write operations performed on keys in the generation
     113             : // phase of the metamorphic test. It maintains histories of operations performed
     114             : // against every unique user key on every writer object. These histories inform
     115             : // operation generation in order to maintain invariants that Pebble requires of
     116             : // end users, mostly around single deletions.
     117             : //
     118             : // A single deletion has a subtle requirement of the writer:
     119             : //
     120             : //      W1: The writer may only single delete a key `k` if `k` has been Set once
     121             : //          (and never MergeD) since the last delete.
     122             : //
     123             : // When a SINGLEDEL key deletes a SET key within a compaction, both the SET and
     124             : // the SINGLEDEL keys are elided. If multiple SETs of the key exist within the
     125             : // LSM, the SINGLEDEL reveals the lower SET. This behavior is dependent on the
     126             : // internal LSM state and nondeterministic. To ensure determinism, the end user
     127             : // must satisfy W1 and use single delete only when they can guarantee that the
     128             : // key has been set at most once since the last delete, preventing this rollback
     129             : // to a previous value.
     130             : //
     131             : // This W1 invariant requires a delicate dance during operation generation,
     132             : // because independent batches may be independently built and committed. With
     133             : // multi-instance variants of the metamorphic tests, keys in batches may
     134             : // ultimately be committed to any of several DB instances. To satisfy these
     135             : // requirements, the key manager tracks the history of every key on every
     136             : // writable object. When generating a new single deletion operation, the
     137             : // generator asks the key manager for a set of keys for which a single delete
     138             : // maintains the W1 invariant within the object itself. This object-local W1
     139             : // invariant (OLW1) is equivalent to W1 if one only ever performs write
     140             : // operations directly against individual DB objects.
     141             : //
     142             : // However with the existence of batches that receive writes independent of DB
     143             : // objects, W1 may be violated by appending the histories of two objects that
     144             : // independently satisfy OLW1. Consider a sequence such as:
     145             : //
     146             : //  1. db1.Set("foo")
     147             : //  2. batch1.Set("foo")
     148             : //  3. batch1.SingleDelete("foo")
     149             : //  4. db1.Apply(batch1)
     150             : //
     151             : // Both db1 and batch1 satisfy the object-local invariant OLW1. However the
     152             : // composition of the histories created by appending batch1's operations to db1
     153             : // creates a history that now violates W1 on db1. To detect this violation,
     154             : // batch applications/commits and ingestions examine the tail of the destination
     155             : // object's history and the head of the source batch's history. When a violation
     156             : // is detected, these operations insert additional Delete operations to clear
     157             : // the conflicting keys before proceeding with the conflicting operation. These
     158             : // deletes reset the key history.
     159             : //
     160             : // Note that this generation-time key tracking requires that operations be
     161             : // infallible, because a runtime failure would cause the key manager's state to
     162             : // diverge from the runtime object state. Ingestion operations pose an obstacle,
     163             : // because the generator may generate ingestions that fail due to overlapping
     164             : // sstables. Today, this complication is sidestepped by avoiding ingestion of
     165             : // multiple batches containing deletes or single deletes since loss of those
     166             : // specific operations on a key are what we cannot tolerate (doing SingleDelete
     167             : // on a key that has not been written to because the Set was lost is harmless).
     168             : //
     169             : // TODO(jackson): Instead, compute smallest and largest bounds of batches so
     170             : // that we know at generation-time whether or not an ingestion operation will
     171             : // fail and can avoid updating key state.
type keyManager struct {
	// comparer defines key ordering and prefix splitting; set to
	// testkeys.Comparer by newKeyManager.
	comparer *base.Comparer

	// metaTimestamp is used to provide an ordering over certain operations like
	// iter creation, updates to keys. Keeping track of the timestamp allows us
	// to make determinations such as whether a key will be visible to an
	// iterator.
	metaTimestamp int

	// byObj maps each tracked writer object to the metadata for the keys
	// written to it.
	byObj map[objID]*objKeyMeta
	// globalKeys represents all the keys that have been generated so far. Not
	// all these keys have been written to. globalKeys is sorted.
	globalKeys [][]byte
	// globalKeysMap contains the same keys as globalKeys but in a map. It
	// ensures no duplication.
	globalKeysMap map[string]bool
	// globalKeyPrefixes contains all the key prefixes (as defined by the
	// comparer's Split) generated so far. globalKeyPrefixes is sorted.
	globalKeyPrefixes [][]byte
	// globalKeyPrefixesMap contains the same keys as globalKeyPrefixes. It
	// ensures no duplication.
	globalKeyPrefixesMap map[string]struct{}
}
     195             : 
// objKeyMeta tracks, for a single writer object, every key written to it.
type objKeyMeta struct {
	id objID
	// keys maps each user key written to this object to what has happened to
	// it here. Entries will be transferred to another object's map when
	// needed (see keyManager.mergeKeysInto).
	keys map[string]*keyMeta
	// bounds holds user key bounds encompassing all the keys set within an
	// object. It's updated within `update` when a new op is generated.
	bounds bounds
}
     205             : 
     206             : // MergeKey adds the given key at the given meta timestamp, merging the histories as needed.
     207           1 : func (okm *objKeyMeta) MergeKey(k *keyMeta, ts int) {
     208           1 :         meta, ok := okm.keys[string(k.key)]
     209           1 :         if !ok {
     210           1 :                 meta = &keyMeta{
     211           1 :                         objID: okm.id,
     212           1 :                         key:   k.key,
     213           1 :                 }
     214           1 :                 okm.keys[string(k.key)] = meta
     215           1 :         }
     216           1 :         k.mergeInto(meta, ts)
     217             : }
     218             : 
     219             : // CollapseKeys collapses the history of all keys. Used with ingestion operation
     220             : // which only use the last value of any given key.
     221           1 : func (okm *objKeyMeta) CollapseKeys() {
     222           1 :         for _, keyMeta := range okm.keys {
     223           1 :                 keyMeta.history = keyMeta.history.collapsed()
     224           1 :         }
     225             : }
     226             : 
     227             : // objKeyMeta looks up the objKeyMeta for a given object, creating it if necessary.
     228           1 : func (k *keyManager) objKeyMeta(o objID) *objKeyMeta {
     229           1 :         m, ok := k.byObj[o]
     230           1 :         if !ok {
     231           1 :                 m = &objKeyMeta{
     232           1 :                         id:   o,
     233           1 :                         keys: make(map[string]*keyMeta),
     234           1 :                 }
     235           1 :                 k.byObj[o] = m
     236           1 :         }
     237           1 :         return m
     238             : }
     239             : 
     240             : // sortedKeysForObj returns all the values in objKeyMeta(o).keys, in sorted
     241             : // order.
     242           1 : func (k *keyManager) sortedKeysForObj(o objID) []*keyMeta {
     243           1 :         okm := k.objKeyMeta(o)
     244           1 :         res := make([]*keyMeta, 0, len(okm.keys))
     245           1 :         for _, m := range okm.keys {
     246           1 :                 res = append(res, m)
     247           1 :         }
     248           1 :         slices.SortFunc(res, func(a, b *keyMeta) int {
     249           1 :                 cmp := k.comparer.Compare(a.key, b.key)
     250           1 :                 if cmp == 0 {
     251           0 :                         panic(fmt.Sprintf("distinct keys %q and %q compared as equal", a.key, b.key))
     252             :                 }
     253           1 :                 return cmp
     254             :         })
     255           1 :         return res
     256             : }
     257             : 
// nextMetaTimestamp returns the current meta timestamp and advances it, so
// every caller observes a distinct, monotonically increasing value.
func (k *keyManager) nextMetaTimestamp() int {
	ret := k.metaTimestamp
	k.metaTimestamp++
	return ret
}
     263             : 
     264             : // newKeyManager returns a pointer to a new keyManager. Callers should
     265             : // interact with this using addNewKey, knownKeys, update methods only.
     266           1 : func newKeyManager(numInstances int) *keyManager {
     267           1 :         m := &keyManager{
     268           1 :                 comparer:             testkeys.Comparer,
     269           1 :                 byObj:                make(map[objID]*objKeyMeta),
     270           1 :                 globalKeysMap:        make(map[string]bool),
     271           1 :                 globalKeyPrefixesMap: make(map[string]struct{}),
     272           1 :         }
     273           1 :         for i := 1; i <= max(numInstances, 1); i++ {
     274           1 :                 m.objKeyMeta(makeObjID(dbTag, uint32(i)))
     275           1 :         }
     276           1 :         return m
     277             : }
     278             : 
     279             : // addNewKey adds the given key to the key manager for global key tracking.
     280             : // Returns false iff this is not a new key.
     281           1 : func (k *keyManager) addNewKey(key []byte) bool {
     282           1 :         if k.globalKeysMap[string(key)] {
     283           1 :                 return false
     284           1 :         }
     285           1 :         insertSorted(k.comparer.Compare, &k.globalKeys, key)
     286           1 :         k.globalKeysMap[string(key)] = true
     287           1 : 
     288           1 :         prefixLen := k.comparer.Split(key)
     289           1 :         if _, ok := k.globalKeyPrefixesMap[string(key[:prefixLen])]; !ok {
     290           1 :                 insertSorted(k.comparer.Compare, &k.globalKeyPrefixes, key[:prefixLen])
     291           1 :                 k.globalKeyPrefixesMap[string(key[:prefixLen])] = struct{}{}
     292           1 :         }
     293           1 :         return true
     294             : }
     295             : 
     296             : // getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
     297             : // allocates, initializes and returns a new value.
     298           1 : func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
     299           1 :         objKeys := k.objKeyMeta(id)
     300           1 :         m, ok := objKeys.keys[string(key)]
     301           1 :         if ok {
     302           1 :                 return m
     303           1 :         }
     304           1 :         m = &keyMeta{
     305           1 :                 objID: id,
     306           1 :                 key:   key,
     307           1 :         }
     308           1 :         // Initialize the key-to-meta index.
     309           1 :         objKeys.keys[string(key)] = m
     310           1 :         // Expand the object's bounds to contain this key if they don't already.
     311           1 :         objKeys.bounds.Expand(k.comparer.Compare, bounds{
     312           1 :                 smallest: key,
     313           1 :                 largest:  key,
     314           1 :         })
     315           1 :         return m
     316             : }
     317             : 
     318             : // mergeKeysInto merges the keys and bounds from an object into another. Assumes
     319             : // the source object will not be used again.
     320           1 : func (k *keyManager) mergeKeysInto(from, to objID) {
     321           1 :         fromMeta := k.objKeyMeta(from)
     322           1 :         toMeta := k.objKeyMeta(to)
     323           1 :         ts := k.nextMetaTimestamp()
     324           1 :         // The result should be the same, regardless of the ordering of the keys.
     325           1 :         for _, keyMeta := range fromMeta.keys {
     326           1 :                 toMeta.MergeKey(keyMeta, ts)
     327           1 :         }
     328             :         // All the keys in `from` have been merged into `to`. Expand `to`'s bounds
     329             :         // to be at least as wide as `from`'s.
     330           1 :         toMeta.bounds.Expand(k.comparer.Compare, fromMeta.bounds)
     331           1 : 
     332           1 :         delete(k.byObj, from) // Unlink "from" obj.
     333             : }
     334             : 
// expandBounds expands the incrementally maintained bounds of o to be at least
// as wide as `b`.
func (k *keyManager) expandBounds(o objID, b bounds) {
	k.objKeyMeta(o).bounds.Expand(k.comparer.Compare, b)
}
     340             : 
     341             : // doObjectBoundsOverlap returns true iff any of the named objects have key
     342             : // bounds that overlap any other named object.
     343           1 : func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool {
     344           1 :         for i := range objIDs {
     345           1 :                 ib, iok := k.byObj[objIDs[i]]
     346           1 :                 if !iok {
     347           1 :                         continue
     348             :                 }
     349           1 :                 for j := i + 1; j < len(objIDs); j++ {
     350           1 :                         jb, jok := k.byObj[objIDs[j]]
     351           1 :                         if !jok {
     352           1 :                                 continue
     353             :                         }
     354           1 :                         if ib.bounds.Overlaps(k.comparer.Compare, jb.bounds) {
     355           1 :                                 return true
     356           1 :                         }
     357             :                 }
     358             :         }
     359           1 :         return false
     360             : }
     361             : 
// checkForSingleDelConflicts examines all the keys written to srcObj, and
// determines whether any of the contained single deletes would be
// nondeterministic if applied to dstObj in dstObj's current state. It returns a
// slice of all the keys that are found to conflict. In order to preserve
// determinism, the caller must delete the key from the destination before
// writing src's mutations to dst in order to ensure determinism.
//
// It takes a `srcCollapsed` parameter that determines whether the source
// history should be "collapsed" (see keyHistory.collapsed) before determining
// whether the applied state will conflict. This is required to facilitate
// ingestOps which are NOT equivalent to committing the batch, because they can
// only commit 1 internal point key at each unique user key.
func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte {
	dstKeys := k.objKeyMeta(dstObj)
	var conflicts [][]byte
	for _, src := range k.sortedKeysForObj(srcObj) {
		// Single delete generation logic already ensures that both srcObj and
		// dstObj's single deletes are deterministic within the context of their
		// existing writes. However, applying srcObj on top of dstObj may
		// violate the invariants. Consider:
		//
		//    src: a.SET; a.SINGLEDEL;
		//    dst: a.SET;
		//
		// The merged view is:
		//
		//    a.SET; a.SET; a.SINGLEDEL;
		//
		// This is invalid, because there is more than 1 value mutation of the
		// key before the single delete.
		//
		// We walk the source object's history in chronological order, looking
		// for a single delete that was written before a DEL/RANGEDEL. (NB: We
		// don't need to look beyond a DEL/RANGEDEL, because these deletes bound
		// any subsequently-written single deletes to applying to the keys
		// within src's history between the two tombstones. We already know from
		// per-object history invariants that any such single delete must be
		// deterministic with respect to src's keys.)
		var srcHasUnboundedSingleDelete bool
		var srcValuesBeforeSingleDelete int

		// When the srcObj is being ingested (srcCollapsed=t), the semantics
		// change. We must first "collapse" the key's history to represent the
		// ingestion semantics.
		srcHistory := src.history
		if srcCollapsed {
			srcHistory = src.history.collapsed()
		}

		// Scan src's history oldest-to-newest until a tombstone bounds the
		// search (see the comment above).
	srcloop:
		for _, item := range srcHistory {
			switch item.opType {
			case OpWriterDelete, OpWriterDeleteRange:
				// We found a DEL or RANGEDEL before any single delete. If src
				// contains additional single deletes, their effects are limited
				// to applying to later keys. Combining the two object histories
				// doesn't pose any determinism risk.
				break srcloop
			case OpWriterSingleDelete:
				// We found a single delete. Since we found this single delete
				// before a DEL or RANGEDEL, this delete has the potential to
				// affect the visibility of keys in `dstObj`. We'll need to look
				// for potential conflicts down below.
				srcHasUnboundedSingleDelete = true
				// Per-object OLW1 guarantees at most one value mutation before
				// an unbounded single delete; more indicates a bookkeeping bug.
				if srcValuesBeforeSingleDelete > 1 {
					panic(errors.AssertionFailedf("unexpectedly found %d sets/merges within %s before single del",
						srcValuesBeforeSingleDelete, srcObj))
				}
				break srcloop
			case OpWriterSet, OpWriterMerge:
				// We found a SET or MERGE operation for this key. If there's a
				// subsequent single delete, we'll need to make sure there's not
				// a SET or MERGE in the dst too.
				srcValuesBeforeSingleDelete++
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", item.opType))
			}
		}
		if !srcHasUnboundedSingleDelete {
			continue
		}

		dst, ok := dstKeys.keys[string(src.key)]
		// If the destination writer has no record of the key, the combined key
		// history is simply the src object's key history which is valid due to
		// per-object single deletion invariants.
		if !ok {
			continue
		}

		// We need to examine the trailing key history on dst, walking
		// backwards from the most recent write.
		consecutiveValues := srcValuesBeforeSingleDelete
	dstloop:
		for i := len(dst.history) - 1; i >= 0; i-- {
			switch dst.history[i].opType {
			case OpWriterSet, OpWriterMerge:
				// A SET/MERGE may conflict if there's more than 1 consecutive
				// SET/MERGEs.
				consecutiveValues++
				if consecutiveValues > 1 {
					conflicts = append(conflicts, src.key)
					break dstloop
				}
			case OpWriterDelete, OpWriterSingleDelete, OpWriterDeleteRange:
				// Dels clear the history, enabling use of single delete.
				break dstloop
			default:
				panic(errors.AssertionFailedf("unexpected optype %d", dst.history[i].opType))
			}
		}
	}
	return conflicts
}
     475             : 
     476             : // update updates the internal state of the keyManager according to the given
     477             : // op.
     478           1 : func (k *keyManager) update(o op) {
     479           1 :         switch s := o.(type) {
     480           1 :         case *setOp:
     481           1 :                 meta := k.getOrInit(s.writerID, s.key)
     482           1 :                 meta.history = append(meta.history, keyHistoryItem{
     483           1 :                         opType:        OpWriterSet,
     484           1 :                         metaTimestamp: k.nextMetaTimestamp(),
     485           1 :                 })
     486           1 :         case *mergeOp:
     487           1 :                 meta := k.getOrInit(s.writerID, s.key)
     488           1 :                 meta.history = append(meta.history, keyHistoryItem{
     489           1 :                         opType:        OpWriterMerge,
     490           1 :                         metaTimestamp: k.nextMetaTimestamp(),
     491           1 :                 })
     492           1 :         case *deleteOp:
     493           1 :                 meta := k.getOrInit(s.writerID, s.key)
     494           1 :                 if meta.objID.tag() == dbTag {
     495           1 :                         meta.clear()
     496           1 :                 } else {
     497           1 :                         meta.history = append(meta.history, keyHistoryItem{
     498           1 :                                 opType:        OpWriterDelete,
     499           1 :                                 metaTimestamp: k.nextMetaTimestamp(),
     500           1 :                         })
     501           1 :                 }
     502           1 :         case *deleteRangeOp:
     503           1 :                 // We track the history of discrete point keys, but a range deletion
     504           1 :                 // applies over a continuous key span of infinite keys. However, the key
     505           1 :                 // manager knows all keys that have been used in all operations, so we
     506           1 :                 // can discretize the range tombstone by adding it to every known key
     507           1 :                 // within the range.
     508           1 :                 ts := k.nextMetaTimestamp()
     509           1 :                 keyRange := pebble.KeyRange{Start: s.start, End: s.end}
     510           1 :                 for _, key := range k.knownKeysInRange(keyRange) {
     511           1 :                         meta := k.getOrInit(s.writerID, key)
     512           1 :                         if meta.objID.tag() == dbTag {
     513           1 :                                 meta.clear()
     514           1 :                         } else {
     515           1 :                                 meta.history = append(meta.history, keyHistoryItem{
     516           1 :                                         opType:        OpWriterDeleteRange,
     517           1 :                                         metaTimestamp: ts,
     518           1 :                                 })
     519           1 :                         }
     520             :                 }
     521           1 :                 k.expandBounds(s.writerID, bounds{
     522           1 :                         smallest:    s.start,
     523           1 :                         largest:     s.end,
     524           1 :                         largestExcl: true,
     525           1 :                 })
     526           1 :         case *singleDeleteOp:
     527           1 :                 meta := k.getOrInit(s.writerID, s.key)
     528           1 :                 meta.history = append(meta.history, keyHistoryItem{
     529           1 :                         opType:        OpWriterSingleDelete,
     530           1 :                         metaTimestamp: k.nextMetaTimestamp(),
     531           1 :                 })
     532           1 :         case *rangeKeyDeleteOp:
     533           1 :                 // Range key operations on their own don't determine singledel eligibility,
     534           1 :                 // however range key operations could be part of a batch which contains
     535           1 :                 // other operations that do affect it. If those batches were to get
     536           1 :                 // ingested, we'd need to know what the bounds of sstables generated out
     537           1 :                 // of those batches are, as that determines whether that ingestion
     538           1 :                 // will succeed or not.
     539           1 :                 k.expandBounds(s.writerID, bounds{
     540           1 :                         smallest:    s.start,
     541           1 :                         largest:     s.end,
     542           1 :                         largestExcl: true,
     543           1 :                 })
     544           1 :         case *rangeKeySetOp:
     545           1 :                 k.expandBounds(s.writerID, bounds{
     546           1 :                         smallest:    s.start,
     547           1 :                         largest:     s.end,
     548           1 :                         largestExcl: true,
     549           1 :                 })
     550           1 :         case *rangeKeyUnsetOp:
     551           1 :                 k.expandBounds(s.writerID, bounds{
     552           1 :                         smallest:    s.start,
     553           1 :                         largest:     s.end,
     554           1 :                         largestExcl: true,
     555           1 :                 })
     556           1 :         case *ingestOp:
     557           1 :                 // Some ingestion operations may attempt to ingest overlapping sstables
     558           1 :                 // which is prohibited. We know at generation time whether these
     559           1 :                 // ingestions will be successful. If they won't be successful, we should
     560           1 :                 // not update the key state because both the batch(es) and target DB
     561           1 :                 // will be left unmodified.
     562           1 :                 if k.doObjectBoundsOverlap(s.batchIDs) {
     563           1 :                         // This ingestion will fail.
     564           1 :                         return
     565           1 :                 }
     566             : 
     567             :                 // For each batch, merge the keys into the DB. We can't call
     568             :                 // keyMeta.mergeInto directly to merge, because ingest operations first
     569             :                 // "flatten" the batch (because you can't set the same key twice at a
     570             :                 // single sequence number). Instead we compute the collapsed history and
     571             :                 // merge that.
     572           1 :                 for _, batchID := range s.batchIDs {
     573           1 :                         k.objKeyMeta(batchID).CollapseKeys()
     574           1 :                         k.mergeKeysInto(batchID, s.dbID)
     575           1 :                 }
     576             :                 // TODO(bilal): Handle ingestAndExciseOp and replicateOp here. We currently
     577             :                 // disable SingleDelete when these operations are enabled (see
     578             :                 // multiInstanceConfig).
     579           1 :         case *applyOp:
     580           1 :                 // Merge the keys from this writer into the parent writer.
     581           1 :                 k.mergeKeysInto(s.batchID, s.writerID)
     582           1 :         case *batchCommitOp:
     583           1 :                 // Merge the keys from the batch with the keys from the DB.
     584           1 :                 k.mergeKeysInto(s.batchID, s.dbID)
     585             :         }
     586             : }
     587             : 
     588           1 : func (k *keyManager) knownKeys() (keys [][]byte) {
     589           1 :         return k.globalKeys
     590           1 : }
     591             : 
     592             : // knownKeysInRange returns all eligible read keys within the range
     593             : // [start,end). The returned slice is owned by the keyManager and must not be
     594             : // retained.
     595           1 : func (k *keyManager) knownKeysInRange(kr pebble.KeyRange) (keys [][]byte) {
     596           1 :         s, _ := slices.BinarySearchFunc(k.globalKeys, kr.Start, k.comparer.Compare)
     597           1 :         e, _ := slices.BinarySearchFunc(k.globalKeys, kr.End, k.comparer.Compare)
     598           1 :         if s >= e {
     599           1 :                 return nil
     600           1 :         }
     601           1 :         return k.globalKeys[s:e]
     602             : }
     603             : 
     604           1 : func (k *keyManager) prefixes() (prefixes [][]byte) {
     605           1 :         return k.globalKeyPrefixes
     606           1 : }
     607             : 
     608             : // prefixExists returns true if a key has been generated with the provided
     609             : // prefix before.
     610           1 : func (k *keyManager) prefixExists(prefix []byte) bool {
     611           1 :         _, exists := k.globalKeyPrefixesMap[string(prefix)]
     612           1 :         return exists
     613           1 : }
     614             : 
     615             : // eligibleSingleDeleteKeys returns a slice of keys that can be safely single
     616             : // deleted, given the writer id. Restricting single delete keys through this
     617             : // method is used to ensure the OLW1 guarantee (see the keyManager comment) for
     618             : // the provided object ID.
     619           1 : func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) {
     620           1 :         // Creating a slice of keys is wasteful given that the caller will pick one,
     621           1 :         // but makes it simpler for unit testing.
     622           1 :         objKeys := k.objKeyMeta(o)
     623           1 :         for _, key := range k.globalKeys {
     624           1 :                 meta, ok := objKeys.keys[string(key)]
     625           1 :                 if !ok {
     626           1 :                         keys = append(keys, key)
     627           1 :                         continue
     628             :                 }
     629             :                 // Examine the history within this object.
     630           1 :                 if meta.history.canSingleDelete() {
     631           1 :                         keys = append(keys, key)
     632           1 :                 }
     633             :         }
     634           1 :         return keys
     635             : }
     636             : 
// a keyHistoryItem describes an individual operation performed on a key.
type keyHistoryItem struct {
	// opType may be writerSet, writerDelete, writerSingleDelete,
	// writerDeleteRange or writerMerge only. No other opTypes may appear here.
	opType        OpType
	// metaTimestamp orders this item relative to other operations; a
	// keyHistory is kept sorted by non-decreasing metaTimestamp.
	metaTimestamp int
}

// keyHistory captures the history of mutations to a key in chronological order.
type keyHistory []keyHistoryItem
     647             : 
     648             : // before returns the subslice of the key history that happened strictly before
     649             : // the provided meta timestamp.
     650           0 : func (h keyHistory) before(metaTimestamp int) keyHistory {
     651           0 :         i, _ := slices.BinarySearchFunc(h, metaTimestamp, func(a keyHistoryItem, ts int) int {
     652           0 :                 return cmp.Compare(a.metaTimestamp, ts)
     653           0 :         })
     654           0 :         return h[:i]
     655             : }
     656             : 
     657             : // canSingleDelete examines the tail of the history and returns true if a single
     658             : // delete appended to this history would satisfy the single delete invariants.
     659           1 : func (h keyHistory) canSingleDelete() bool {
     660           1 :         if len(h) == 0 {
     661           1 :                 return true
     662           1 :         }
     663           1 :         switch o := h[len(h)-1].opType; o {
     664           1 :         case OpWriterDelete, OpWriterDeleteRange, OpWriterSingleDelete:
     665           1 :                 return true
     666           1 :         case OpWriterSet, OpWriterMerge:
     667           1 :                 if len(h) == 1 {
     668           1 :                         return true
     669           1 :                 }
     670           1 :                 return h[len(h)-2].opType.isDelete()
     671           0 :         default:
     672           0 :                 panic(errors.AssertionFailedf("unexpected writer op %v", o))
     673             :         }
     674             : }
     675             : 
     676           0 : func (h keyHistory) String() string {
     677           0 :         var sb strings.Builder
     678           0 :         for i, it := range h {
     679           0 :                 if i > 0 {
     680           0 :                         fmt.Fprint(&sb, ", ")
     681           0 :                 }
     682           0 :                 switch it.opType {
     683           0 :                 case OpWriterDelete:
     684           0 :                         fmt.Fprint(&sb, "del")
     685           0 :                 case OpWriterDeleteRange:
     686           0 :                         fmt.Fprint(&sb, "delrange")
     687           0 :                 case OpWriterSingleDelete:
     688           0 :                         fmt.Fprint(&sb, "singledel")
     689           0 :                 case OpWriterSet:
     690           0 :                         fmt.Fprint(&sb, "set")
     691           0 :                 case OpWriterMerge:
     692           0 :                         fmt.Fprint(&sb, "merge")
     693           0 :                 default:
     694           0 :                         fmt.Fprintf(&sb, "optype[v=%d]", it.opType)
     695             :                 }
     696           0 :                 fmt.Fprintf(&sb, "(%d)", it.metaTimestamp)
     697             :         }
     698           0 :         return sb.String()
     699             : }
     700             : 
     701             : // hasVisibleKey examines the tail of the history and returns true if the
     702             : // history should end in a visible value for this key.
     703           0 : func (h keyHistory) hasVisibleValue() bool {
     704           0 :         if len(h) == 0 {
     705           0 :                 return false
     706           0 :         }
     707           0 :         return !h[len(h)-1].opType.isDelete()
     708             : }
     709             : 
     710             : // collapsed returns a new key history that's equivalent to the history created
     711             : // by an ingestOp that "collapses" a batch's keys. See ingestOp.build.
     712           1 : func (h keyHistory) collapsed() keyHistory {
     713           1 :         var ret keyHistory
     714           1 :         // When collapsing a batch, any range deletes are semantically applied
     715           1 :         // first. Look for any range deletes and apply them.
     716           1 :         for _, op := range h {
     717           1 :                 if op.opType == OpWriterDeleteRange {
     718           1 :                         ret = append(ret, op)
     719           1 :                         break
     720             :                 }
     721             :         }
     722             :         // Among point keys, the most recently written key wins.
     723           1 :         for i := len(h) - 1; i >= 0; i-- {
     724           1 :                 if h[i].opType != OpWriterDeleteRange {
     725           1 :                         ret = append(ret, h[i])
     726           1 :                         break
     727             :                 }
     728             :         }
     729           1 :         return ret
     730             : }
     731             : 
     732           1 : func opWrittenKeys(untypedOp op) [][]byte {
     733           1 :         switch t := untypedOp.(type) {
     734           1 :         case *applyOp:
     735           1 :         case *batchCommitOp:
     736           1 :         case *checkpointOp:
     737           1 :         case *closeOp:
     738           1 :         case *compactOp:
     739           1 :         case *dbRestartOp:
     740           1 :         case *deleteOp:
     741           1 :                 return [][]byte{t.key}
     742           1 :         case *deleteRangeOp:
     743           1 :                 return [][]byte{t.start, t.end}
     744           1 :         case *flushOp:
     745           1 :         case *getOp:
     746           1 :         case *ingestOp:
     747           1 :         case *initOp:
     748           1 :         case *iterFirstOp:
     749           1 :         case *iterLastOp:
     750           1 :         case *iterNextOp:
     751           1 :         case *iterNextPrefixOp:
     752           1 :         case *iterCanSingleDelOp:
     753           1 :         case *iterPrevOp:
     754           1 :         case *iterSeekGEOp:
     755           1 :         case *iterSeekLTOp:
     756           1 :         case *iterSeekPrefixGEOp:
     757           1 :         case *iterSetBoundsOp:
     758           1 :         case *iterSetOptionsOp:
     759           1 :         case *mergeOp:
     760           1 :                 return [][]byte{t.key}
     761           1 :         case *newBatchOp:
     762           1 :         case *newIndexedBatchOp:
     763           1 :         case *newIterOp:
     764           1 :         case *newIterUsingCloneOp:
     765           1 :         case *newSnapshotOp:
     766           1 :         case *rangeKeyDeleteOp:
     767           1 :         case *rangeKeySetOp:
     768           1 :         case *rangeKeyUnsetOp:
     769           1 :         case *setOp:
     770           1 :                 return [][]byte{t.key}
     771           1 :         case *singleDeleteOp:
     772           1 :                 return [][]byte{t.key}
     773           1 :         case *replicateOp:
     774           1 :                 return [][]byte{t.start, t.end}
     775             :         }
     776           1 :         return nil
     777             : }
     778             : 
     779           1 : func loadPrecedingKeys(t TestingT, ops []op, cfg *OpConfig, m *keyManager) {
     780           1 :         for _, op := range ops {
     781           1 :                 // Pretend we're generating all the operation's keys as potential new
     782           1 :                 // key, so that we update the key manager's keys and prefix sets.
     783           1 :                 for _, k := range opWrittenKeys(op) {
     784           1 :                         m.addNewKey(k)
     785           1 : 
     786           1 :                         // If the key has a suffix, ratchet up the suffix distribution if
     787           1 :                         // necessary.
     788           1 :                         if s := m.comparer.Split(k); s < len(k) {
     789           1 :                                 suffix, err := testkeys.ParseSuffix(k[s:])
     790           1 :                                 require.NoError(t, err)
     791           1 :                                 if uint64(suffix) > cfg.writeSuffixDist.Max() {
     792           1 :                                         diff := int(uint64(suffix) - cfg.writeSuffixDist.Max())
     793           1 :                                         cfg.writeSuffixDist.IncMax(diff)
     794           1 :                                 }
     795             :                         }
     796             :                 }
     797             : 
     798             :                 // Update key tracking state.
     799           1 :                 m.update(op)
     800             :         }
     801             : }
     802             : 
     803           1 : func insertSorted(cmp base.Compare, dst *[][]byte, k []byte) {
     804           1 :         s := *dst
     805           1 :         i, _ := slices.BinarySearchFunc(s, k, cmp)
     806           1 :         *dst = slices.Insert(s, i, k)
     807           1 : }

Generated by: LCOV version 1.14