LCOV - code coverage report
Current view: top level - pebble - commit.go (source / functions) Coverage Total Hit
Test: 2025-01-31 08:17Z 906e431e - tests only.lcov Lines: 90.2 % 266 240
Test Date: 2025-01-31 08:17:45 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "runtime"
       9              :         "sync"
      10              :         "sync/atomic"
      11              : 
      12              :         "github.com/cockroachdb/crlib/crtime"
      13              :         "github.com/cockroachdb/pebble/batchrepr"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/record"
      16              : )
      17              : 
      18              : // commitQueue is a lock-free fixed-size single-producer, multi-consumer
      19              : // queue. The single producer can enqueue (push) to the head, and consumers can
      20              : // dequeue (pop) from the tail.
      21              : //
      22              : // It has the added feature that it nils out unused slots to avoid unnecessary
      23              : // retention of objects.
      24              : type commitQueue struct {
      25              :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      26              :         // are indexes into slots modulo len(slots)-1.
      27              :         //
      28              :         // tail = index of oldest data in queue
      29              :         // head = index of next slot to fill
      30              :         //
      31              :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      32              :         // continues to own a slot outside this range until it nils the slot, at
      33              :         // which point ownership passes to the producer.
      34              :         //
      35              :         // The head index is stored in the most-significant bits so that we can
      36              :         // atomically add to it and the overflow is harmless.
      37              :         headTail atomic.Uint64
      38              : 
      39              :         // slots is a ring buffer of values stored in this queue. The size must be a
      40              :         // power of 2. A slot is in use until *both* the tail index has moved beyond
      41              :         // it and the slot value has been set to nil. The slot value is set to nil
      42              :         // atomically by the consumer and read atomically by the producer.
      43              :         slots [record.SyncConcurrency]atomic.Pointer[Batch]
      44              : }
      45              : 
      46              : const dequeueBits = 32
      47              : 
      48            1 : func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
      49            1 :         const mask = 1<<dequeueBits - 1
      50            1 :         head = uint32((ptrs >> dequeueBits) & mask)
      51            1 :         tail = uint32(ptrs & mask)
      52            1 :         return
      53            1 : }
      54              : 
      55            1 : func (q *commitQueue) pack(head, tail uint32) uint64 {
      56            1 :         const mask = 1<<dequeueBits - 1
      57            1 :         return (uint64(head) << dequeueBits) |
      58            1 :                 uint64(tail&mask)
      59            1 : }
      60              : 
      61            1 : func (q *commitQueue) enqueue(b *Batch) {
      62            1 :         ptrs := q.headTail.Load()
      63            1 :         head, tail := q.unpack(ptrs)
      64            1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      65            0 :                 // Queue is full. This should never be reached because commitPipeline.commitQueueSem
      66            0 :                 // limits the number of concurrent operations.
      67            0 :                 panic("pebble: not reached")
      68              :         }
      69            1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
      70            1 : 
      71            1 :         // Check if the head slot has been released by dequeueApplied.
      72            1 :         for slot.Load() != nil {
      73            0 :                 // Another goroutine is still cleaning up the tail, so the queue is
      74            0 :                 // actually still full. We spin because this should resolve itself
      75            0 :                 // momentarily.
      76            0 :                 runtime.Gosched()
      77            0 :         }
      78              : 
      79              :         // The head slot is free, so we own it.
      80            1 :         slot.Store(b)
      81            1 : 
      82            1 :         // Increment head. This passes ownership of slot to dequeueApplied and acts as a
      83            1 :         // store barrier for writing the slot.
      84            1 :         q.headTail.Add(1 << dequeueBits)
      85              : }
      86              : 
      87              : // dequeueApplied removes the earliest enqueued Batch, if it is applied.
      88              : //
      89              : // Returns nil if the commit queue is empty or the earliest Batch is not yet
      90              : // applied.
      91            1 : func (q *commitQueue) dequeueApplied() *Batch {
      92            1 :         for {
      93            1 :                 ptrs := q.headTail.Load()
      94            1 :                 head, tail := q.unpack(ptrs)
      95            1 :                 if tail == head {
      96            1 :                         // Queue is empty.
      97            1 :                         return nil
      98            1 :                 }
      99              : 
     100            1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
     101            1 :                 b := slot.Load()
     102            1 :                 if b == nil || !b.applied.Load() {
     103            1 :                         // The batch is not ready to be dequeued, or another goroutine has
     104            1 :                         // already dequeued it.
     105            1 :                         return nil
     106            1 :                 }
     107              : 
     108              :                 // Confirm head and tail (for our speculative check above) and increment
     109              :                 // tail. If this succeeds, then we own the slot at tail.
     110            1 :                 ptrs2 := q.pack(head, tail+1)
     111            1 :                 if q.headTail.CompareAndSwap(ptrs, ptrs2) {
     112            1 :                         // We now own slot.
     113            1 :                         //
     114            1 :                         // Tell enqueue that we're done with this slot. Zeroing the slot is also
     115            1 :                         // important so we don't leave behind references that could keep this object
     116            1 :                         // live longer than necessary.
     117            1 :                         slot.Store(nil)
     118            1 :                         // At this point enqueue owns the slot.
     119            1 :                         return b
     120            1 :                 }
     121              :         }
     122              : }
     123              : 
     124              : // commitEnv contains the environment that a commitPipeline interacts
     125              : // with. This allows fine-grained testing of commitPipeline behavior without
     126              : // construction of an entire DB.
     127              : type commitEnv struct {
     128              :         // The next sequence number to give to a batch. Protected by
     129              :         // commitPipeline.mu.
     130              :         logSeqNum *base.AtomicSeqNum
     131              :         // The visible sequence number at which reads should be performed. Ratcheted
     132              :         // upwards atomically as batches are applied to the memtable.
     133              :         visibleSeqNum *base.AtomicSeqNum
     134              : 
     135              :         // Apply the batch to the specified memtable. Called concurrently.
     136              :         apply func(b *Batch, mem *memTable) error
     137              :         // Write the batch to the WAL. If wg != nil, the data will be persisted
     138              :         // asynchronously and done will be called on wg upon completion. If wg != nil
     139              :         // and err != nil, a failure to persist the WAL will populate *err. Returns
     140              :         // the memtable the batch should be applied to. Serial execution enforced by
     141              :         // commitPipeline.mu.
     142              :         write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
     143              : }
     144              : 
     145              : // A commitPipeline manages the stages of committing a set of mutations
     146              : // (contained in a single Batch) atomically to the DB. The steps are
     147              : // conceptually:
     148              : //
     149              : //  1. Write the batch to the WAL and optionally sync the WAL
     150              : //  2. Apply the mutations in the batch to the memtable
     151              : //
     152              : // These two simple steps are made complicated by the desire for high
     153              : // performance. In the absence of concurrency, performance is limited by how
     154              : // fast a batch can be written (and synced) to the WAL and then added to the
     155              : // memtable, both of which are outside the purview of the commit
     156              : // pipeline. Performance under concurrency is the primary concern of the commit
     157              : // pipeline, though it also needs to maintain two invariants:
     158              : //
     159              : //  1. Batches need to be written to the WAL in sequence number order.
     160              : //  2. Batches need to be made visible for reads in sequence number order. This
     161              : //     invariant arises from the use of a single sequence number which
     162              : //     indicates which mutations are visible.
     163              : //
     164              : // Taking these invariants into account, let's revisit the work the commit
     165              : // pipeline needs to perform. Writing the batch to the WAL is necessarily
     166              : // serialized as there is a single WAL object. The order of the entries in the
     167              : // WAL defines the sequence number order. Note that writing to the WAL is
     168              : // extremely fast, usually just a memory copy. Applying the mutations in a
     169              : // batch to the memtable can occur concurrently as the underlying skiplist
     170              : // supports concurrent insertions. Publishing the visible sequence number is
     171              : // another serialization point, but one with a twist: the visible sequence
     172              : // number cannot be bumped until the mutations for earlier batches have
     173              : // finished applying to the memtable (the visible sequence number only ratchets
     174              : // up). Lastly, if requested, the commit waits for the WAL to sync. Note that
     175              : // waiting for the WAL sync after ratcheting the visible sequence number allows
     176              : // another goroutine to read committed data before the WAL has synced. This is
     177              : // similar behavior to RocksDB's manual WAL flush functionality. Application
     178              : // code needs to protect against this if necessary.
     179              : //
     180              : // The full outline of the commit pipeline operation is as follows:
     181              : //
     182              : //      with commitPipeline mutex locked:
     183              : //        assign batch sequence number
     184              : //        write batch to WAL
     185              : //      (optionally) add batch to WAL sync list
     186              : //      apply batch to memtable (concurrently)
     187              : //      wait for earlier batches to apply
     188              : //      ratchet read sequence number
     189              : //      (optionally) wait for the WAL to sync
     190              : //
     191              : // As soon as a batch has been written to the WAL, the commitPipeline mutex is
     192              : // released allowing another batch to write to the WAL. Each commit operation
     193              : // individually applies its batch to the memtable providing concurrency. The
     194              : // WAL sync happens concurrently with applying to the memtable (see
     195              : // commitPipeline.syncLoop).
     196              : //
     197              : // The "waits for earlier batches to apply" work is more complicated than might
     198              : // be expected. The obvious approach would be to keep a queue of pending
     199              : // batches and for each batch to wait for the previous batch to finish
     200              : // committing. This approach was tried initially and turned out to be too
     201              : // slow. The problem is that it causes excessive goroutine activity as each
     202              : // committing goroutine needs to wake up in order for the next goroutine to be
     203              : // unblocked. The approach taken in the current code is conceptually similar,
     204              : // though it avoids waking a goroutine to perform work that another goroutine
     205              : // can perform. A commitQueue (a single-producer, multiple-consumer queue)
     206              : // holds the ordered list of committing batches. Addition to the queue is done
     207              : // while holding commitPipeline.mutex ensuring the same ordering of batches in
     208              : // the queue as the ordering in the WAL. When a batch finishes applying to the
     209              : // memtable, it atomically updates its Batch.applied field. Ratcheting of the
     210              : // visible sequence number is done by commitPipeline.publish which loops
     211              : // dequeueing "applied" batches and ratcheting the visible sequence number. If
     212              : // we hit an unapplied batch at the head of the queue we can block as we know
     213              : // that committing of that unapplied batch will eventually find our (applied)
     214              : // batch in the queue. See commitPipeline.publish for additional commentary.
     215              : type commitPipeline struct {
     216              :         // WARNING: The following struct `commitQueue` contains fields which will
     217              :         // be accessed atomically.
     218              :         //
     219              :         // Go allocations are guaranteed to be 64-bit aligned which we take advantage
     220              :         // of by placing the 64-bit fields which we access atomically at the beginning
     221              :         // of the commitPipeline struct.
     222              :         // For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
     223              :         // Queue of pending batches to commit.
     224              :         pending commitQueue
     225              :         env     commitEnv
     226              :         // The commit path has two queues:
     227              :         // - commitPipeline.pending contains batches whose seqnums have not yet been
     228              :         //   published. It is a lock-free single producer multi consumer queue.
     229              :         // - LogWriter.flusher.syncQ contains state for batches that have asked for
     230              :         //   a sync. It is a lock-free single producer single consumer queue.
     231              :         // These lock-free queues have a fixed capacity. And since they are
     232              :         // lock-free, we cannot do blocking waits when pushing onto these queues, in
     233              :         // case they are full. Additionally, adding to these queues happens while
     234              :         // holding commitPipeline.mu, and we don't want to block while holding that
     235              :         // mutex since it is also needed by other code.
     236              :         //
     237              :         // Popping from these queues is independent and for a particular batch can
     238              :         // occur in either order, though it is more common that popping from the
     239              :         // commitPipeline.pending will happen first.
     240              :         //
     241              :         // Due to these constraints, we reserve a unit of space in each queue before
     242              :         // acquiring commitPipeline.mu, which also ensures that the push operation
     243              :         // is guaranteed to have space in the queue. The commitQueueSem and
     244              :         // logSyncQSem are used for this reservation.
     245              :         commitQueueSem chan struct{}
     246              :         logSyncQSem    chan struct{}
     247              :         ingestSem      chan struct{}
     248              :         // The mutex to use for synchronizing access to logSeqNum and serializing
     249              :         // calls to commitEnv.write().
     250              :         mu sync.Mutex
     251              : }
     252              : 
     253            1 : func newCommitPipeline(env commitEnv) *commitPipeline {
     254            1 :         p := &commitPipeline{
     255            1 :                 env: env,
     256            1 :                 // The capacity of both commitQueue.slots and syncQueue.slots is set to
     257            1 :                 // record.SyncConcurrency, which also determines the value of these
     258            1 :                 // semaphores. We used to have a single semaphore, which required that the
     259            1 :                 // capacity of these queues be the same. Now that we have two semaphores,
     260            1 :                 // the capacity of these queues could be changed to be different. Say half
     261            1 :                 // of the batches asked to be synced, but syncing took 5x the latency of
     262            1 :                 // adding to the memtable and publishing. Then syncQueue.slots could be
     263            1 :                 // sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
     264            1 :                 // that LogWriterMetrics.SyncQueueLen has high utilization under some
     265            1 :                 // workloads.
     266            1 :                 //
     267            1 :                 // NB: the commit concurrency is one less than SyncConcurrency because we
     268            1 :                 // have to allow one "slot" for a concurrent WAL rotation which will close
     269            1 :                 // and sync the WAL.
     270            1 :                 commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
     271            1 :                 logSyncQSem:    make(chan struct{}, record.SyncConcurrency-1),
     272            1 :                 ingestSem:      make(chan struct{}, 1),
     273            1 :         }
     274            1 :         return p
     275            1 : }
     276              : 
     277              : // directWrite is used to directly write to the WAL. commitPipeline.mu must be
     278              : // held while this is called. DB.mu must not be held. directWrite will only
     279              : // return once the WAL sync is complete. Note that DirectWrite is a special case
     280              : // function which is currently only used when ingesting sstables as a flushable.
     281              : // Reason carefully about the correctness argument when calling this function
     282              : // from any context.
     283            1 : func (p *commitPipeline) directWrite(b *Batch) error {
     284            1 :         var syncWG sync.WaitGroup
     285            1 :         var syncErr error
     286            1 :         syncWG.Add(1)
     287            1 :         p.logSyncQSem <- struct{}{}
     288            1 :         _, err := p.env.write(b, &syncWG, &syncErr)
     289            1 :         syncWG.Wait()
     290            1 :         err = firstError(err, syncErr)
     291            1 :         return err
     292            1 : }
     293              : 
     294              : // Commit the specified batch, writing it to the WAL, optionally syncing the
     295              : // WAL, and applying the batch to the memtable. Upon successful return the
     296              : // batch's mutations will be visible for reading.
     297              : // REQUIRES: noSyncWait => syncWAL
     298            1 : func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
     299            1 :         if b.Empty() {
     300            1 :                 return nil
     301            1 :         }
     302              : 
     303            1 :         commitStartTime := crtime.NowMono()
     304            1 :         // Acquire semaphores.
     305            1 :         p.commitQueueSem <- struct{}{}
     306            1 :         if syncWAL {
     307            1 :                 p.logSyncQSem <- struct{}{}
     308            1 :         }
     309            1 :         b.commitStats.SemaphoreWaitDuration = commitStartTime.Elapsed()
     310            1 : 
     311            1 :         // Prepare the batch for committing: enqueuing the batch in the pending
     312            1 :         // queue, determining the batch sequence number and writing the data to the
     313            1 :         // WAL.
     314            1 :         //
     315            1 :         // NB: We set Batch.commitErr on error so that the batch won't be a candidate
     316            1 :         // for reuse. See Batch.release().
     317            1 :         mem, err := p.prepare(b, syncWAL, noSyncWait)
     318            1 :         if err != nil {
     319            0 :                 b.db = nil // prevent batch reuse on error
     320            0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     321            0 :                 // sitting in the pending queue. We should consider fixing this by also
     322            0 :                 // removing the batch from the pending queue.
     323            0 :                 return err
     324            0 :         }
     325              : 
     326              :         // Apply the batch to the memtable.
     327            1 :         if err := p.env.apply(b, mem); err != nil {
     328            0 :                 b.db = nil // prevent batch reuse on error
     329            0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     330            0 :                 // sitting in the pending queue. We should consider fixing this by also
     331            0 :                 // removing the batch from the pending queue.
     332            0 :                 return err
     333            0 :         }
     334              : 
     335              :         // Publish the batch sequence number.
     336            1 :         p.publish(b)
     337            1 : 
     338            1 :         <-p.commitQueueSem
     339            1 : 
     340            1 :         if !noSyncWait {
     341            1 :                 // Already waited for commit, so look at the error.
     342            1 :                 if b.commitErr != nil {
     343            0 :                         b.db = nil // prevent batch reuse on error
     344            0 :                         err = b.commitErr
     345            0 :                 }
     346              :         }
     347              :         // Else noSyncWait. The LogWriter can be concurrently writing to
     348              :         // b.commitErr. We will read b.commitErr in Batch.SyncWait after the
     349              :         // LogWriter is done writing.
     350              : 
     351            1 :         b.commitStats.TotalDuration = commitStartTime.Elapsed()
     352            1 : 
     353            1 :         return err
     354              : }
     355              : 
     356              : // AllocateSeqNum allocates count sequence numbers, invokes the prepare
     357              : // callback, then the apply callback, and then publishes the sequence
     358              : // numbers. AllocateSeqNum does not write to the WAL or add entries to the
     359              : // memtable. AllocateSeqNum can be used to sequence an operation such as
     360              : // sstable ingestion within the commit pipeline. The prepare callback is
     361              : // invoked with commitPipeline.mu held, but note that DB.mu is not held and
     362              : // must be locked if necessary.
     363              : func (p *commitPipeline) AllocateSeqNum(
     364              :         count int, prepare func(seqNum base.SeqNum), apply func(seqNum base.SeqNum),
     365            1 : ) {
     366            1 :         // This method is similar to Commit and prepare. Be careful about trying to
     367            1 :         // share additional code with those methods because Commit and prepare are
     368            1 :         // performance critical code paths.
     369            1 : 
     370            1 :         b := newBatch(nil)
     371            1 :         defer b.Close()
     372            1 : 
     373            1 :         // Give the batch a count of 1 so that the log and visible sequence number
     374            1 :         // are incremented correctly.
     375            1 :         b.data = make([]byte, batchrepr.HeaderLen)
     376            1 :         b.setCount(uint32(count))
     377            1 :         b.commit.Add(1)
     378            1 : 
     379            1 :         p.commitQueueSem <- struct{}{}
     380            1 : 
     381            1 :         p.mu.Lock()
     382            1 : 
     383            1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     384            1 :         // is lock-free, we want the order of batches to be the same as the sequence
     385            1 :         // number order.
     386            1 :         p.pending.enqueue(b)
     387            1 : 
     388            1 :         // Assign the batch a sequence number. Note that we use atomic operations
     389            1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     390            1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     391            1 :         logSeqNum := p.env.logSeqNum.Add(base.SeqNum(count)) - base.SeqNum(count)
     392            1 :         seqNum := logSeqNum
     393            1 :         if seqNum == 0 {
     394            1 :                 // We can't use the value 0 for the global seqnum during ingestion, because
     395            1 :                 // 0 indicates no global seqnum. So allocate one more seqnum.
     396            1 :                 p.env.logSeqNum.Add(1)
     397            1 :                 seqNum++
     398            1 :         }
     399            1 :         b.setSeqNum(seqNum)
     400            1 : 
     401            1 :         // Wait for any outstanding writes to the memtable to complete. This is
     402            1 :         // necessary for ingestion so that the check for memtable overlap can see any
     403            1 :         // writes that were sequenced before the ingestion. The spin loop is
     404            1 :         // unfortunate, but obviates the need for additional synchronization.
     405            1 :         for {
     406            1 :                 visibleSeqNum := p.env.visibleSeqNum.Load()
     407            1 :                 if visibleSeqNum == logSeqNum {
     408            1 :                         break
     409              :                 }
     410            1 :                 runtime.Gosched()
     411              :         }
     412              : 
     413              :         // Invoke the prepare callback. Note the lack of error reporting. Even if the
     414              :         // callback internally fails, the sequence number needs to be published in
     415              :         // order to allow the commit pipeline to proceed.
     416            1 :         prepare(b.SeqNum())
     417            1 : 
     418            1 :         p.mu.Unlock()
     419            1 : 
     420            1 :         // Invoke the apply callback.
     421            1 :         apply(b.SeqNum())
     422            1 : 
     423            1 :         // Publish the sequence number.
     424            1 :         p.publish(b)
     425            1 : 
     426            1 :         <-p.commitQueueSem
     427              : }
     428              : 
     429            1 : func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
     430            1 :         n := uint64(b.Count())
     431            1 :         if n == invalidBatchCount {
     432            0 :                 return nil, ErrInvalidBatch
     433            0 :         }
     434            1 :         var syncWG *sync.WaitGroup
     435            1 :         var syncErr *error
     436            1 :         switch {
     437            1 :         case !syncWAL:
     438            1 :                 // Only need to wait for the publish.
     439            1 :                 b.commit.Add(1)
     440              :         // Remaining cases represent syncWAL=true.
     441            1 :         case noSyncWait:
     442            1 :                 syncErr = &b.commitErr
     443            1 :                 syncWG = &b.fsyncWait
     444            1 :                 // Only need to wait synchronously for the publish. The user will
     445            1 :                 // (asynchronously) wait on the batch's fsyncWait.
     446            1 :                 b.commit.Add(1)
     447            1 :                 b.fsyncWait.Add(1)
     448            1 :         case !noSyncWait:
     449            1 :                 syncErr = &b.commitErr
     450            1 :                 syncWG = &b.commit
     451            1 :                 // Must wait for both the publish and the WAL fsync.
     452            1 :                 b.commit.Add(2)
     453              :         }
     454              : 
     455            1 :         p.mu.Lock()
     456            1 : 
     457            1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     458            1 :         // is lock-free, we want the order of batches to be the same as the sequence
     459            1 :         // number order.
     460            1 :         p.pending.enqueue(b)
     461            1 : 
     462            1 :         // Assign the batch a sequence number. Note that we use atomic operations
     463            1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     464            1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     465            1 :         b.setSeqNum(p.env.logSeqNum.Add(base.SeqNum(n)) - base.SeqNum(n))
     466            1 : 
     467            1 :         // Write the data to the WAL.
     468            1 :         mem, err := p.env.write(b, syncWG, syncErr)
     469            1 : 
     470            1 :         p.mu.Unlock()
     471            1 : 
     472            1 :         return mem, err
     473              : }
     474              : 
     475            1 : func (p *commitPipeline) publish(b *Batch) {
     476            1 :         // Mark the batch as applied.
     477            1 :         b.applied.Store(true)
     478            1 : 
     479            1 :         // Loop dequeuing applied batches from the pending queue. If our batch was
     480            1 :         // the head of the pending queue we are guaranteed that either we'll publish
     481            1 :         // it or someone else will dequeueApplied and publish it. If our batch is not the
     482            1 :         // head of the queue then either we'll dequeueApplied applied batches and reach our
     483            1 :         // batch or there is an unapplied batch blocking us. When that unapplied
     484            1 :         // batch applies it will go through the same process and publish our batch
     485            1 :         // for us.
     486            1 :         for {
     487            1 :                 t := p.pending.dequeueApplied()
     488            1 :                 if t == nil {
     489            1 :                         // Wait for another goroutine to publish us. We might also be waiting for
     490            1 :                         // the WAL sync to finish.
     491            1 :                         now := crtime.NowMono()
     492            1 :                         b.commit.Wait()
     493            1 :                         b.commitStats.CommitWaitDuration += now.Elapsed()
     494            1 :                         break
     495              :                 }
     496            1 :                 if !t.applied.Load() {
     497            0 :                         panic("not reached")
     498              :                 }
     499              : 
     500              :                 // We're responsible for publishing the sequence number for batch t, but
     501              :                 // another concurrent goroutine might sneak in and publish the sequence
     502              :                 // number for a subsequent batch. That's ok as all we're guaranteeing is
     503              :                 // that the sequence number ratchets up.
     504            1 :                 for {
     505            1 :                         curSeqNum := p.env.visibleSeqNum.Load()
     506            1 :                         newSeqNum := t.SeqNum() + base.SeqNum(t.Count())
     507            1 :                         if newSeqNum <= curSeqNum {
     508            1 :                                 // t's sequence number has already been published.
     509            1 :                                 break
     510              :                         }
     511            1 :                         if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
     512            1 :                                 // We successfully published t's sequence number.
     513            1 :                                 break
     514              :                         }
     515              :                 }
     516              : 
     517            1 :                 t.commit.Done()
     518              :         }
     519              : }
        

Generated by: LCOV version 2.0-1