LCOV - code coverage report
Current view: top level - pebble/sstable - write_queue.go (source / functions)
Test: 2024-09-21 08:16Z aba5868b - tests only.lcov
Date: 2024-09-21 08:16:32

                 Hit   Total   Coverage
  Lines:          66      70     94.3 %
  Functions:       0       0         -

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "sync"
       5             : 
       6             :         "github.com/cockroachdb/pebble/internal/base"
       7             :         "github.com/cockroachdb/pebble/sstable/block"
       8             : )
       9             : 
      10             : type writeTask struct {
      11             :         // Since writeTasks are pooled, the compressionDone channel will be reused.
      12             :         // Any write to the channel must already have been read before the
      13             :         // writeTask is added back to the pool.
      14             :         compressionDone chan bool
      15             :         buf             *dataBlockBuf
      16             :         // If this is not nil, then this index block will be flushed.
      17             :         flushableIndexBlock *indexBlockBuf
      18             :         // currIndexBlock is the index block on which indexBlock.add must be called.
      19             :         currIndexBlock *indexBlockBuf
      20             :         indexEntrySep  InternalKey
      21             :         // inflightIndexEntrySize is used to decrement Writer.indexBlock.sizeEstimate.inflightSize.
      22             :         indexInflightSize int
      23             :         // If the index block is finished, then we set the finishedIndexProps here.
      24             :         finishedIndexProps []byte
      25             : }
      26             : 
      27             : // clear resets the writeTask for reuse. It is not responsible for clearing
      28             : // task.flushableIndexBlock or task.buf; releaseBuffers handles those.
      29           1 : func (task *writeTask) clear() {
      30           1 :         *task = writeTask{
      31           1 :                 indexEntrySep:   base.InvalidInternalKey,
      32           1 :                 compressionDone: task.compressionDone,
      33           1 :         }
      34           1 : }
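
A minimal, self-contained sketch of the pooling pattern described above (illustrative names, not pebble's actual pool wiring): the pool's New function allocates the buffered channel once, and reset retains it, so the channel survives round-trips through the pool instead of being reallocated per task.

package main

import (
	"fmt"
	"sync"
)

type task struct {
	done chan bool // reused across pool round-trips
	data []byte
}

var taskPool = sync.Pool{
	New: func() any {
		// A buffer of 1 lets a producer signal completion without blocking.
		return &task{done: make(chan bool, 1)}
	},
}

func (t *task) reset() {
	// Zero every field except the channel, mirroring writeTask.clear.
	*t = task{done: t.done}
}

func main() {
	t := taskPool.Get().(*task)
	t.data = []byte("block")

	go func() { t.done <- true }() // e.g. a compression goroutine
	<-t.done                       // drain the channel before pooling the task

	t.reset()
	taskPool.Put(t)
	fmt.Println("task recycled; channel reused")
}
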
      35             : 
      36             : // Only the Writer client goroutine adds tasks to the writeQueue. Both the
      37             : // Writer client and the compression goroutines may write to
      38             : // writeTask.compressionDone to indicate that the compression job associated
      39             : // with a writeTask has finished.
      40             : type writeQueue struct {
      41             :         tasks  chan *writeTask
      42             :         wg     sync.WaitGroup
      43             :         writer *RawRowWriter
      44             : 
      45             :         // err represents an error which is encountered when the write queue attempts
      46             :         // to write a block to disk. The error is stored here to skip unnecessary block
      47             :         // writes once the first error is encountered.
      48             :         err    error
      49             :         closed bool
      50             : }
      51             : 
      52           1 : func newWriteQueue(size int, writer *RawRowWriter) *writeQueue {
      53           1 :         w := &writeQueue{}
      54           1 :         w.tasks = make(chan *writeTask, size)
      55           1 :         w.writer = writer
      56           1 : 
      57           1 :         w.wg.Add(1)
      58           1 :         go w.runWorker()
      59           1 :         return w
      60           1 : }
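
A hedged usage sketch of the protocol described above, with stand-in types rather than pebble's API: a single client goroutine enqueues jobs in order, separate compression goroutines signal completion on each job's channel, and the worker writes in FIFO order because it only ever blocks on the job at the head of the queue.

package main

import (
	"fmt"
	"strings"
	"sync"
)

type job struct {
	done  chan bool
	input string
	out   string
}

type queue struct {
	jobs chan *job
	wg   sync.WaitGroup
}

func newQueue(size int) *queue {
	q := &queue{jobs: make(chan *job, size)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for j := range q.jobs {
			<-j.done           // wait for this job's compressor to finish
			fmt.Println(j.out) // "write to disk" in FIFO order
		}
	}()
	return q
}

func main() {
	q := newQueue(4)
	for _, s := range []string{"a", "b", "c"} {
		j := &job{done: make(chan bool, 1), input: s}
		go func(j *job) { // stand-in for a compression goroutine
			j.out = strings.ToUpper(j.input)
			j.done <- true
		}(j)
		q.jobs <- j // only the client goroutine enqueues
	}
	close(q.jobs)
	q.wg.Wait()
}
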
      61             : 
      62           1 : func (w *writeQueue) performWrite(task *writeTask) error {
      63           1 :         var bhp block.HandleWithProperties
      64           1 :         var err error
      65           1 :         if bhp.Handle, err = w.writer.layout.WritePrecompressedDataBlock(task.buf.physical); err != nil {
      66           0 :                 return err
      67           0 :         }
      68           1 :         bhp = block.HandleWithProperties{Handle: bhp.Handle, Props: task.buf.dataBlockProps}
      69           1 :         if err = w.writer.addIndexEntry(
      70           1 :                 task.indexEntrySep, bhp, task.buf.tmp[:], task.flushableIndexBlock, task.currIndexBlock,
      71           1 :                 task.indexInflightSize, task.finishedIndexProps); err != nil {
      72           0 :                 return err
      73           0 :         }
      74             : 
      75           1 :         return nil
      76             : }
      77             : 
      78             : // releaseBuffers returns the task and its buffers to their pools. The caller
      79             : // must ensure that no other struct still points to the dataBlockBuf or indexBlockBuf.
      80           1 : func (w *writeQueue) releaseBuffers(task *writeTask) {
      81           1 :         task.buf.clear()
      82           1 :         dataBlockBufPool.Put(task.buf)
      83           1 : 
      84           1 :         // This index block is no longer used by the Writer, so we can add it back
      85           1 :         // to the pool.
      86           1 :         if task.flushableIndexBlock != nil {
      87           1 :                 task.flushableIndexBlock.clear()
      88           1 :                 indexBlockBufPool.Put(task.flushableIndexBlock)
      89           1 :         }
      90             : 
      91           1 :         task.clear()
      92           1 :         writeTaskPool.Put(task)
      93             : }
      94             : 
      95           1 : func (w *writeQueue) runWorker() {
      96           1 :         for task := range w.tasks {
      97           1 :                 <-task.compressionDone
      98           1 : 
      99           1 :                 if w.err == nil {
     100           1 :                         w.err = w.performWrite(task)
     101           1 :                 }
     102             : 
     103           1 :                 w.releaseBuffers(task)
     104             :         }
     105           1 :         w.wg.Done()
     106             : }
     107             : 
     108           1 : func (w *writeQueue) add(task *writeTask) {
     109           1 :         w.tasks <- task
     110           1 : }
     111             : 
     112             : // addSync performs the writeTask synchronously on the caller goroutine. Calls to addSync
     113             : // are no longer valid once writeQueue.add has been called at least once.
     114           1 : func (w *writeQueue) addSync(task *writeTask) error {
     115           1 :         // This should instantly return without blocking.
     116           1 :         <-task.compressionDone
     117           1 : 
     118           1 :         if w.err == nil {
     119           1 :                 w.err = w.performWrite(task)
     120           1 :         }
     121             : 
     122           1 :         w.releaseBuffers(task)
     123           1 : 
     124           1 :         return w.err
     125             : }
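
A tiny illustrative sketch (not pebble's code) of why that receive should return immediately on the synchronous path: the caller completes compression, and buffers the completion signal, before the task is handed to addSync.

package main

import "fmt"

func main() {
	done := make(chan bool, 1) // buffered, like writeTask.compressionDone

	compress := func() { done <- true } // inline "compression"
	compress()                          // signal before the handoff

	<-done // returns immediately: the value is already buffered
	fmt.Println("wrote block synchronously")
}
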
     126             : 
     127             : // finish should only be called once no more tasks will be added to the writeQueue.
     128             : // finish will return any error which was encountered while tasks were processed.
     129           1 : func (w *writeQueue) finish() error {
     130           1 :         if w.closed {
     131           1 :                 return w.err
     132           1 :         }
     133             : 
     134           1 :         close(w.tasks)
     135           1 :         w.wg.Wait()
     136           1 :         w.closed = true
     137           1 :         return w.err
     138             : }
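
A minimal sketch, with illustrative names, of the close-once pattern finish relies on: the closed flag ensures only the first call closes the channel (closing an already-closed channel would panic), while later calls simply return the stored error.

package main

import "fmt"

type closeOnceQueue struct {
	tasks  chan int
	closed bool
	err    error
}

func (w *closeOnceQueue) finish() error {
	if w.closed {
		return w.err // later calls just report the stored error
	}
	close(w.tasks) // only the first call closes the channel
	w.closed = true
	return w.err
}

func main() {
	w := &closeOnceQueue{tasks: make(chan int)}
	fmt.Println(w.finish(), w.finish()) // the second call takes the closed branch
}
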

Generated by: LCOV version 1.14