LCOV - code coverage report
Current view: top level - pebble/sstable - write_queue.go (source / functions)
Test:      2023-11-15 08:16Z 6682fd5e - tests + meta.lcov
Date:      2023-11-15 08:18:05
Lines:     68 of 72 hit (94.4 %)
Functions: 0 of 0 (-)

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "sync"
       5             : 
       6             :         "github.com/cockroachdb/pebble/internal/base"
       7             : )
       8             : 
       9             : type writeTask struct {
      10             :         // Since writeTasks are pooled, the compressionDone channel is re-used.
      11             :         // Any write to the channel must already have been read before the
      12             :         // writeTask is added back to the pool.
      13             :         compressionDone chan bool
      14             :         buf             *dataBlockBuf
      15             :         // If this is not nil, then this index block will be flushed.
      16             :         flushableIndexBlock *indexBlockBuf
      17             :         // currIndexBlock is the index block on which indexBlock.add must be called.
      18             :         currIndexBlock *indexBlockBuf
      19             :         indexEntrySep  InternalKey
      20             :         // indexInflightSize is used to decrement Writer.indexBlock.sizeEstimate.inflightSize.
      21             :         indexInflightSize int
      22             :         // If the index block is finished, then we set the finishedIndexProps here.
      23             :         finishedIndexProps []byte
      24             : }
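
// The writeTaskPool referenced in releaseBuffers below is defined elsewhere in
// the package. As a minimal sketch of the pooling pattern the comments above
// describe (an assumption, not the file's actual definition), the channel is
// buffered so that signalling compression completion never blocks the sender:
//
//      var writeTaskPool = sync.Pool{
//              New: func() interface{} {
//                      return &writeTask{compressionDone: make(chan bool, 1)}
//              },
//      }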
      25             : 
      26             : // clear is not responsible for clearing task.flushableIndexBlock or task.buf;
      27             : // writeQueue.releaseBuffers handles those.
      28           2 : func (task *writeTask) clear() {
      29           2 :         *task = writeTask{
      30           2 :                 indexEntrySep:   base.InvalidInternalKey,
      31           2 :                 compressionDone: task.compressionDone,
      32           2 :         }
      33           2 : }
      34             : 
      35             : // Note that only the Writer client goroutine will be adding tasks to the writeQueue.
      36             : // Both the Writer client and the compression goroutines will be able to write to
      37             : // writeTask.compressionDone to indicate that the compression job associated with
      38             : // a writeTask has finished.
      39             : type writeQueue struct {
      40             :         tasks  chan *writeTask
      41             :         wg     sync.WaitGroup
      42             :         writer *Writer
      43             : 
      44             :         // err stores the first error encountered when the write queue attempts to
      45             :         // write a block to disk. Once set, it short-circuits the remaining block
      46             :         // writes, which would be unnecessary work.
      47             :         err    error
      48             :         closed bool
      49             : }
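
// A minimal sketch of the signalling described above; compress is a
// hypothetical stand-in for the real compression step, not part of this file:
//
//      // Writer client goroutine: start compression, then enqueue the task.
//      go func(t *writeTask) {
//              compress(t.buf)           // hypothetical compression step
//              t.compressionDone <- true // wake the write-queue worker
//      }(task)
//      queue.add(task)
//
//      // The worker (runWorker, below) dequeues the task and blocks on
//      // <-task.compressionDone until that signal arrives.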
      50             : 
      51           2 : func newWriteQueue(size int, writer *Writer) *writeQueue {
      52           2 :         w := &writeQueue{}
      53           2 :         w.tasks = make(chan *writeTask, size)
      54           2 :         w.writer = writer
      55           2 : 
      56           2 :         w.wg.Add(1)
      57           2 :         go w.runWorker()
      58           2 :         return w
      59           2 : }
      60             : 
      61           2 : func (w *writeQueue) performWrite(task *writeTask) error {
      62           2 :         var bh BlockHandle
      63           2 :         var bhp BlockHandleWithProperties
      64           2 : 
      65           2 :         var err error
      66           2 :         if bh, err = w.writer.writeCompressedBlock(task.buf.compressed, task.buf.tmp[:]); err != nil {
      67           0 :                 return err
      68           0 :         }
      69             : 
      70           2 :         bhp = BlockHandleWithProperties{BlockHandle: bh, Props: task.buf.dataBlockProps}
      71           2 :         if err = w.writer.addIndexEntry(
      72           2 :                 task.indexEntrySep, bhp, task.buf.tmp[:], task.flushableIndexBlock, task.currIndexBlock,
      73           2 :                 task.indexInflightSize, task.finishedIndexProps); err != nil {
      74           0 :                 return err
      75           0 :         }
      76             : 
      77           2 :         return nil
      78             : }
      79             : 
      80             : // releaseBuffers requires that none of the buffers in the writeTask,
      81             : // dataBlockBuf, and indexBlockBuf are still pointed to by another struct.
      82           2 : func (w *writeQueue) releaseBuffers(task *writeTask) {
      83           2 :         task.buf.clear()
      84           2 :         dataBlockBufPool.Put(task.buf)
      85           2 : 
      86           2 :         // This index block is no longer used by the Writer, so we can add it back
      87           2 :         // to the pool.
      88           2 :         if task.flushableIndexBlock != nil {
      89           2 :                 task.flushableIndexBlock.clear()
      90           2 :                 indexBlockBufPool.Put(task.flushableIndexBlock)
      91           2 :         }
      92             : 
      93           2 :         task.clear()
      94           2 :         writeTaskPool.Put(task)
      95             : }
      96             : 
      97           2 : func (w *writeQueue) runWorker() {
      98           2 :         for task := range w.tasks {
      99           2 :                 <-task.compressionDone
     100           2 : 
     101           2 :                 if w.err == nil {
     102           2 :                         w.err = w.performWrite(task)
     103           2 :                 }
     104             : 
     105           2 :                 w.releaseBuffers(task)
     106             :         }
     107           2 :         w.wg.Done()
     108             : }
     109             : 
     110           2 : func (w *writeQueue) add(task *writeTask) {
     111           2 :         w.tasks <- task
     112           2 : }
     113             : 
     114             : // addSync performs the writeTask synchronously on the caller goroutine. It is
     115             : // invalid once writeQueue.add has been called: it would race the worker on w.err.
     116           2 : func (w *writeQueue) addSync(task *writeTask) error {
     117           2 :         // This should return immediately, as the task's compression job has already finished.
     118           2 :         <-task.compressionDone
     119           2 : 
     120           2 :         if w.err == nil {
     121           2 :                 w.err = w.performWrite(task)
     122           2 :         }
     123             : 
     124           2 :         w.releaseBuffers(task)
     125           2 : 
     126           2 :         return w.err
     127             : }
     128             : 
     129             : // finish must only be called once no more tasks will be added to the writeQueue.
     130             : // It returns any error encountered while tasks were processed.
     131           2 : func (w *writeQueue) finish() error {
     132           2 :         if w.closed {
     133           1 :                 return w.err
     134           1 :         }
     135             : 
     136           2 :         close(w.tasks)
     137           2 :         w.wg.Wait()
     138           2 :         w.closed = true
     139           2 :         return w.err
     140             : }
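
// Putting the pieces together, a sketch of the intended lifecycle; compress
// and blocks are hypothetical, not part of this file:
//
//      q := newWriteQueue(16, w)
//      for _, buf := range blocks {
//              task := writeTaskPool.Get().(*writeTask)
//              task.buf = buf
//              go func(t *writeTask) {
//                      compress(t.buf)
//                      t.compressionDone <- true
//              }(task)
//              q.add(task)
//      }
//      // finish is idempotent: the closed flag makes a repeat call return the
//      // stored error without re-closing the tasks channel.
//      if err := q.finish(); err != nil {
//              // first error encountered by the worker, if any
//      }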

Generated by: LCOV version 1.14