LCOV - code coverage report
Current view: top level - pebble - commit.go (source / functions) Hit Total Coverage
Test: 2024-03-14 08:16Z e6d0a420 - tests only.lcov Lines: 240 266 90.2 %
Date: 2024-03-14 08:16:28 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "runtime"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/pebble/batchrepr"
      14             :         "github.com/cockroachdb/pebble/record"
      15             : )
      16             : 
      17             : // commitQueue is a lock-free fixed-size single-producer, multi-consumer
      18             : // queue. The single producer can enqueue (push) to the head, and consumers can
      19             : // dequeue (pop) from the tail.
      20             : //
      21             : // It has the added feature that it nils out unused slots to avoid unnecessary
      22             : // retention of objects.
      23             : type commitQueue struct {
      24             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      25             :         // are indexes into slots modulo len(slots), i.e. idx & (len(slots)-1).
      26             :         //
      27             :         // tail = index of oldest data in queue
      28             :         // head = index of next slot to fill
      29             :         //
      30             :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      31             :         // continues to own a slot outside this range until it nils the slot, at
      32             :         // which point ownership passes to the producer.
      33             :         //
      34             :         // The head index is stored in the most-significant bits so that we can
      35             :         // atomically add to it and the overflow is harmless.
      36             :         headTail atomic.Uint64
      37             : 
      38             :         // slots is a ring buffer of values stored in this queue. The size must be a
      39             :         // power of 2. A slot is in use until *both* the tail index has moved beyond
      40             :         // it and the slot value has been set to nil. The slot value is set to nil
      41             :         // atomically by the consumer and read atomically by the producer.
      42             :         slots [record.SyncConcurrency]atomic.Pointer[Batch]
      43             : }
      44             : 
      45             : const dequeueBits = 32
      46             : 
      47           1 : func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
      48           1 :         const mask = 1<<dequeueBits - 1
      49           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      50           1 :         tail = uint32(ptrs & mask)
      51           1 :         return
      52           1 : }
      53             : 
      54           1 : func (q *commitQueue) pack(head, tail uint32) uint64 {
      55           1 :         const mask = 1<<dequeueBits - 1
      56           1 :         return (uint64(head) << dequeueBits) |
      57           1 :                 uint64(tail&mask)
      58           1 : }
      59             : 
      60           1 : func (q *commitQueue) enqueue(b *Batch) {
      61           1 :         ptrs := q.headTail.Load()
      62           1 :         head, tail := q.unpack(ptrs)
      63           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      64           0 :                 // Queue is full. This should never be reached because commitPipeline.commitQueueSem
      65           0 :                 // limits the number of concurrent operations.
      66           0 :                 panic("pebble: not reached")
      67             :         }
      68           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
      69           1 : 
      70           1 :         // Check if the head slot has been released by dequeueApplied.
      71           1 :         for slot.Load() != nil {
      72           0 :                 // Another goroutine is still cleaning up the tail, so the queue is
      73           0 :                 // actually still full. We spin because this should resolve itself
      74           0 :                 // momentarily.
      75           0 :                 runtime.Gosched()
      76           0 :         }
      77             : 
      78             :         // The head slot is free, so we own it.
      79           1 :         slot.Store(b)
      80           1 : 
      81           1 :         // Increment head. This passes ownership of slot to dequeueApplied and acts as a
      82           1 :         // store barrier for writing the slot.
      83           1 :         q.headTail.Add(1 << dequeueBits)
      84             : }
      85             : 
      86             : // dequeueApplied removes the earliest enqueued Batch, if it is applied.
      87             : //
      88             : // Returns nil if the commit queue is empty or the earliest Batch is not yet
      89             : // applied.
      90           1 : func (q *commitQueue) dequeueApplied() *Batch {
      91           1 :         for {
      92           1 :                 ptrs := q.headTail.Load()
      93           1 :                 head, tail := q.unpack(ptrs)
      94           1 :                 if tail == head {
      95           1 :                         // Queue is empty.
      96           1 :                         return nil
      97           1 :                 }
      98             : 
      99           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     100           1 :                 b := slot.Load()
     101           1 :                 if b == nil || !b.applied.Load() {
     102           1 :                         // The batch is not ready to be dequeued, or another goroutine has
     103           1 :                         // already dequeued it.
     104           1 :                         return nil
     105           1 :                 }
     106             : 
     107             :                 // Confirm head and tail (for our speculative check above) and increment
     108             :                 // tail. If this succeeds, then we own the slot at tail.
     109           1 :                 ptrs2 := q.pack(head, tail+1)
     110           1 :                 if q.headTail.CompareAndSwap(ptrs, ptrs2) {
     111           1 :                         // We now own slot.
     112           1 :                         //
     113           1 :                         // Tell enqueue that we're done with this slot. Zeroing the slot is also
     114           1 :                         // important so we don't leave behind references that could keep this object
     115           1 :                         // live longer than necessary.
     116           1 :                         slot.Store(nil)
     117           1 :                         // At this point enqueue owns the slot.
     118           1 :                         return b
     119           1 :                 }
     120             :         }
     121             : }
     122             : 
     123             : // commitEnv contains the environment that a commitPipeline interacts
     124             : // with. This allows fine-grained testing of commitPipeline behavior without
     125             : // construction of an entire DB.
     126             : type commitEnv struct {
     127             :         // The next sequence number to give to a batch. Protected by
     128             :         // commitPipeline.mu.
     129             :         logSeqNum *atomic.Uint64
     130             :         // The visible sequence number at which reads should be performed. Ratcheted
     131             :         // upwards atomically as batches are applied to the memtable.
     132             :         visibleSeqNum *atomic.Uint64
     133             : 
     134             :         // Apply the batch to the specified memtable. Called concurrently.
     135             :         apply func(b *Batch, mem *memTable) error
     136             :         // Write the batch to the WAL. If wg != nil, the data will be persisted
     137             :         // asynchronously and done will be called on wg upon completion. If wg != nil
     138             :         // and err != nil, a failure to persist the WAL will populate *err. Returns
     139             :         // the memtable the batch should be applied to. Serial execution enforced by
     140             :         // commitPipeline.mu.
     141             :         write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
     142             : }
     143             : 
     144             : // A commitPipeline manages the stages of committing a set of mutations
     145             : // (contained in a single Batch) atomically to the DB. The steps are
     146             : // conceptually:
     147             : //
     148             : //  1. Write the batch to the WAL and optionally sync the WAL
     149             : //  2. Apply the mutations in the batch to the memtable
     150             : //
     151             : // These two simple steps are made complicated by the desire for high
     152             : // performance. In the absence of concurrency, performance is limited by how
     153             : // fast a batch can be written (and synced) to the WAL and then added to the
     154             : // memtable, both of which are outside the purview of the commit
     155             : // pipeline. Performance under concurrency is the primary concern of the commit
     156             : // pipeline, though it also needs to maintain two invariants:
     157             : //
     158             : //  1. Batches need to be written to the WAL in sequence number order.
     159             : //  2. Batches need to be made visible for reads in sequence number order. This
     160             : //     invariant arises from the use of a single sequence number which
     161             : //     indicates which mutations are visible.
     162             : //
     163             : // Taking these invariants into account, let's revisit the work the commit
     164             : // pipeline needs to perform. Writing the batch to the WAL is necessarily
     165             : // serialized as there is a single WAL object. The order of the entries in the
     166             : // WAL defines the sequence number order. Note that writing to the WAL is
     167             : // extremely fast, usually just a memory copy. Applying the mutations in a
     168             : // batch to the memtable can occur concurrently as the underlying skiplist
     169             : // supports concurrent insertions. Publishing the visible sequence number is
     170             : // another serialization point, but one with a twist: the visible sequence
     171             : // number cannot be bumped until the mutations for earlier batches have
     172             : // finished applying to the memtable (the visible sequence number only ratchets
     173             : // up). Lastly, if requested, the commit waits for the WAL to sync. Note that
     174             : // waiting for the WAL sync after ratcheting the visible sequence number allows
     175             : // another goroutine to read committed data before the WAL has synced. This is
     176             : // similar behavior to RocksDB's manual WAL flush functionality. Application
     177             : // code needs to protect against this if necessary.
     178             : //
     179             : // The full outline of the commit pipeline operation is as follows:
     180             : //
     181             : //      with commitPipeline mutex locked:
     182             : //        assign batch sequence number
     183             : //        write batch to WAL
     184             : //      (optionally) add batch to WAL sync list
     185             : //      apply batch to memtable (concurrently)
     186             : //      wait for earlier batches to apply
     187             : //      ratchet read sequence number
     188             : //      (optionally) wait for the WAL to sync
     189             : //
     190             : // As soon as a batch has been written to the WAL, the commitPipeline mutex is
     191             : // released allowing another batch to write to the WAL. Each commit operation
     192             : // individually applies its batch to the memtable providing concurrency. The
     193             : // WAL sync happens concurrently with applying to the memtable (see
     194             : // commitPipeline.syncLoop).
     195             : //
     196             : // The "waits for earlier batches to apply" work is more complicated than might
     197             : // be expected. The obvious approach would be to keep a queue of pending
     198             : // batches and for each batch to wait for the previous batch to finish
     199             : // committing. This approach was tried initially and turned out to be too
     200             : // slow. The problem is that it causes excessive goroutine activity as each
     201             : // committing goroutine needs to wake up in order for the next goroutine to be
     202             : // unblocked. The approach taken in the current code is conceptually similar,
     203             : // though it avoids waking a goroutine to perform work that another goroutine
     204             : // can perform. A commitQueue (a single-producer, multiple-consumer queue)
     205             : // holds the ordered list of committing batches. Addition to the queue is done
     206             : // while holding commitPipeline.mu, ensuring the same ordering of batches in
     207             : // the queue as the ordering in the WAL. When a batch finishes applying to the
     208             : // memtable, it atomically updates its Batch.applied field. Ratcheting of the
     209             : // visible sequence number is done by commitPipeline.publish which loops
     210             : // dequeueing "applied" batches and ratcheting the visible sequence number. If
     211             : // we hit an unapplied batch at the tail (oldest entry) of the queue we can block as we know
     212             : // that committing of that unapplied batch will eventually find our (applied)
     213             : // batch in the queue. See commitPipeline.publish for additional commentary.
     214             : type commitPipeline struct {
     215             :         // WARNING: The following struct `commitQueue` contains fields which will
     216             :         // be accessed atomically.
     217             :         //
     218             :         // Go allocations are guaranteed to be 64-bit aligned which we take advantage
     219             :         // of by placing the 64-bit fields which we access atomically at the beginning
     220             :         // of the commitPipeline struct.
     221             :         // For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
     222             :         // Queue of pending batches to commit.
     223             :         pending commitQueue
     224             :         env     commitEnv
     225             :         // The commit path has two queues:
     226             :         // - commitPipeline.pending contains batches whose seqnums have not yet been
     227             :         //   published. It is a lock-free single producer multi consumer queue.
     228             :         // - LogWriter.flusher.syncQ contains state for batches that have asked for
     229             :         //   a sync. It is a lock-free single producer single consumer queue.
     230             :         // These lock-free queues have a fixed capacity. And since they are
     231             :         // lock-free, we cannot do blocking waits when pushing onto these queues, in
     232             :         // case they are full. Additionally, adding to these queues happens while
     233             :         // holding commitPipeline.mu, and we don't want to block while holding that
     234             :         // mutex since it is also needed by other code.
     235             :         //
     236             :         // Popping from these queues is independent and for a particular batch can
     237             :         // occur in either order, though it is more common that popping from the
     238             :         // commitPipeline.pending will happen first.
     239             :         //
     240             :         // Due to these constraints, we reserve a unit of space in each queue before
     241             :         // acquiring commitPipeline.mu, which also ensures that the push operation
     242             :         // is guaranteed to have space in the queue. The commitQueueSem and
     243             :         // logSyncQSem are used for this reservation.
     244             :         commitQueueSem chan struct{}
     245             :         logSyncQSem    chan struct{}
     246             :         ingestSem      chan struct{}
     247             :         // The mutex to use for synchronizing access to logSeqNum and serializing
     248             :         // calls to commitEnv.write().
     249             :         mu sync.Mutex
     250             : }
     251             : 
     252           1 : func newCommitPipeline(env commitEnv) *commitPipeline {
     253           1 :         p := &commitPipeline{
     254           1 :                 env: env,
     255           1 :                 // The capacity of both commitQueue.slots and syncQueue.slots is set to
     256           1 :                 // record.SyncConcurrency, which also determines the value of these
     257           1 :                 // semaphores. We used to have a single semaphore, which required that the
     258           1 :                 // capacity of these queues be the same. Now that we have two semaphores,
     259           1 :                 // the capacity of these queues could be changed to be different. Say half
     260           1 :                 // of the batches asked to be synced, but syncing took 5x the latency of
     261           1 :                 // adding to the memtable and publishing. Then syncQueue.slots could be
     262           1 :                 // sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
     263           1 :                 // that LogWriterMetrics.SyncQueueLen has high utilization under some
     264           1 :                 // workloads.
     265           1 :                 //
     266           1 :                 // NB: the commit concurrency is one less than SyncConcurrency because we
     267           1 :                 // have to allow one "slot" for a concurrent WAL rotation which will close
     268           1 :                 // and sync the WAL.
     269           1 :                 commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
     270           1 :                 logSyncQSem:    make(chan struct{}, record.SyncConcurrency-1),
     271           1 :                 ingestSem:      make(chan struct{}, 1),
     272           1 :         }
     273           1 :         return p
     274           1 : }
     275             : 
     276             : // directWrite is used to directly write to the WAL. commitPipeline.mu must be
     277             : // held while this is called. DB.mu must not be held. directWrite will only
     278             : // return once the WAL sync is complete. Note that directWrite is a special case
     279             : // function which is currently only used when ingesting sstables as a flushable.
     280             : // Reason carefully about the correctness argument when calling this function
     281             : // from any context.
     282           1 : func (p *commitPipeline) directWrite(b *Batch) error {
     283           1 :         var syncWG sync.WaitGroup
     284           1 :         var syncErr error
     285           1 :         syncWG.Add(1)
     286           1 :         p.logSyncQSem <- struct{}{}
     287           1 :         _, err := p.env.write(b, &syncWG, &syncErr)
     288           1 :         syncWG.Wait()
     289           1 :         err = firstError(err, syncErr)
     290           1 :         return err
     291           1 : }
     292             : 
     293             : // Commit the specified batch, writing it to the WAL, optionally syncing the
     294             : // WAL, and applying the batch to the memtable. Upon successful return the
     295             : // batch's mutations will be visible for reading.
     296             : // REQUIRES: noSyncWait => syncWAL
     297           1 : func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
     298           1 :         if b.Empty() {
     299           1 :                 return nil
     300           1 :         }
     301             : 
     302           1 :         commitStartTime := time.Now()
     303           1 :         // Acquire semaphores.
     304           1 :         p.commitQueueSem <- struct{}{}
     305           1 :         if syncWAL {
     306           1 :                 p.logSyncQSem <- struct{}{}
     307           1 :         }
     308           1 :         b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)
     309           1 : 
     310           1 :         // Prepare the batch for committing: enqueuing the batch in the pending
     311           1 :         // queue, determining the batch sequence number and writing the data to the
     312           1 :         // WAL.
     313           1 :         //
     314           1 :         // NB: On error we clear Batch.db (see below) so that the batch won't be a
     315           1 :         // candidate for reuse. See Batch.release().
     316           1 :         mem, err := p.prepare(b, syncWAL, noSyncWait)
     317           1 :         if err != nil {
     318           0 :                 b.db = nil // prevent batch reuse on error
     319           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     320           0 :                 // sitting in the pending queue. We should consider fixing this by also
     321           0 :                 // removing the batch from the pending queue.
     322           0 :                 return err
     323           0 :         }
     324             : 
     325             :         // Apply the batch to the memtable.
     326           1 :         if err := p.env.apply(b, mem); err != nil {
     327           0 :                 b.db = nil // prevent batch reuse on error
     328           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     329           0 :                 // sitting in the pending queue. We should consider fixing this by also
     330           0 :                 // removing the batch from the pending queue.
     331           0 :                 return err
     332           0 :         }
     333             : 
     334             :         // Publish the batch sequence number.
     335           1 :         p.publish(b)
     336           1 : 
     337           1 :         <-p.commitQueueSem
     338           1 : 
     339           1 :         if !noSyncWait {
     340           1 :                 // Already waited for commit, so look at the error.
     341           1 :                 if b.commitErr != nil {
     342           0 :                         b.db = nil // prevent batch reuse on error
     343           0 :                         err = b.commitErr
     344           0 :                 }
     345             :         }
     346             :         // Else noSyncWait. The LogWriter can be concurrently writing to
     347             :         // b.commitErr. We will read b.commitErr in Batch.SyncWait after the
     348             :         // LogWriter is done writing.
     349             : 
     350           1 :         b.commitStats.TotalDuration = time.Since(commitStartTime)
     351           1 : 
     352           1 :         return err
     353             : }
     354             : 
     355             : // AllocateSeqNum allocates count sequence numbers, invokes the prepare
     356             : // callback, then the apply callback, and then publishes the sequence
     357             : // numbers. AllocateSeqNum does not write to the WAL or add entries to the
     358             : // memtable. AllocateSeqNum can be used to sequence an operation such as
     359             : // sstable ingestion within the commit pipeline. The prepare callback is
     360             : // invoked with commitPipeline.mu held, but note that DB.mu is not held and
     361             : // must be locked if necessary.
     362             : func (p *commitPipeline) AllocateSeqNum(
     363             :         count int, prepare func(seqNum uint64), apply func(seqNum uint64),
     364           1 : ) {
     365           1 :         // This method is similar to Commit and prepare. Be careful about trying to
     366           1 :         // share additional code with those methods because Commit and prepare are
     367           1 :         // performance critical code paths.
     368           1 : 
     369           1 :         b := newBatch(nil)
     370           1 :         defer b.Close()
     371           1 : 
     372           1 :         // Give the batch a count equal to the count argument so that the log and
     373           1 :         // visible sequence numbers are incremented correctly.
     374           1 :         b.data = make([]byte, batchrepr.HeaderLen)
     375           1 :         b.setCount(uint32(count))
     376           1 :         b.commit.Add(1)
     377           1 : 
     378           1 :         p.commitQueueSem <- struct{}{}
     379           1 : 
     380           1 :         p.mu.Lock()
     381           1 : 
     382           1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     383           1 :         // is lock-free, we want the order of batches to be the same as the sequence
     384           1 :         // number order.
     385           1 :         p.pending.enqueue(b)
     386           1 : 
     387           1 :         // Assign the batch a sequence number. Note that we use atomic operations
     388           1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     389           1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     390           1 :         logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count)
     391           1 :         seqNum := logSeqNum
     392           1 :         if seqNum == 0 {
     393           1 :                 // We can't use the value 0 for the global seqnum during ingestion, because
     394           1 :                 // 0 indicates no global seqnum. So allocate one more seqnum.
     395           1 :                 p.env.logSeqNum.Add(1)
     396           1 :                 seqNum++
     397           1 :         }
     398           1 :         b.setSeqNum(seqNum)
     399           1 : 
     400           1 :         // Wait for any outstanding writes to the memtable to complete. This is
     401           1 :         // necessary for ingestion so that the check for memtable overlap can see any
     402           1 :         // writes that were sequenced before the ingestion. The spin loop is
     403           1 :         // unfortunate, but obviates the need for additional synchronization.
     404           1 :         for {
     405           1 :                 visibleSeqNum := p.env.visibleSeqNum.Load()
     406           1 :                 if visibleSeqNum == logSeqNum {
     407           1 :                         break
     408             :                 }
     409           1 :                 runtime.Gosched()
     410             :         }
     411             : 
     412             :         // Invoke the prepare callback. Note the lack of error reporting. Even if the
     413             :         // callback internally fails, the sequence number needs to be published in
     414             :         // order to allow the commit pipeline to proceed.
     415           1 :         prepare(b.SeqNum())
     416           1 : 
     417           1 :         p.mu.Unlock()
     418           1 : 
     419           1 :         // Invoke the apply callback.
     420           1 :         apply(b.SeqNum())
     421           1 : 
     422           1 :         // Publish the sequence number.
     423           1 :         p.publish(b)
     424           1 : 
     425           1 :         <-p.commitQueueSem
     426             : }
     427             : 
     428           1 : func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
     429           1 :         n := uint64(b.Count())
     430           1 :         if n == invalidBatchCount {
     431           0 :                 return nil, ErrInvalidBatch
     432           0 :         }
     433           1 :         var syncWG *sync.WaitGroup
     434           1 :         var syncErr *error
     435           1 :         switch {
     436           1 :         case !syncWAL:
     437           1 :                 // Only need to wait for the publish.
     438           1 :                 b.commit.Add(1)
     439             :         // Remaining cases represent syncWAL=true.
     440           1 :         case noSyncWait:
     441           1 :                 syncErr = &b.commitErr
     442           1 :                 syncWG = &b.fsyncWait
     443           1 :                 // Only need to wait synchronously for the publish. The user will
     444           1 :                 // (asynchronously) wait on the batch's fsyncWait.
     445           1 :                 b.commit.Add(1)
     446           1 :                 b.fsyncWait.Add(1)
     447           1 :         case !noSyncWait:
     448           1 :                 syncErr = &b.commitErr
     449           1 :                 syncWG = &b.commit
     450           1 :                 // Must wait for both the publish and the WAL fsync.
     451           1 :                 b.commit.Add(2)
     452             :         }
     453             : 
     454           1 :         p.mu.Lock()
     455           1 : 
     456           1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     457           1 :         // is lock-free, we want the order of batches to be the same as the sequence
     458           1 :         // number order.
     459           1 :         p.pending.enqueue(b)
     460           1 : 
     461           1 :         // Assign the batch a sequence number. Note that we use atomic operations
     462           1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     463           1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     464           1 :         b.setSeqNum(p.env.logSeqNum.Add(n) - n)
     465           1 : 
     466           1 :         // Write the data to the WAL.
     467           1 :         mem, err := p.env.write(b, syncWG, syncErr)
     468           1 : 
     469           1 :         p.mu.Unlock()
     470           1 : 
     471           1 :         return mem, err
     472             : }
     473             : 
     474           1 : func (p *commitPipeline) publish(b *Batch) {
     475           1 :         // Mark the batch as applied.
     476           1 :         b.applied.Store(true)
     477           1 : 
     478           1 :         // Loop dequeuing applied batches from the pending queue. If our batch is
     479           1 :         // the oldest entry in the pending queue we are guaranteed that either we'll
     480           1 :         // dequeue and publish it or another goroutine will. If our batch is not the
     481           1 :         // oldest entry then either we'll dequeue the applied batches ahead of it and
     482           1 :         // reach our batch, or there is an unapplied batch blocking us. When that
     483           1 :         // unapplied batch applies it will go through the same process and publish
     484           1 :         // our batch for us.
     485           1 :         for {
     486           1 :                 t := p.pending.dequeueApplied()
     487           1 :                 if t == nil {
     488           1 :                         // Wait for another goroutine to publish us. We might also be waiting for
     489           1 :                         // the WAL sync to finish.
     490           1 :                         now := time.Now()
     491           1 :                         b.commit.Wait()
     492           1 :                         b.commitStats.CommitWaitDuration += time.Since(now)
     493           1 :                         break
     494             :                 }
     495           1 :                 if !t.applied.Load() {
     496           0 :                         panic("not reached")
     497             :                 }
     498             : 
     499             :                 // We're responsible for publishing the sequence number for batch t, but
     500             :                 // another concurrent goroutine might sneak in and publish the sequence
     501             :                 // number for a subsequent batch. That's ok as all we're guaranteeing is
     502             :                 // that the sequence number ratchets up.
     503           1 :                 for {
     504           1 :                         curSeqNum := p.env.visibleSeqNum.Load()
     505           1 :                         newSeqNum := t.SeqNum() + uint64(t.Count())
     506           1 :                         if newSeqNum <= curSeqNum {
     507           1 :                                 // t's sequence number has already been published.
     508           1 :                                 break
     509             :                         }
     510           1 :                         if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
     511           1 :                                 // We successfully published t's sequence number.
     512           1 :                                 break
     513             :                         }
     514             :                 }
     515             : 
     516           1 :                 t.commit.Done()
     517             :         }
     518             : }

Generated by: LCOV version 1.14