Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package record
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "runtime/pprof"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/crc"
19 : "github.com/prometheus/client_golang/prometheus"
20 : )
21 :
22 : var walSyncLabels = pprof.Labels("pebble", "wal-sync")
23 : var errClosedWriter = errors.New("pebble/record: closed LogWriter")
24 :
25 : type block struct {
26 : // buf[:written] has already been filled with fragments. Updated atomically.
27 : written atomic.Int32
28 : // buf[:flushed] has already been flushed to w.
29 : flushed int32
30 : buf [blockSize]byte
31 : }
32 :
33 : type flusher interface {
34 : Flush() error
35 : }
36 :
37 : type syncer interface {
38 : Sync() error
39 : }
40 :
41 : const (
42 : syncConcurrencyBits = 12
43 :
44 : // SyncConcurrency is the maximum number of concurrent sync operations that
45 : // can be performed. Note that a sync operation is initiated either by a call
46 : // to SyncRecord or by a call to Close. Exported as this value also limits
47 : // the commit concurrency in commitPipeline.
48 : SyncConcurrency = 1 << syncConcurrencyBits
49 : )
50 :
51 : type syncSlot struct {
52 : wg *sync.WaitGroup
53 : err *error
54 : }
55 :
56 : // syncQueue is a lock-free, fixed-size, single-producer, single-consumer
57 : // queue. The producer can push to the head, and the consumer can pop
58 : // multiple values from the tail. Popping calls Done() on each of the
59 : // available *sync.WaitGroup elements.
60 : type syncQueue struct {
61 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
62 : // are indexes into slots, masked with len(slots)-1.
63 : //
64 : // tail = index of oldest data in queue
65 : // head = index of next slot to fill
66 : //
67 : // Slots in the range [tail, head) are owned by consumers. A consumer
68 : // continues to own a slot outside this range until it nils the slot, at
69 : // which point ownership passes to the producer.
70 : //
71 : // The head index is stored in the most-significant bits so that we can
72 : // atomically add to it and the overflow is harmless.
73 : headTail atomic.Uint64
74 :
75 : // slots is a ring buffer of values stored in this queue. The size must be a
76 : // power of 2. A slot is in use until the tail index has moved beyond it.
77 : slots [SyncConcurrency]syncSlot
78 :
79 : // blocked is an atomic boolean which indicates whether syncing is currently
80 : // blocked or can proceed. It is used by the implementation of
81 : // min-sync-interval to block syncing until the min interval has passed.
82 : blocked atomic.Bool
83 : }
84 :
85 : const dequeueBits = 32
86 :
87 1 : func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
88 1 : const mask = 1<<dequeueBits - 1
89 1 : head = uint32((ptrs >> dequeueBits) & mask)
90 1 : tail = uint32(ptrs & mask)
91 1 : return
92 1 : }
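// The packing keeps head in the upper 32 bits and tail in the lower 32 bits so
// that both indexes can be read with a single atomic load. A minimal sketch of
// the arithmetic (illustrative only; the pack helper below is hypothetical and
// not part of this file):
//
//	pack := func(head, tail uint32) uint64 {
//		return uint64(head)<<dequeueBits | uint64(tail)
//	}
//	var q syncQueue
//	head, tail := q.unpack(pack(5, 2)) // head == 5, tail == 2
//	_, _ = head, tail
//	// Adding 1<<dequeueBits to the packed value increments only the head,
//	// which is what push relies on below.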
93 :
94 1 : func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
95 1 : ptrs := q.headTail.Load()
96 1 : head, tail := q.unpack(ptrs)
97 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
98 0 : panic("pebble: queue is full")
99 : }
100 :
101 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
102 1 : slot.wg = wg
103 1 : slot.err = err
104 1 :
105 1 : // Increment head. This passes ownership of the slot to pop and acts as a
106 1 : // store barrier for writing the slot.
107 1 : q.headTail.Add(1 << dequeueBits)
108 : }
109 :
110 1 : func (q *syncQueue) setBlocked() {
111 1 : q.blocked.Store(true)
112 1 : }
113 :
114 1 : func (q *syncQueue) clearBlocked() {
115 1 : q.blocked.Store(false)
116 1 : }
117 :
118 1 : func (q *syncQueue) empty() bool {
119 1 : head, tail, _ := q.load()
120 1 : return head == tail
121 1 : }
122 :
123 : // load returns the head and tail of the queue, identifying the waiters that
124 : // the caller should sync. It returns a head and tail of zero if syncing is
125 : // blocked due to min-sync-interval. It additionally returns the real length
126 : // of this queue, regardless of whether syncing is blocked.
127 1 : func (q *syncQueue) load() (head, tail, realLength uint32) {
128 1 : ptrs := q.headTail.Load()
129 1 : head, tail = q.unpack(ptrs)
130 1 : realLength = head - tail
131 1 : if q.blocked.Load() {
132 1 : return 0, 0, realLength
133 1 : }
134 1 : return head, tail, realLength
135 : }
136 :
137 : // REQUIRES: queueSemChan is non-nil.
138 1 : func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
139 1 : if tail == head {
140 1 : // Queue is empty.
141 1 : return nil
142 1 : }
143 :
144 1 : for ; tail != head; tail++ {
145 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
146 1 : wg := slot.wg
147 1 : if wg == nil {
148 0 : return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
149 0 : }
150 1 : *slot.err = err
151 1 : slot.wg = nil
152 1 : slot.err = nil
153 1 : // We need to bump the tail count before signalling the wait group, as
154 1 : // signalling the wait group can release a blocked goroutine which
155 1 : // will try to enqueue before we've "freed" space in the queue.
156 1 : q.headTail.Add(1)
157 1 : wg.Done()
158 1 : // Is always non-nil in production.
159 1 : if queueSemChan != nil {
160 1 : <-queueSemChan
161 1 : }
162 : }
163 :
164 1 : return nil
165 : }
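// A condensed sketch of the waiter protocol implemented by push and pop
// (illustrative only; the producer side mirrors SyncRecord's wg/err arguments
// and the consumer side mirrors flushPending after a sync):
//
//	sem := make(chan struct{}, SyncConcurrency)
//	var q syncQueue
//	var wg sync.WaitGroup
//	var syncErr error
//
//	sem <- struct{}{}               // acquire a semaphore slot before pushing
//	wg.Add(1)
//	q.push(&wg, &syncErr)           // producer: register a waiter
//
//	head, tail, _ := q.load()
//	_ = q.pop(head, tail, nil, sem) // consumer: record the sync result, Done the waiter
//
//	wg.Wait()                       // producer: blocks until pop has run
//	_ = syncErr                     // nil here; holds the sync error otherwise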
166 :
167 : // flusherCond is a specialized condition variable that allows its condition to
168 : // change and readiness to be signalled without holding its associated mutex. In
169 : // particular, when a waiter is added to syncQueue atomically, this condition
170 : // variable can be signalled without holding flusher.Mutex.
171 : type flusherCond struct {
172 : mu *sync.Mutex
173 : q *syncQueue
174 : cond sync.Cond
175 : }
176 :
177 1 : func (c *flusherCond) init(mu *sync.Mutex, q *syncQueue) {
178 1 : c.mu = mu
179 1 : c.q = q
180 1 : // Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
181 1 : // points to flusherCond so that when cond.L.Unlock is called, flusherCond.Unlock
182 1 : // will be called and we can check the !syncQueue.empty() condition.
183 1 : c.cond.L = c
184 1 : }
185 :
186 1 : func (c *flusherCond) Signal() {
187 1 : // Pass-through to the cond var.
188 1 : c.cond.Signal()
189 1 : }
190 :
191 1 : func (c *flusherCond) Wait() {
192 1 : // Pass-through to the cond var. Note that internally the cond var implements
193 1 : // Wait as:
194 1 : //
195 1 : // t := notifyListAdd()
196 1 : // L.Unlock()
197 1 : // notifyListWait(t)
198 1 : // L.Lock()
199 1 : //
200 1 : // We've configured the cond var to call flusherCond.Unlock() which allows
201 1 : // us to check the !syncQueue.empty() condition without the danger of missing a
202 1 : // notification. Any call to flusherCond.Signal() after notifyListAdd() is
203 1 : // called will cause the subsequent notifyListWait() to return immediately.
204 1 : c.cond.Wait()
205 1 : }
206 :
207 1 : func (c *flusherCond) Lock() {
208 1 : c.mu.Lock()
209 1 : }
210 :
211 1 : func (c *flusherCond) Unlock() {
212 1 : c.mu.Unlock()
213 1 : if !c.q.empty() {
214 1 : // If the current goroutine is about to block on sync.Cond.Wait, this call
215 1 : // to Signal will prevent that. The comment in Wait above explains a bit
216 1 : // about what is going on here, but it is worth reiterating:
217 1 : //
218 1 : // flusherCond.Wait()
219 1 : // sync.Cond.Wait()
220 1 : // t := notifyListAdd()
221 1 : // flusherCond.Unlock() <-- we are here
222 1 : // notifyListWait(t)
223 1 : // flusherCond.Lock()
224 1 : //
225 1 : // The call to Signal here results in:
226 1 : //
227 1 : // sync.Cond.Signal()
228 1 : // notifyListNotifyOne()
229 1 : //
230 1 : // The call to notifyListNotifyOne() will prevent the call to
231 1 : // notifyListWait(t) from blocking.
232 1 : c.cond.Signal()
233 1 : }
234 : }
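// The intended usage is the standard condition-variable loop, as in flushLoop
// below; the difference from a plain sync.Cond is that pushing onto the
// syncQueue and calling Signal do not require holding the mutex, because
// Unlock above re-checks the queue. A condensed sketch (illustrative only;
// haveWork stands in for the pending/written/syncQ checks in flushLoop):
//
//	f.Lock()
//	for !haveWork() {
//		f.ready.Wait()
//	}
//	// ... flush pending blocks and pop sync waiters ...
//	f.Unlock()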
235 :
236 : type durationFunc func() time.Duration
237 :
238 : // syncTimer is an interface for timers, modeled on the closure callback mode
239 : // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
240 : // by tests to mock out the timer functionality used to implement
241 : // min-sync-interval.
242 : type syncTimer interface {
243 : Reset(time.Duration) bool
244 : Stop() bool
245 : }
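// A test could satisfy syncTimer with a manually fired fake along these lines
// (an illustrative sketch, not the mock used by the actual tests):
//
//	type fakeTimer struct{ f func() }
//
//	func (t *fakeTimer) Reset(time.Duration) bool { return true }
//	func (t *fakeTimer) Stop() bool               { return true }
//	func (t *fakeTimer) fire()                    { t.f() }
//
// installed by overriding LogWriter.afterFunc:
//
//	w.afterFunc = func(d time.Duration, f func()) syncTimer {
//		return &fakeTimer{f: f}
//	}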
246 :
247 : // LogWriter writes records to an underlying io.Writer. In order to support WAL
248 : // file reuse, a LogWriter's records are tagged with the WAL's file
249 : // number. When reading a log file, a record from a previous incarnation of the
250 : // file will return the error ErrInvalidLogNum.
251 : type LogWriter struct {
252 : // w is the underlying writer.
253 : w io.Writer
254 : // c is w as a closer.
255 : c io.Closer
256 : // s is w as a syncer.
257 : s syncer
258 : // logNum is the low 32-bits of the log's file number.
259 : logNum uint32
260 : // blockNum is the zero based block number for the current block.
261 : blockNum int64
262 : // err is any accumulated error. TODO(peter): This needs to be protected in
263 : // some fashion. Perhaps using atomic.Value.
264 : err error
265 : // block is the current block being written. Protected by flusher.Mutex.
266 : block *block
267 : free struct {
268 : sync.Mutex
269 : blocks []*block
270 : }
271 :
272 : flusher struct {
273 : sync.Mutex
274 : // ready is a condition variable that is signalled when there are
275 : // blocks to flush, syncing has been requested, or the LogWriter has been
276 : // closed. For signalling a sync, it is safe to call Signal without holding
277 : // flusher.Mutex.
278 : ready flusherCond
279 : // Set to true when the flush loop should be closed.
280 : close bool
281 : // Closed when the flush loop has terminated.
282 : closed chan struct{}
283 : // Accumulated flush error.
284 : err error
285 : // minSyncInterval is the minimum duration between syncs.
286 : minSyncInterval durationFunc
287 : fsyncLatency prometheus.Histogram
288 : pending []*block
289 : syncQ syncQueue
290 : metrics *LogWriterMetrics
291 : }
292 :
293 : // afterFunc is a hook to allow tests to mock out the timer functionality
294 : // used for min-sync-interval. In normal operation this points to
295 : // time.AfterFunc.
296 : afterFunc func(d time.Duration, f func()) syncTimer
297 :
298 : // See the comment for LogWriterConfig.QueueSemChan.
299 : queueSemChan chan struct{}
300 : }
301 :
302 : // LogWriterConfig is a struct used for configuring new LogWriters.
303 : type LogWriterConfig struct {
304 : WALMinSyncInterval durationFunc
305 : WALFsyncLatency prometheus.Histogram
306 : // QueueSemChan is an optional channel to receive from when popping from
307 : // LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
308 : // the syncQueue from overflowing (which will cause a panic). All production
309 : // code ensures this is non-nil.
310 : QueueSemChan chan struct{}
311 : }
312 :
313 : // initialAllocatedBlocksCap is the initial capacity of the various slices
314 : // intended to hold LogWriter blocks. The LogWriter may allocate more blocks
315 : // than this threshold allows.
316 : const initialAllocatedBlocksCap = 32
317 :
318 : // blockPool pools *blocks to avoid allocations. Blocks are only added to the
319 : // Pool when a LogWriter is closed. Before that, free blocks are maintained
320 : // within a LogWriter's own internal free list `w.free.blocks`.
321 : var blockPool = sync.Pool{
322 1 : New: func() any { return &block{} },
323 : }
324 :
325 : // NewLogWriter returns a new LogWriter.
326 : func NewLogWriter(
327 : w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
328 1 : ) *LogWriter {
329 1 : c, _ := w.(io.Closer)
330 1 : s, _ := w.(syncer)
331 1 : r := &LogWriter{
332 1 : w: w,
333 1 : c: c,
334 1 : s: s,
335 1 : // NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
336 1 : // we are very unlikely to reach a file number of 4 billion and b) the log
337 1 : // number is used as a validation check and using only the low 32-bits is
338 1 : // sufficient for that purpose.
339 1 : logNum: uint32(logNum),
340 1 : afterFunc: func(d time.Duration, f func()) syncTimer {
341 0 : return time.AfterFunc(d, f)
342 0 : },
343 : queueSemChan: logWriterConfig.QueueSemChan,
344 : }
345 1 : r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
346 1 : r.block = blockPool.Get().(*block)
347 1 : r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ)
348 1 : r.flusher.closed = make(chan struct{})
349 1 : r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
350 1 : r.flusher.metrics = &LogWriterMetrics{}
351 1 :
352 1 : f := &r.flusher
353 1 : f.minSyncInterval = logWriterConfig.WALMinSyncInterval
354 1 : f.fsyncLatency = logWriterConfig.WALFsyncLatency
355 1 :
356 1 : go func() {
357 1 : pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
358 1 : }()
359 1 : return r
360 : }
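// A minimal construction sketch (illustrative only; the file name, histogram
// buckets and semaphore sizing are assumptions, not requirements of the API,
// and os.Create is just one way to obtain a writer that also implements
// io.Closer and syncer):
//
//	f, err := os.Create("000001.log")
//	if err != nil {
//		// handle err
//	}
//	sem := make(chan struct{}, SyncConcurrency)
//	w := NewLogWriter(f, base.DiskFileNum(1), LogWriterConfig{
//		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
//			Buckets: prometheus.DefBuckets,
//		}),
//		QueueSemChan: sem,
//	})
//	defer w.Close()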
361 :
362 1 : func (w *LogWriter) flushLoop(context.Context) {
363 1 : f := &w.flusher
364 1 : f.Lock()
365 1 :
366 1 : // Initialize idleStartTime to when the loop starts.
367 1 : idleStartTime := time.Now()
368 1 : var syncTimer syncTimer
369 1 : defer func() {
370 1 : // Capture the idle duration between the last piece of work and when the
371 1 : // loop terminated.
372 1 : f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
373 1 : if syncTimer != nil {
374 1 : syncTimer.Stop()
375 1 : }
376 1 : close(f.closed)
377 1 : f.Unlock()
378 : }()
379 :
380 : // The flush loop performs flushing of full and partial data blocks to the
381 : // underlying writer (LogWriter.w), syncing of the writer, and notification
382 : // to sync requests that they have completed.
383 : //
384 : // - flusher.ready is a condition variable that is signalled when there is
385 : // work to do. Full blocks are contained in flusher.pending. The current
386 : // partial block is in LogWriter.block. And sync operations are held in
387 : // flusher.syncQ.
388 : //
389 : // - The decision to sync is determined by whether there are any sync
390 : // requests present in flusher.syncQ and whether enough time has elapsed
391 : // since the last sync. If not enough time has elapsed since the last sync,
392 : // flusher.syncQ.blocked will be set to true. If syncing is blocked,
393 : // syncQueue.empty() will return true and syncQueue.load() will return 0,0
394 : // (i.e. an empty list).
395 : //
396 : // - flusher.syncQ.blocked is cleared by a timer that is initialized when
397 : // blocked is set to true. When blocked is true, no syncing will take place, but
398 : // flushing will continue to be performed. The on/off toggle for syncing
399 : // does not need to be carefully synchronized with the rest of processing
400 : // -- all we need to ensure is that after any transition to blocked=true there
401 : // is eventually a transition to blocked=false. syncTimer performs this
402 : // transition. Note that any change to min-sync-interval will not take
403 : // effect until the previous timer elapses.
404 : //
405 : // - Picking up the syncing work to perform requires coordination with
406 : // picking up the flushing work. Specifically, flushing work is queued
407 : // before syncing work. The guarantee of this code is that when a sync is
408 : // requested, any previously queued flush work will be synced. This
409 : // motivates reading the syncing work (f.syncQ.load()) before picking up
410 : // the flush work (w.block.written.Load()).
411 :
412 : // The list of full blocks that need to be written. This is copied from
413 : // f.pending on every loop iteration, though the number of elements is
414 : // usually small (most frequently 1). In the case of the WAL LogWriter, the
415 : // number of blocks is bounded by the size of the WAL's corresponding
416 : // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
417 : // this works out to at most 2048 elements if the entirety of the memtable's
418 : // contents are queued.
419 1 : pending := make([]*block, 0, cap(f.pending))
420 1 : for {
421 1 : for {
422 1 : // Grab the portion of the current block that requires flushing. Note that
423 1 : // the current block can be added to the pending blocks list after we release
424 1 : // the flusher lock, but it won't be part of pending.
425 1 : written := w.block.written.Load()
426 1 : if len(f.pending) > 0 || written > w.block.flushed || !f.syncQ.empty() {
427 1 : break
428 : }
429 1 : if f.close {
430 1 : // If the writer is closed, pretend the sync timer fired immediately so
431 1 : // that we can process any queued sync requests.
432 1 : f.syncQ.clearBlocked()
433 1 : if !f.syncQ.empty() {
434 1 : break
435 : }
436 1 : return
437 : }
438 1 : f.ready.Wait()
439 1 : continue
440 : }
441 : // Found work to do, so no longer idle.
442 1 : workStartTime := time.Now()
443 1 : idleDuration := workStartTime.Sub(idleStartTime)
444 1 : pending = append(pending[:0], f.pending...)
445 1 : f.pending = f.pending[:0]
446 1 : f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
447 1 :
448 1 : // Grab the list of sync waiters. Note that syncQueue.load() will return
449 1 : // 0,0 while we're waiting for the min-sync-interval to expire. This
450 1 : // allows flushing to proceed even if we're not ready to sync.
451 1 : head, tail, realSyncQLen := f.syncQ.load()
452 1 : f.metrics.SyncQueueLen.AddSample(int64(realSyncQLen))
453 1 :
454 1 : // Grab the portion of the current block that requires flushing. Note that
455 1 : // the current block can be added to the pending blocks list after we
456 1 : // release the flusher lock, but it won't be part of pending. This has to
457 1 : // be ordered after we get the list of sync waiters from syncQ in order to
458 1 : // prevent a race where a waiter adds itself to syncQ, but this thread
459 1 : // picks up the entry in syncQ and not the buffered data.
460 1 : written := w.block.written.Load()
461 1 : data := w.block.buf[w.block.flushed:written]
462 1 : w.block.flushed = written
463 1 :
464 1 : // If the flusher has an error, we propagate it to waiters. Note that in spite
465 1 : // of the error we consume the pending list above to free blocks for writers.
466 1 : if f.err != nil {
467 1 : f.syncQ.pop(head, tail, f.err, w.queueSemChan)
468 1 : // Update the idleStartTime if work could not be done, so that we don't
469 1 : // include the duration we tried to do work as idle. We don't bother
470 1 : // with the rest of the accounting, which means we will undercount.
471 1 : idleStartTime = time.Now()
472 1 : continue
473 : }
474 1 : f.Unlock()
475 1 : synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail)
476 1 : f.Lock()
477 1 : if synced && f.fsyncLatency != nil {
478 1 : f.fsyncLatency.Observe(float64(syncLatency))
479 1 : }
480 1 : f.err = err
481 1 : if f.err != nil {
482 1 : f.syncQ.clearBlocked()
483 1 : // Update the idleStartTime if work could not be done, so that we don't
484 1 : // include the duration we tried to do work as idle. We don't bother
485 1 : // with the rest of the accounting, which means we will undercount.
486 1 : idleStartTime = time.Now()
487 1 : continue
488 : }
489 :
490 1 : if synced && f.minSyncInterval != nil {
491 1 : // A sync was performed. Make sure we've waited for the min sync
492 1 : // interval before syncing again.
493 1 : if min := f.minSyncInterval(); min > 0 {
494 1 : f.syncQ.setBlocked()
495 1 : if syncTimer == nil {
496 1 : syncTimer = w.afterFunc(min, func() {
497 1 : f.syncQ.clearBlocked()
498 1 : f.ready.Signal()
499 1 : })
500 1 : } else {
501 1 : syncTimer.Reset(min)
502 1 : }
503 : }
504 : }
505 : // Finished work, and started idling.
506 1 : idleStartTime = time.Now()
507 1 : workDuration := idleStartTime.Sub(workStartTime)
508 1 : f.metrics.WriteThroughput.Bytes += bytesWritten
509 1 : f.metrics.WriteThroughput.WorkDuration += workDuration
510 1 : f.metrics.WriteThroughput.IdleDuration += idleDuration
511 : }
512 : }
513 :
514 : func (w *LogWriter) flushPending(
515 : data []byte, pending []*block, head, tail uint32,
516 1 : ) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
517 1 : defer func() {
518 1 : // Translate panics into errors. The errors will cause flushLoop to shut
519 1 : // down, but this allows us to do so in a controlled way and avoid swallowing
520 1 : // the stack that created the panic if panicking itself hits a panic
521 1 : // (e.g. unlock of an unlocked mutex).
522 1 : if r := recover(); r != nil {
523 0 : err = errors.Newf("%v", r)
524 0 : }
525 : }()
526 :
527 1 : for _, b := range pending {
528 1 : bytesWritten += blockSize - int64(b.flushed)
529 1 : if err = w.flushBlock(b); err != nil {
530 0 : break
531 : }
532 : }
533 1 : if n := len(data); err == nil && n > 0 {
534 1 : bytesWritten += int64(n)
535 1 : _, err = w.w.Write(data)
536 1 : }
537 :
538 1 : synced = head != tail
539 1 : if synced {
540 1 : if err == nil && w.s != nil {
541 1 : syncLatency, err = w.syncWithLatency()
542 1 : }
543 1 : f := &w.flusher
544 1 : if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil {
545 0 : return synced, syncLatency, bytesWritten, popErr
546 0 : }
547 : }
548 :
549 1 : return synced, syncLatency, bytesWritten, err
550 : }
551 :
552 1 : func (w *LogWriter) syncWithLatency() (time.Duration, error) {
553 1 : start := time.Now()
554 1 : err := w.s.Sync()
555 1 : syncLatency := time.Since(start)
556 1 : return syncLatency, err
557 1 : }
558 :
559 1 : func (w *LogWriter) flushBlock(b *block) error {
560 1 : if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
561 0 : return err
562 0 : }
563 1 : b.written.Store(0)
564 1 : b.flushed = 0
565 1 : w.free.Lock()
566 1 : w.free.blocks = append(w.free.blocks, b)
567 1 : w.free.Unlock()
568 1 : return nil
569 : }
570 :
571 : // queueBlock queues the current block for writing to the underlying writer,
572 : // allocates a new block and reserves space for the next header.
573 1 : func (w *LogWriter) queueBlock() {
574 1 : // Allocate a new block, taking one from the free list or the block pool. We do
575 1 : // this first because w.block is protected by w.flusher.Mutex.
576 1 : w.free.Lock()
577 1 : if len(w.free.blocks) == 0 {
578 1 : w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
579 1 : }
580 1 : nextBlock := w.free.blocks[len(w.free.blocks)-1]
581 1 : w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
582 1 : w.free.Unlock()
583 1 :
584 1 : f := &w.flusher
585 1 : f.Lock()
586 1 : f.pending = append(f.pending, w.block)
587 1 : w.block = nextBlock
588 1 : f.ready.Signal()
589 1 : w.err = w.flusher.err
590 1 : f.Unlock()
591 1 :
592 1 : w.blockNum++
593 : }
594 :
595 : // Close flushes and syncs any unwritten data and closes the writer.
596 : // Where required, external synchronisation is provided by commitPipeline.mu.
597 1 : func (w *LogWriter) Close() error {
598 1 : f := &w.flusher
599 1 :
600 1 : // Emit an EOF trailer signifying the end of this log. This helps readers
601 1 : // differentiate a corrupted entry in the middle of a log from garbage at the
602 1 : // tail of a recycled log file.
603 1 : w.emitEOFTrailer()
604 1 :
605 1 : // Signal the flush loop to close.
606 1 : f.Lock()
607 1 : f.close = true
608 1 : f.ready.Signal()
609 1 : f.Unlock()
610 1 :
611 1 : // Wait for the flush loop to close. The flush loop will not close until all
612 1 : // pending data has been written or an error occurs.
613 1 : <-f.closed
614 1 :
615 1 : // Sync any flushed data to disk. NB: flushLoop will sync after flushing the
616 1 : // last buffered data only if it was requested via syncQ, so we need to sync
617 1 : // here to ensure that all the data is synced.
618 1 : err := w.flusher.err
619 1 : var syncLatency time.Duration
620 1 : if err == nil && w.s != nil {
621 1 : syncLatency, err = w.syncWithLatency()
622 1 : }
623 1 : f.Lock()
624 1 : if f.fsyncLatency != nil {
625 1 : f.fsyncLatency.Observe(float64(syncLatency))
626 1 : }
627 1 : free := w.free.blocks
628 1 : f.Unlock()
629 1 :
630 1 : if w.c != nil {
631 1 : cerr := w.c.Close()
632 1 : w.c = nil
633 1 : if cerr != nil {
634 0 : return cerr
635 0 : }
636 : }
637 :
638 1 : for _, b := range free {
639 1 : b.flushed = 0
640 1 : b.written.Store(0)
641 1 : blockPool.Put(b)
642 1 : }
643 :
644 1 : w.err = errClosedWriter
645 1 : return err
646 : }
647 :
648 : // WriteRecord writes a complete record. Returns the offset just past the end
649 : // of the record.
650 : // External synchronisation provided by commitPipeline.mu.
651 1 : func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
652 1 : logSize, err := w.SyncRecord(p, nil, nil)
653 1 : return logSize, err
654 1 : }
655 :
656 : // SyncRecord writes a complete record. If wg != nil the record will be
657 : // asynchronously persisted to the underlying writer and done will be called on
658 : // the wait group upon completion. Returns the offset just past the end of the
659 : // record.
660 : // External synchronisation provided by commitPipeline.mu.
661 : func (w *LogWriter) SyncRecord(
662 : p []byte, wg *sync.WaitGroup, err *error,
663 1 : ) (logSize int64, err2 error) {
664 1 : if w.err != nil {
665 0 : return -1, w.err
666 0 : }
667 :
668 : // The `i == 0` condition ensures we handle empty records. Such records can
669 : // possibly be generated for VersionEdits stored in the MANIFEST. While the
670 : // MANIFEST is currently written using Writer, it is good to support the same
671 : // semantics with LogWriter.
672 1 : for i := 0; i == 0 || len(p) > 0; i++ {
673 1 : p = w.emitFragment(i, p)
674 1 : }
675 :
676 1 : if wg != nil {
677 1 : // If we've been asked to persist the record, add the WaitGroup to the sync
678 1 : // queue and signal the flushLoop. Note that flushLoop will write partial
679 1 : // blocks to the file if syncing has been requested. The contract is that
680 1 : // any record written to the LogWriter to this point will be flushed to the
681 1 : // OS and synced to disk.
682 1 : f := &w.flusher
683 1 : f.syncQ.push(wg, err)
684 1 : f.ready.Signal()
685 1 : }
686 :
687 1 : offset := w.blockNum*blockSize + int64(w.block.written.Load())
688 1 : // Note that we don't return w.err here as a concurrent call to Close would
689 1 : // race with our read. That's ok because the only error we could be seeing is
690 1 : // one related to syncing, of which the caller can receive notification by
691 1 : // passing in a non-nil err argument.
692 1 : return offset, nil
693 : }
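// A sketch of a fully synchronous write built on SyncRecord (illustrative
// only; sem, w and payload are assumed to exist, and the send on sem mirrors
// the QueueSemChan contract described in LogWriterConfig, where the LogWriter
// receives from the channel as it pops each waiter):
//
//	sem <- struct{}{} // reserve a syncQ slot; released when the waiter is popped
//	var wg sync.WaitGroup
//	var syncErr error
//	wg.Add(1)
//	if _, err := w.SyncRecord(payload, &wg, &syncErr); err != nil {
//		// handle write error
//	}
//	wg.Wait() // returns once the record has been flushed and fsynced
//	if syncErr != nil {
//		// handle sync error
//	}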
694 :
695 : // Size returns the current size of the file.
696 : // External synchronisation provided by commitPipeline.mu.
697 1 : func (w *LogWriter) Size() int64 {
698 1 : return w.blockNum*blockSize + int64(w.block.written.Load())
699 1 : }
700 :
701 1 : func (w *LogWriter) emitEOFTrailer() {
702 1 : // Write a recyclable chunk header with a different log number. Readers
703 1 : // will treat the header as EOF when the log number does not match.
704 1 : b := w.block
705 1 : i := b.written.Load()
706 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
707 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
708 1 : b.buf[i+6] = recyclableFullChunkType
709 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
710 1 : b.written.Store(i + int32(recyclableHeaderSize))
711 1 : }
712 :
713 1 : func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
714 1 : b := w.block
715 1 : i := b.written.Load()
716 1 : first := n == 0
717 1 : last := blockSize-i-recyclableHeaderSize >= int32(len(p))
718 1 :
719 1 : if last {
720 1 : if first {
721 1 : b.buf[i+6] = recyclableFullChunkType
722 1 : } else {
723 1 : b.buf[i+6] = recyclableLastChunkType
724 1 : }
725 1 : } else {
726 1 : if first {
727 1 : b.buf[i+6] = recyclableFirstChunkType
728 1 : } else {
729 1 : b.buf[i+6] = recyclableMiddleChunkType
730 1 : }
731 : }
732 :
733 1 : binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
734 1 :
735 1 : r := copy(b.buf[i+recyclableHeaderSize:], p)
736 1 : j := i + int32(recyclableHeaderSize+r)
737 1 : binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
738 1 : binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
739 1 : b.written.Store(j)
740 1 :
741 1 : if blockSize-b.written.Load() < recyclableHeaderSize {
742 1 : // There is no room for another fragment in the block, so fill the
743 1 : // remaining bytes with zeros and queue the block for flushing.
744 1 : clear(b.buf[b.written.Load():])
745 1 : w.queueBlock()
746 1 : }
747 1 : return p[r:]
748 : }
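// For reference, the recyclable chunk header written by emitFragment and
// emitEOFTrailer occupies recyclableHeaderSize (11) bytes with this layout,
// matching the offsets used above:
//
//	buf[0:4]  CRC of the type byte, log number and payload
//	buf[4:6]  payload size (little endian)
//	buf[6]    chunk type (full/first/middle/last)
//	buf[7:11] log number (little endian)
//
// A decoding sketch (illustrative only; the actual reading code lives
// elsewhere in this package):
//
//	checksum := binary.LittleEndian.Uint32(buf[0:4])
//	length := binary.LittleEndian.Uint16(buf[4:6])
//	chunkType := buf[6]
//	logNum := binary.LittleEndian.Uint32(buf[7:11])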
749 :
750 : // Metrics must be called after Close. The LogWriter will no longer modify the
751 : // returned LogWriterMetrics.
752 1 : func (w *LogWriter) Metrics() *LogWriterMetrics {
753 1 : return w.flusher.metrics
754 1 : }
755 :
756 : // LogWriterMetrics contains misc metrics for the log writer.
757 : type LogWriterMetrics struct {
758 : WriteThroughput base.ThroughputMetric
759 : PendingBufferLen base.GaugeSampleMetric
760 : SyncQueueLen base.GaugeSampleMetric
761 : }
762 :
763 : // Merge merges metrics from x. Requires that x is non-nil.
764 1 : func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
765 1 : m.WriteThroughput.Merge(x.WriteThroughput)
766 1 : m.PendingBufferLen.Merge(x.PendingBufferLen)
767 1 : m.SyncQueueLen.Merge(x.SyncQueueLen)
768 1 : return nil
769 1 : }