Line data Source code
1 : package sstable 2 : 3 : import ( 4 : "sync" 5 : 6 : "github.com/cockroachdb/pebble/internal/base" 7 : "github.com/cockroachdb/pebble/sstable/block" 8 : ) 9 : 10 : type writeTask struct { 11 : // Since writeTasks are pooled, the compressionDone channel will be re-used. 12 : // It is necessary that any writes to the channel have already been read, 13 : // before adding the writeTask back to the pool. 14 : compressionDone chan bool 15 : buf *dataBlockBuf 16 : // If this is not nil, then this index block will be flushed. 17 : flushableIndexBlock *indexBlockBuf 18 : // currIndexBlock is the index block on which indexBlock.add must be called. 19 : currIndexBlock *indexBlockBuf 20 : indexEntrySep InternalKey 21 : // inflightIndexEntrySize is used to decrement Writer.indexBlock.sizeEstimate.inflightSize. 22 : indexInflightSize int 23 : // If the index block is finished, then we set the finishedIndexProps here. 24 : finishedIndexProps []byte 25 : } 26 : 27 : // It is not the responsibility of the writeTask to clear the 28 : // task.flushableIndexBlock, and task.buf. 29 1 : func (task *writeTask) clear() { 30 1 : *task = writeTask{ 31 1 : indexEntrySep: base.InvalidInternalKey, 32 1 : compressionDone: task.compressionDone, 33 1 : } 34 1 : } 35 : 36 : // Note that only the Writer client goroutine will be adding tasks to the writeQueue. 37 : // Both the Writer client and the compression goroutines will be able to write to 38 : // writeTask.compressionDone to indicate that the compression job associated with 39 : // a writeTask has finished. 40 : type writeQueue struct { 41 : tasks chan *writeTask 42 : wg sync.WaitGroup 43 : writer *RawRowWriter 44 : 45 : // err represents an error which is encountered when the write queue attempts 46 : // to write a block to disk. The error is stored here to skip unnecessary block 47 : // writes once the first error is encountered. 48 : err error 49 : closed bool 50 : } 51 : 52 1 : func newWriteQueue(size int, writer *RawRowWriter) *writeQueue { 53 1 : w := &writeQueue{} 54 1 : w.tasks = make(chan *writeTask, size) 55 1 : w.writer = writer 56 1 : 57 1 : w.wg.Add(1) 58 1 : go w.runWorker() 59 1 : return w 60 1 : } 61 : 62 1 : func (w *writeQueue) performWrite(task *writeTask) error { 63 1 : var bhp block.HandleWithProperties 64 1 : var err error 65 1 : if bhp.Handle, err = w.writer.layout.WritePrecompressedDataBlock(task.buf.physical); err != nil { 66 0 : return err 67 0 : } 68 1 : bhp = block.HandleWithProperties{Handle: bhp.Handle, Props: task.buf.dataBlockProps} 69 1 : if err = w.writer.addIndexEntry( 70 1 : task.indexEntrySep, bhp, task.buf.tmp[:], task.flushableIndexBlock, task.currIndexBlock, 71 1 : task.indexInflightSize, task.finishedIndexProps); err != nil { 72 0 : return err 73 0 : } 74 : 75 1 : return nil 76 : } 77 : 78 : // It is necessary to ensure that none of the buffers in the writeTask, 79 : // dataBlockBuf, indexBlockBuf, are pointed to by another struct. 80 1 : func (w *writeQueue) releaseBuffers(task *writeTask) { 81 1 : task.buf.clear() 82 1 : dataBlockBufPool.Put(task.buf) 83 1 : 84 1 : // This index block is no longer used by the Writer, so we can add it back 85 1 : // to the pool. 86 1 : if task.flushableIndexBlock != nil { 87 1 : task.flushableIndexBlock.clear() 88 1 : indexBlockBufPool.Put(task.flushableIndexBlock) 89 1 : } 90 : 91 1 : task.clear() 92 1 : writeTaskPool.Put(task) 93 : } 94 : 95 1 : func (w *writeQueue) runWorker() { 96 1 : for task := range w.tasks { 97 1 : <-task.compressionDone 98 1 : 99 1 : if w.err == nil { 100 1 : w.err = w.performWrite(task) 101 1 : } 102 : 103 1 : w.releaseBuffers(task) 104 : } 105 1 : w.wg.Done() 106 : } 107 : 108 1 : func (w *writeQueue) add(task *writeTask) { 109 1 : w.tasks <- task 110 1 : } 111 : 112 : // addSync will perform the writeTask synchronously with the caller goroutine. Calls to addSync 113 : // are no longer valid once writeQueue.add has been called at least once. 114 1 : func (w *writeQueue) addSync(task *writeTask) error { 115 1 : // This should instantly return without blocking. 116 1 : <-task.compressionDone 117 1 : 118 1 : if w.err == nil { 119 1 : w.err = w.performWrite(task) 120 1 : } 121 : 122 1 : w.releaseBuffers(task) 123 1 : 124 1 : return w.err 125 : } 126 : 127 : // finish should only be called once no more tasks will be added to the writeQueue. 128 : // finish will return any error which was encountered while tasks were processed. 129 1 : func (w *writeQueue) finish() error { 130 1 : if w.closed { 131 1 : return w.err 132 1 : } 133 : 134 1 : close(w.tasks) 135 1 : w.wg.Wait() 136 1 : w.closed = true 137 1 : return w.err 138 : }