// Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

// Package record reads and writes sequences of records. Each record is a stream
// of bytes that completes before the next record starts.
//
// When reading, call Next to obtain an io.Reader for the next record. Next will
// return io.EOF when there are no more records. It is valid to call Next
// without reading the current record to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next record. Calling
// Next finishes the current record. Call Close to finish the final record.
//
// Optionally, call Flush to finish the current record and flush the underlying
// writer without starting a new record. To start a new record after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//
//	func read(r io.Reader) ([]string, error) {
//		var ss []string
//		records := record.NewReader(r)
//		for {
//			rec, err := records.Next()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			s, err := io.ReadAll(rec)
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			ss = append(ss, string(s))
//		}
//		return ss, nil
//	}
//
//	func write(w io.Writer, ss []string) error {
//		records := record.NewWriter(w)
//		for _, s := range ss {
//			rec, err := records.Next()
//			if err != nil {
//				return err
//			}
//			if _, err := rec.Write([]byte(s)); err != nil {
//				return err
//			}
//		}
//		return records.Close()
//	}
//
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32KiB. Any unused bytes in a
// block must be zero.
//
// A record maps to one or more chunks. There are two chunk formats: legacy and
// recyclable. The legacy chunk format:
//
//	+----------+-----------+-----------+--- ... ---+
//	| CRC (4B) | Size (2B) | Type (1B) | Payload   |
//	+----------+-----------+-----------+--- ... ---+
//
// CRC is computed over the type and payload
// Size is the length of the payload in bytes
// Type is the chunk type
//
// There are four chunk types, encoding whether the chunk is the full record,
// or the first, middle or last chunk of a multi-chunk record. A multi-chunk
// record has one first chunk, zero or more middle chunks, and one last chunk.
//
// The recyclable chunk format is similar to the legacy format, but extends
// the chunk header with an additional log number field. This allows reuse
// (recycling) of log files which can provide significantly better performance
// when syncing frequently as it avoids needing to update the file
// metadata. Additionally, recycling log files is a prerequisite for using
// direct IO with log writing. The recyclable format is:
//
//	+----------+-----------+-----------+----------------+--- ... ---+
//	| CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload   |
//	+----------+-----------+-----------+----------------+--- ... ---+
//
// Recyclable chunks are distinguished from legacy chunks by the addition of 4
// extra "recyclable" chunk types that map directly to the legacy chunk types
// (i.e. full, first, middle, last). The CRC is computed over the type, log
// number, and payload.
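//
// For illustration, decoding the fixed-size header fields of a chunk that
// starts at offset i within a block looks like the following sketch (this
// mirrors what the reader below does; it is not a separate API):
//
//	checksum := binary.LittleEndian.Uint32(buf[i+0 : i+4])
//	length := binary.LittleEndian.Uint16(buf[i+4 : i+6])
//	chunkType := buf[i+6]
//	// Recyclable chunks additionally carry the log number in buf[i+7 : i+11].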
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package record

// The C++ LevelDB code calls this the log, but it has been renamed to record
// to avoid clashing with the standard log package, and because it is generally
// useful outside of logging. The C++ code also uses the term "physical record"
// instead of "chunk", but "chunk" is shorter and less confusing.

import (
	"encoding/binary"
	"io"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/crc"
)

// These constants are part of the wire format and should not be changed.
const (
	fullChunkType   = 1
	firstChunkType  = 2
	middleChunkType = 3
	lastChunkType   = 4

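	// The recyclable chunk types are offset from their legacy counterparts by
	// a fixed delta, which is what lets nextChunk map a recyclable type back
	// to its legacy equivalent with a single subtraction.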
	recyclableFullChunkType   = 5
	recyclableFirstChunkType  = 6
	recyclableMiddleChunkType = 7
	recyclableLastChunkType   = 8
)

const (
	blockSize            = 32 * 1024
	blockSizeMask        = blockSize - 1
	legacyHeaderSize     = 7
	recyclableHeaderSize = legacyHeaderSize + 4
)
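
// Because a chunk never crosses a block boundary, a single chunk's payload
// can be at most blockSize-legacyHeaderSize (32761) bytes; records larger
// than that are split across multiple chunks.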

var (
	// ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not implement io.Seeker.
	ErrNotAnIOSeeker = errors.New("pebble/record: reader does not implement io.Seeker")

	// ErrNoLastRecord is returned if LastRecordOffset is called and there is no previous record.
	ErrNoLastRecord = errors.New("pebble/record: no last record exists")

	// ErrZeroedChunk is returned if a chunk is encountered that is zeroed. This
	// usually occurs due to log file preallocation.
	ErrZeroedChunk = base.CorruptionErrorf("pebble/record: zeroed chunk")

	// ErrInvalidChunk is returned if a chunk is encountered with an invalid
	// header, length, or checksum. This usually occurs when a log is recycled,
	// but can also occur due to corruption.
	ErrInvalidChunk = base.CorruptionErrorf("pebble/record: invalid chunk")
)

// IsInvalidRecord returns true if the error matches one of the error types
// returned for invalid records. These are treated in a way similar to io.EOF
// in recovery code.
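//
// For example, a replay loop might stop cleanly on either condition (a
// sketch, not code from this package):
//
//	rec, err := records.Next()
//	if err == io.EOF || record.IsInvalidRecord(err) {
//		break // reached the end of the intact portion of the log
//	}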
func IsInvalidRecord(err error) bool {
	return err == ErrZeroedChunk || err == ErrInvalidChunk || err == io.ErrUnexpectedEOF
}

// Reader reads records from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// logNum is the low 32-bits of the log's file number. May be zero when used
	// with log files that do not have a file number (e.g. the MANIFEST).
	logNum uint32
	// blockNum is the zero based block number currently held in buf.
	blockNum int64
	// seq is the sequence number of the current record.
	seq int
	// buf[begin:end] is the unread portion of the current chunk's payload. The
	// low bound, begin, excludes the chunk header.
	begin, end int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// recovering is true when recovering from corruption.
	recovering bool
	// last is whether the current chunk is the last chunk of the record.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
}

// NewReader returns a new reader. If the file contains records encoded using
// the recyclable record format, then the log number in those records must
// match the specified logNum.
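//
// A minimal usage sketch (f and fileNum are assumptions of this example: an
// open log file and its base.DiskFileNum):
//
//	r := record.NewReader(f, fileNum)
//	rec, err := r.Next()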
func NewReader(r io.Reader, logNum base.DiskFileNum) *Reader {
	return &Reader{
		r:        r,
		logNum:   uint32(logNum),
		blockNum: -1,
	}
}

// nextChunk sets r.buf[r.begin:r.end] to hold the next chunk's payload, reading
// the next block into the buffer if necessary.
func (r *Reader) nextChunk(wantFirst bool) error {
	for {
		if r.end+legacyHeaderSize <= r.n {
			checksum := binary.LittleEndian.Uint32(r.buf[r.end+0 : r.end+4])
			length := binary.LittleEndian.Uint16(r.buf[r.end+4 : r.end+6])
			chunkType := r.buf[r.end+6]

			if checksum == 0 && length == 0 && chunkType == 0 {
				if r.end+recyclableHeaderSize > r.n {
					// Skip the rest of the block if the recyclable header size does not
					// fit within it.
					r.end = r.n
					continue
				}
				if r.recovering {
					// Skip the rest of the block, if it looks like it is all
					// zeroes. This is common with WAL preallocation.
					//
					// Set r.err to be an error so r.recover actually recovers.
					r.err = ErrZeroedChunk
					r.recover()
					continue
				}
				return ErrZeroedChunk
			}

			headerSize := legacyHeaderSize
			if chunkType >= recyclableFullChunkType && chunkType <= recyclableLastChunkType {
				headerSize = recyclableHeaderSize
				if r.end+headerSize > r.n {
					return ErrInvalidChunk
				}

				logNum := binary.LittleEndian.Uint32(r.buf[r.end+7 : r.end+11])
				if logNum != r.logNum {
					if wantFirst {
						// If we're looking for the first chunk of a record, we can treat a
						// previous instance of the log as EOF.
						return io.EOF
					}
					// Otherwise, treat this chunk as invalid in order to prevent reading
					// of a partial record.
					return ErrInvalidChunk
				}

				chunkType -= (recyclableFullChunkType - 1)
			}

			r.begin = r.end + headerSize
			r.end = r.begin + int(length)
			if r.end > r.n {
				// The chunk straddles a 32KiB boundary (or the end of file).
				if r.recovering {
					r.recover()
					continue
				}
				return ErrInvalidChunk
			}
			if checksum != crc.New(r.buf[r.begin-headerSize+6:r.end]).Value() {
				if r.recovering {
					r.recover()
					continue
				}
				return ErrInvalidChunk
			}
			if wantFirst {
				if chunkType != fullChunkType && chunkType != firstChunkType {
					continue
				}
			}
			r.last = chunkType == fullChunkType || chunkType == lastChunkType
			r.recovering = false
			return nil
		}
		if r.n < blockSize && r.blockNum >= 0 {
			if !wantFirst || r.end != r.n {
				// This can happen if the previous instance of the log ended with a
				// partial block at the same blockNum as the new log but extended
				// beyond the partial block of the new log.
				return ErrInvalidChunk
			}
			return io.EOF
		}
		n, err := io.ReadFull(r.r, r.buf[:])
		if err != nil && err != io.ErrUnexpectedEOF {
			if err == io.EOF && !wantFirst {
				return io.ErrUnexpectedEOF
			}
			return err
		}
		r.begin, r.end, r.n = 0, 0, n
		r.blockNum++
	}
}

// Next returns a reader for the next record. It returns io.EOF if there are no
// more records. The reader returned becomes stale after the next Next call,
// and should no longer be used.
func (r *Reader) Next() (io.Reader, error) {
	r.seq++
	if r.err != nil {
		return nil, r.err
	}
	r.begin = r.end
	r.err = r.nextChunk(true)
	if r.err != nil {
		return nil, r.err
	}
	return singleReader{r, r.seq}, nil
}

// Offset returns the current offset within the file. If called immediately
// before a call to Next(), Offset() will return the record offset.
func (r *Reader) Offset() int64 {
	if r.blockNum < 0 {
		return 0
	}
	return int64(r.blockNum)*blockSize + int64(r.end)
}

// recover clears any errors read so far, so that calling Next will start
// reading from the next good 32KiB block. If there are no such blocks, Next
// will return io.EOF. recover also marks the current reader, the one most
// recently returned by Next, as stale. If recover is called without any
// prior error, then recover is a no-op.
func (r *Reader) recover() {
	if r.err == nil {
		return
	}
	r.recovering = true
	r.err = nil
	// Discard the rest of the current block.
	r.begin, r.end, r.last = r.n, r.n, false
	// Invalidate any outstanding singleReader.
	r.seq++
}

// seekRecord seeks in the underlying io.Reader such that calling r.Next
// returns the record whose first chunk header starts at the provided offset.
// Its behavior is undefined if the argument given is not such an offset, as
// the bytes at that offset may coincidentally appear to be a valid header.
//
// It returns ErrNotAnIOSeeker if the underlying io.Reader does not implement
// io.Seeker.
//
// seekRecord will fail and return an error if the Reader previously
// encountered an error, including io.EOF. Such errors can be cleared by
// calling recover. Calling seekRecord after recover will make calling Next
// return the record at the given offset, instead of the record at the next
// good 32KiB block as recover normally would. Calling seekRecord before
// recover has no effect on recover's semantics other than changing the
// starting point for determining the next good 32KiB block.
//
// The offset is always relative to the start of the underlying io.Reader, so
// negative values will result in an error as per io.Seeker.
func (r *Reader) seekRecord(offset int64) error {
	r.seq++
	if r.err != nil {
		return r.err
	}

	s, ok := r.r.(io.Seeker)
	if !ok {
		return ErrNotAnIOSeeker
	}

	// Only seek to an exact block offset.
	c := int(offset & blockSizeMask)
	if _, r.err = s.Seek(offset&^blockSizeMask, io.SeekStart); r.err != nil {
		return r.err
	}

	// Clear the state of the internal reader.
	r.begin, r.end, r.n = 0, 0, 0
	r.blockNum, r.recovering, r.last = -1, false, false
	if r.err = r.nextChunk(false); r.err != nil {
		return r.err
	}

	// Now skip to the offset requested within the block. A subsequent
	// call to Next will return the block at the requested offset.
	r.begin, r.end = c, c

	return nil
}

type singleReader struct {
	r   *Reader
	seq int
}

func (x singleReader) Read(p []byte) (int, error) {
	r := x.r
	if r.seq != x.seq {
		return 0, errors.New("pebble/record: stale reader")
	}
	if r.err != nil {
		return 0, r.err
	}
	for r.begin == r.end {
		if r.last {
			return 0, io.EOF
		}
		if r.err = r.nextChunk(false); r.err != nil {
			return 0, r.err
		}
	}
	n := copy(p, r.buf[r.begin:r.end])
	r.begin += n
	return n, nil
}

// Writer writes records to an underlying io.Writer.
type Writer struct {
	// w is the underlying writer.
	w io.Writer
	// seq is the sequence number of the current record.
	seq int
	// f is w as a flusher.
	f flusher
	// buf[i:j] is the bytes that will become the current chunk.
	// The low bound, i, includes the chunk header.
	i, j int
	// buf[:written] has already been written to w.
	// written is zero unless Flush has been called.
	written int
	// baseOffset is the base offset in w at which writing started. If
	// w implements io.Seeker, it's relative to the start of w, 0 otherwise.
	baseOffset int64
	// blockNumber is the zero based block number currently held in buf.
	blockNumber int64
	// lastRecordOffset is the offset in w where the last record was
	// written (including the chunk header). It is a relative offset to
	// baseOffset, thus the absolute offset of the last record is
	// baseOffset + lastRecordOffset.
	lastRecordOffset int64
	// first is whether the current chunk is the first chunk of the record.
	first bool
	// pending is whether a chunk is buffered but not yet written.
	pending bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
}

// NewWriter returns a new Writer.
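//
// A minimal usage sketch (w is any io.Writer; error handling elided):
//
//	rw := record.NewWriter(w)
//	rec, _ := rw.Next()
//	rec.Write([]byte("payload"))
//	rw.Close()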
func NewWriter(w io.Writer) *Writer {
	f, _ := w.(flusher)

	var o int64
	if s, ok := w.(io.Seeker); ok {
		var err error
		if o, err = s.Seek(0, io.SeekCurrent); err != nil {
			o = 0
		}
	}
	return &Writer{
		w:                w,
		f:                f,
		baseOffset:       o,
		lastRecordOffset: -1,
	}
}

// fillHeader fills in the header for the pending chunk.
func (w *Writer) fillHeader(last bool) {
	if w.i+legacyHeaderSize > w.j || w.j > blockSize {
		panic("pebble/record: bad writer state")
	}
	if last {
		if w.first {
			w.buf[w.i+6] = fullChunkType
		} else {
			w.buf[w.i+6] = lastChunkType
		}
	} else {
		if w.first {
			w.buf[w.i+6] = firstChunkType
		} else {
			w.buf[w.i+6] = middleChunkType
		}
	}
	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize))
}

// writeBlock writes the buffered block to the underlying writer, and reserves
// space for the next chunk's header.
func (w *Writer) writeBlock() {
	_, w.err = w.w.Write(w.buf[w.written:])
	w.i = 0
	w.j = legacyHeaderSize
	w.written = 0
	w.blockNumber++
}

// writePending finishes the current record and writes the buffer to the
// underlying writer.
func (w *Writer) writePending() {
	if w.err != nil {
		return
	}
	if w.pending {
		w.fillHeader(true)
		w.pending = false
	}
	_, w.err = w.w.Write(w.buf[w.written:w.j])
	w.written = w.j
}

// Close finishes the current record and closes the writer.
func (w *Writer) Close() error {
	w.seq++
	w.writePending()
	if w.err != nil {
		return w.err
	}
	w.err = errors.New("pebble/record: closed Writer")
	return nil
}

// Flush finishes the current record, writes to the underlying writer, and
// flushes it if that writer implements interface{ Flush() error }.
func (w *Writer) Flush() error {
	w.seq++
	w.writePending()
	if w.err != nil {
		return w.err
	}
	if w.f != nil {
		w.err = w.f.Flush()
		return w.err
	}
	return nil
}

// Next returns a writer for the next record. The writer returned becomes stale
// after the next Close, Flush or Next call, and should no longer be used.
func (w *Writer) Next() (io.Writer, error) {
	w.seq++
	if w.err != nil {
		return nil, w.err
	}
	if w.pending {
		w.fillHeader(true)
	}
	w.i = w.j
	w.j = w.j + legacyHeaderSize
	// Check if there is room in the block for the header.
	if w.j > blockSize {
		// Fill in the rest of the block with zeroes.
		clear(w.buf[w.i:])
		w.writeBlock()
		if w.err != nil {
			return nil, w.err
		}
	}
	w.lastRecordOffset = w.baseOffset + w.blockNumber*blockSize + int64(w.i)
	w.first = true
	w.pending = true
	return singleWriter{w, w.seq}, nil
}

// WriteRecord writes a complete record. Returns the offset just past the end
// of the record.
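//
// For example (a sketch; rw is an existing *Writer and errors are elided):
//
//	end, _ := rw.WriteRecord([]byte("payload"))
//	// end equals rw.Size() after the write: the offset just past the record.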
func (w *Writer) WriteRecord(p []byte) (int64, error) {
	if w.err != nil {
		return -1, w.err
	}
	t, err := w.Next()
	if err != nil {
		return -1, err
	}
	if _, err := t.Write(p); err != nil {
		return -1, err
	}
	w.writePending()
	offset := w.blockNumber*blockSize + int64(w.j)
	return offset, w.err
}

// Size returns the current size of the file.
func (w *Writer) Size() int64 {
	if w == nil {
		return 0
	}
	return w.blockNumber*blockSize + int64(w.j)
}

// LastRecordOffset returns the offset in the underlying io.Writer of the last
// record so far - the one created by the most recent Next call. It is the
// offset of the first chunk header, suitable to pass to Reader.seekRecord.
//
// If that io.Writer also implements io.Seeker, the return value is an absolute
// offset, in the sense of io.SeekStart, regardless of whether the io.Writer
// was initially at the zero position when passed to NewWriter. Otherwise, the
// return value is a relative offset: the number of bytes written between the
// NewWriter call and the start of the last record.
//
// If there is no last record, i.e. nothing was written, LastRecordOffset will
// return ErrNoLastRecord.
func (w *Writer) LastRecordOffset() (int64, error) {
	if w.err != nil {
		return 0, w.err
	}
	if w.lastRecordOffset < 0 {
		return 0, ErrNoLastRecord
	}
	return w.lastRecordOffset, nil
}

type singleWriter struct {
	w   *Writer
	seq int
}

func (x singleWriter) Write(p []byte) (int, error) {
	w := x.w
	if w.seq != x.seq {
		return 0, errors.New("pebble/record: stale writer")
	}
	if w.err != nil {
		return 0, w.err
	}
	n0 := len(p)
	for len(p) > 0 {
		// Write a block, if it is full.
		if w.j == blockSize {
			w.fillHeader(false)
			w.writeBlock()
			if w.err != nil {
				return 0, w.err
			}
			w.first = false
		}
		// Copy bytes into the buffer.
		n := copy(w.buf[w.j:], p)
		w.j += n
		p = p[n:]
	}
	return n0, nil
}