Line data Source code
1 : package sstable 2 : 3 : import ( 4 : "sync" 5 : 6 : "github.com/cockroachdb/pebble/internal/base" 7 : ) 8 : 9 : type writeTask struct { 10 : // Since writeTasks are pooled, the compressionDone channel will be re-used. 11 : // It is necessary that any writes to the channel have already been read, 12 : // before adding the writeTask back to the pool. 13 : compressionDone chan bool 14 : buf *dataBlockBuf 15 : // If this is not nil, then this index block will be flushed. 16 : flushableIndexBlock *indexBlockBuf 17 : // currIndexBlock is the index block on which indexBlock.add must be called. 18 : currIndexBlock *indexBlockBuf 19 : indexEntrySep InternalKey 20 : // inflightIndexEntrySize is used to decrement Writer.indexBlock.sizeEstimate.inflightSize. 21 : indexInflightSize int 22 : // If the index block is finished, then we set the finishedIndexProps here. 23 : finishedIndexProps []byte 24 : } 25 : 26 : // It is not the responsibility of the writeTask to clear the 27 : // task.flushableIndexBlock, and task.buf. 28 1 : func (task *writeTask) clear() { 29 1 : *task = writeTask{ 30 1 : indexEntrySep: base.InvalidInternalKey, 31 1 : compressionDone: task.compressionDone, 32 1 : } 33 1 : } 34 : 35 : // Note that only the Writer client goroutine will be adding tasks to the writeQueue. 36 : // Both the Writer client and the compression goroutines will be able to write to 37 : // writeTask.compressionDone to indicate that the compression job associated with 38 : // a writeTask has finished. 39 : type writeQueue struct { 40 : tasks chan *writeTask 41 : wg sync.WaitGroup 42 : writer *Writer 43 : 44 : // err represents an error which is encountered when the write queue attempts 45 : // to write a block to disk. The error is stored here to skip unnecessary block 46 : // writes once the first error is encountered. 47 : err error 48 : closed bool 49 : } 50 : 51 1 : func newWriteQueue(size int, writer *Writer) *writeQueue { 52 1 : w := &writeQueue{} 53 1 : w.tasks = make(chan *writeTask, size) 54 1 : w.writer = writer 55 1 : 56 1 : w.wg.Add(1) 57 1 : go w.runWorker() 58 1 : return w 59 1 : } 60 : 61 1 : func (w *writeQueue) performWrite(task *writeTask) error { 62 1 : var bh BlockHandle 63 1 : var bhp BlockHandleWithProperties 64 1 : 65 1 : var err error 66 1 : if bh, err = w.writer.writeCompressedBlock(task.buf.compressed, task.buf.tmp[:]); err != nil { 67 0 : return err 68 0 : } 69 : 70 1 : bhp = BlockHandleWithProperties{BlockHandle: bh, Props: task.buf.dataBlockProps} 71 1 : if err = w.writer.addIndexEntry( 72 1 : task.indexEntrySep, bhp, task.buf.tmp[:], task.flushableIndexBlock, task.currIndexBlock, 73 1 : task.indexInflightSize, task.finishedIndexProps); err != nil { 74 0 : return err 75 0 : } 76 : 77 1 : return nil 78 : } 79 : 80 : // It is necessary to ensure that none of the buffers in the writeTask, 81 : // dataBlockBuf, indexBlockBuf, are pointed to by another struct. 82 1 : func (w *writeQueue) releaseBuffers(task *writeTask) { 83 1 : task.buf.clear() 84 1 : dataBlockBufPool.Put(task.buf) 85 1 : 86 1 : // This index block is no longer used by the Writer, so we can add it back 87 1 : // to the pool. 88 1 : if task.flushableIndexBlock != nil { 89 1 : task.flushableIndexBlock.clear() 90 1 : indexBlockBufPool.Put(task.flushableIndexBlock) 91 1 : } 92 : 93 1 : task.clear() 94 1 : writeTaskPool.Put(task) 95 : } 96 : 97 1 : func (w *writeQueue) runWorker() { 98 1 : for task := range w.tasks { 99 1 : <-task.compressionDone 100 1 : 101 1 : if w.err == nil { 102 1 : w.err = w.performWrite(task) 103 1 : } 104 : 105 1 : w.releaseBuffers(task) 106 : } 107 1 : w.wg.Done() 108 : } 109 : 110 1 : func (w *writeQueue) add(task *writeTask) { 111 1 : w.tasks <- task 112 1 : } 113 : 114 : // addSync will perform the writeTask synchronously with the caller goroutine. Calls to addSync 115 : // are no longer valid once writeQueue.add has been called at least once. 116 1 : func (w *writeQueue) addSync(task *writeTask) error { 117 1 : // This should instantly return without blocking. 118 1 : <-task.compressionDone 119 1 : 120 1 : if w.err == nil { 121 1 : w.err = w.performWrite(task) 122 1 : } 123 : 124 1 : w.releaseBuffers(task) 125 1 : 126 1 : return w.err 127 : } 128 : 129 : // finish should only be called once no more tasks will be added to the writeQueue. 130 : // finish will return any error which was encountered while tasks were processed. 131 1 : func (w *writeQueue) finish() error { 132 1 : if w.closed { 133 0 : return w.err 134 0 : } 135 : 136 1 : close(w.tasks) 137 1 : w.wg.Wait() 138 1 : w.closed = true 139 1 : return w.err 140 : }