LCOV - code coverage report
Current view: top level - pebble/record - record.go (source / functions) Hit Total Coverage
Test: 2023-11-28 08:16Z a9595501 - tests only.lcov Lines: 240 286 83.9 %
Date: 2023-11-28 08:16:49 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package record reads and writes sequences of records. Each record is a stream
       6             : // of bytes that completes before the next record starts.
       7             : //
       8             : // When reading, call Next to obtain an io.Reader for the next record. Next will
       9             : // return io.EOF when there are no more records. It is valid to call Next
      10             : // without reading the current record to exhaustion.
      11             : //
      12             : // When writing, call Next to obtain an io.Writer for the next record. Calling
      13             : // Next finishes the current record. Call Close to finish the final record.
      14             : //
      15             : // Optionally, call Flush to finish the current record and flush the underlying
      16             : // writer without starting a new record. To start a new record after flushing,
      17             : // call Next.
      18             : //
      19             : // Neither Readers or Writers are safe to use concurrently.
      20             : //
      21             : // Example code:
      22             : //
      23             : //      func read(r io.Reader) ([]string, error) {
      24             : //              var ss []string
      25             : //              records := record.NewReader(r)
      26             : //              for {
      27             : //                      rec, err := records.Next()
      28             : //                      if err == io.EOF {
      29             : //                              break
      30             : //                      }
      31             : //                      if err != nil {
      32             : //                              log.Printf("recovering from %v", err)
      33             : //                              r.Recover()
      34             : //                              continue
      35             : //                      }
      36             : //                      s, err := io.ReadAll(rec)
      37             : //                      if err != nil {
      38             : //                              log.Printf("recovering from %v", err)
      39             : //                              r.Recover()
      40             : //                              continue
      41             : //                      }
      42             : //                      ss = append(ss, string(s))
      43             : //              }
      44             : //              return ss, nil
      45             : //      }
      46             : //
      47             : //      func write(w io.Writer, ss []string) error {
      48             : //              records := record.NewWriter(w)
      49             : //              for _, s := range ss {
      50             : //                      rec, err := records.Next()
      51             : //                      if err != nil {
      52             : //                              return err
      53             : //                      }
      54             : //                      if _, err := rec.Write([]byte(s)), err != nil {
      55             : //                              return err
      56             : //                      }
      57             : //              }
      58             : //              return records.Close()
      59             : //      }
      60             : //
      61             : // The wire format is that the stream is divided into 32KiB blocks, and each
      62             : // block contains a number of tightly packed chunks. Chunks cannot cross block
      63             : // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
      64             : // block must be zero.
      65             : //
      66             : // A record maps to one or more chunks. There are two chunk formats: legacy and
      67             : // recyclable. The legacy chunk format:
      68             : //
      69             : //      +----------+-----------+-----------+--- ... ---+
      70             : //      | CRC (4B) | Size (2B) | Type (1B) | Payload   |
      71             : //      +----------+-----------+-----------+--- ... ---+
      72             : //
      73             : // CRC is computed over the type and payload
      74             : // Size is the length of the payload in bytes
      75             : // Type is the chunk type
      76             : //
      77             : // There are four chunk types: whether the chunk is the full record, or the
      78             : // first, middle or last chunk of a multi-chunk record. A multi-chunk record
      79             : // has one first chunk, zero or more middle chunks, and one last chunk.
      80             : //
      81             : // The recyclyable chunk format is similar to the legacy format, but extends
      82             : // the chunk header with an additional log number field. This allows reuse
      83             : // (recycling) of log files which can provide significantly better performance
      84             : // when syncing frequently as it avoids needing to update the file
      85             : // metadata. Additionally, recycling log files is a prequisite for using direct
      86             : // IO with log writing. The recyclyable format is:
      87             : //
      88             : //      +----------+-----------+-----------+----------------+--- ... ---+
      89             : //      | CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload   |
      90             : //      +----------+-----------+-----------+----------------+--- ... ---+
      91             : //
      92             : // Recyclable chunks are distinguished from legacy chunks by the addition of 4
      93             : // extra "recyclable" chunk types that map directly to the legacy chunk types
      94             : // (i.e. full, first, middle, last). The CRC is computed over the type, log
      95             : // number, and payload.
      96             : //
      97             : // The wire format allows for limited recovery in the face of data corruption:
      98             : // on a format error (such as a checksum mismatch), the reader moves to the
      99             : // next block and looks for the next full or first chunk.
     100             : package record
     101             : 
     102             : // The C++ Level-DB code calls this the log, but it has been renamed to record
     103             : // to avoid clashing with the standard log package, and because it is generally
     104             : // useful outside of logging. The C++ code also uses the term "physical record"
     105             : // instead of "chunk", but "chunk" is shorter and less confusing.
     106             : 
     107             : import (
     108             :         "encoding/binary"
     109             :         "io"
     110             : 
     111             :         "github.com/cockroachdb/errors"
     112             :         "github.com/cockroachdb/pebble/internal/base"
     113             :         "github.com/cockroachdb/pebble/internal/crc"
     114             : )
     115             : 
     116             : // These constants are part of the wire format and should not be changed.
     117             : const (
     118             :         fullChunkType   = 1
     119             :         firstChunkType  = 2
     120             :         middleChunkType = 3
     121             :         lastChunkType   = 4
     122             : 
     123             :         recyclableFullChunkType   = 5
     124             :         recyclableFirstChunkType  = 6
     125             :         recyclableMiddleChunkType = 7
     126             :         recyclableLastChunkType   = 8
     127             : )
     128             : 
     129             : const (
     130             :         blockSize            = 32 * 1024
     131             :         blockSizeMask        = blockSize - 1
     132             :         legacyHeaderSize     = 7
     133             :         recyclableHeaderSize = legacyHeaderSize + 4
     134             : )
     135             : 
     136             : var (
     137             :         // ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not implement io.Seeker.
     138             :         ErrNotAnIOSeeker = errors.New("pebble/record: reader does not implement io.Seeker")
     139             : 
     140             :         // ErrNoLastRecord is returned if LastRecordOffset is called and there is no previous record.
     141             :         ErrNoLastRecord = errors.New("pebble/record: no last record exists")
     142             : 
     143             :         // ErrZeroedChunk is returned if a chunk is encountered that is zeroed. This
     144             :         // usually occurs due to log file preallocation.
     145             :         ErrZeroedChunk = base.CorruptionErrorf("pebble/record: zeroed chunk")
     146             : 
     147             :         // ErrInvalidChunk is returned if a chunk is encountered with an invalid
     148             :         // header, length, or checksum. This usually occurs when a log is recycled,
     149             :         // but can also occur due to corruption.
     150             :         ErrInvalidChunk = base.CorruptionErrorf("pebble/record: invalid chunk")
     151             : )
     152             : 
     153             : // IsInvalidRecord returns true if the error matches one of the error types
     154             : // returned for invalid records. These are treated in a way similar to io.EOF
     155             : // in recovery code.
     156           1 : func IsInvalidRecord(err error) bool {
     157           1 :         return err == ErrZeroedChunk || err == ErrInvalidChunk || err == io.ErrUnexpectedEOF
     158           1 : }
     159             : 
     160             : // Reader reads records from an underlying io.Reader.
     161             : type Reader struct {
     162             :         // r is the underlying reader.
     163             :         r io.Reader
     164             :         // logNum is the low 32-bits of the log's file number. May be zero when used
     165             :         // with log files that do not have a file number (e.g. the MANIFEST).
     166             :         logNum uint32
     167             :         // blockNum is the zero based block number currently held in buf.
     168             :         blockNum int64
     169             :         // seq is the sequence number of the current record.
     170             :         seq int
     171             :         // buf[begin:end] is the unread portion of the current chunk's payload. The
     172             :         // low bound, begin, excludes the chunk header.
     173             :         begin, end int
     174             :         // n is the number of bytes of buf that are valid. Once reading has started,
     175             :         // only the final block can have n < blockSize.
     176             :         n int
     177             :         // recovering is true when recovering from corruption.
     178             :         recovering bool
     179             :         // last is whether the current chunk is the last chunk of the record.
     180             :         last bool
     181             :         // err is any accumulated error.
     182             :         err error
     183             :         // buf is the buffer.
     184             :         buf [blockSize]byte
     185             : }
     186             : 
     187             : // NewReader returns a new reader. If the file contains records encoded using
     188             : // the recyclable record format, then the log number in those records must
     189             : // match the specified logNum.
     190           1 : func NewReader(r io.Reader, logNum base.DiskFileNum) *Reader {
     191           1 :         return &Reader{
     192           1 :                 r:        r,
     193           1 :                 logNum:   uint32(logNum),
     194           1 :                 blockNum: -1,
     195           1 :         }
     196           1 : }
     197             : 
     198             : // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
     199             : // next block into the buffer if necessary.
     200           1 : func (r *Reader) nextChunk(wantFirst bool) error {
     201           1 :         for {
     202           1 :                 if r.end+legacyHeaderSize <= r.n {
     203           1 :                         checksum := binary.LittleEndian.Uint32(r.buf[r.end+0 : r.end+4])
     204           1 :                         length := binary.LittleEndian.Uint16(r.buf[r.end+4 : r.end+6])
     205           1 :                         chunkType := r.buf[r.end+6]
     206           1 : 
     207           1 :                         if checksum == 0 && length == 0 && chunkType == 0 {
     208           1 :                                 if r.end+recyclableHeaderSize > r.n {
     209           1 :                                         // Skip the rest of the block if the recyclable header size does not
     210           1 :                                         // fit within it.
     211           1 :                                         r.end = r.n
     212           1 :                                         continue
     213             :                                 }
     214           0 :                                 if r.recovering {
     215           0 :                                         // Skip the rest of the block, if it looks like it is all
     216           0 :                                         // zeroes. This is common with WAL preallocation.
     217           0 :                                         //
     218           0 :                                         // Set r.err to be an error so r.recover actually recovers.
     219           0 :                                         r.err = ErrZeroedChunk
     220           0 :                                         r.recover()
     221           0 :                                         continue
     222             :                                 }
     223           0 :                                 return ErrZeroedChunk
     224             :                         }
     225             : 
     226           1 :                         headerSize := legacyHeaderSize
     227           1 :                         if chunkType >= recyclableFullChunkType && chunkType <= recyclableLastChunkType {
     228           1 :                                 headerSize = recyclableHeaderSize
     229           1 :                                 if r.end+headerSize > r.n {
     230           0 :                                         return ErrInvalidChunk
     231           0 :                                 }
     232             : 
     233           1 :                                 logNum := binary.LittleEndian.Uint32(r.buf[r.end+7 : r.end+11])
     234           1 :                                 if logNum != r.logNum {
     235           1 :                                         if wantFirst {
     236           1 :                                                 // If we're looking for the first chunk of a record, we can treat a
     237           1 :                                                 // previous instance of the log as EOF.
     238           1 :                                                 return io.EOF
     239           1 :                                         }
     240             :                                         // Otherwise, treat this chunk as invalid in order to prevent reading
     241             :                                         // of a partial record.
     242           1 :                                         return ErrInvalidChunk
     243             :                                 }
     244             : 
     245           1 :                                 chunkType -= (recyclableFullChunkType - 1)
     246             :                         }
     247             : 
     248           1 :                         r.begin = r.end + headerSize
     249           1 :                         r.end = r.begin + int(length)
     250           1 :                         if r.end > r.n {
     251           1 :                                 // The chunk straddles a 32KB boundary (or the end of file).
     252           1 :                                 if r.recovering {
     253           0 :                                         r.recover()
     254           0 :                                         continue
     255             :                                 }
     256           1 :                                 return ErrInvalidChunk
     257             :                         }
     258           1 :                         if checksum != crc.New(r.buf[r.begin-headerSize+6:r.end]).Value() {
     259           1 :                                 if r.recovering {
     260           1 :                                         r.recover()
     261           1 :                                         continue
     262             :                                 }
     263           1 :                                 return ErrInvalidChunk
     264             :                         }
     265           1 :                         if wantFirst {
     266           1 :                                 if chunkType != fullChunkType && chunkType != firstChunkType {
     267           1 :                                         continue
     268             :                                 }
     269             :                         }
     270           1 :                         r.last = chunkType == fullChunkType || chunkType == lastChunkType
     271           1 :                         r.recovering = false
     272           1 :                         return nil
     273             :                 }
     274           1 :                 if r.n < blockSize && r.blockNum >= 0 {
     275           1 :                         if !wantFirst || r.end != r.n {
     276           1 :                                 // This can happen if the previous instance of the log ended with a
     277           1 :                                 // partial block at the same blockNum as the new log but extended
     278           1 :                                 // beyond the partial block of the new log.
     279           1 :                                 return ErrInvalidChunk
     280           1 :                         }
     281           1 :                         return io.EOF
     282             :                 }
     283           1 :                 n, err := io.ReadFull(r.r, r.buf[:])
     284           1 :                 if err != nil && err != io.ErrUnexpectedEOF {
     285           1 :                         if err == io.EOF && !wantFirst {
     286           1 :                                 return io.ErrUnexpectedEOF
     287           1 :                         }
     288           1 :                         return err
     289             :                 }
     290           1 :                 r.begin, r.end, r.n = 0, 0, n
     291           1 :                 r.blockNum++
     292             :         }
     293             : }
     294             : 
     295             : // Next returns a reader for the next record. It returns io.EOF if there are no
     296             : // more records. The reader returned becomes stale after the next Next call,
     297             : // and should no longer be used.
     298           1 : func (r *Reader) Next() (io.Reader, error) {
     299           1 :         r.seq++
     300           1 :         if r.err != nil {
     301           1 :                 return nil, r.err
     302           1 :         }
     303           1 :         r.begin = r.end
     304           1 :         r.err = r.nextChunk(true)
     305           1 :         if r.err != nil {
     306           1 :                 return nil, r.err
     307           1 :         }
     308           1 :         return singleReader{r, r.seq}, nil
     309             : }
     310             : 
     311             : // Offset returns the current offset within the file. If called immediately
     312             : // before a call to Next(), Offset() will return the record offset.
     313           1 : func (r *Reader) Offset() int64 {
     314           1 :         if r.blockNum < 0 {
     315           1 :                 return 0
     316           1 :         }
     317           1 :         return int64(r.blockNum)*blockSize + int64(r.end)
     318             : }
     319             : 
     320             : // recover clears any errors read so far, so that calling Next will start
     321             : // reading from the next good 32KiB block. If there are no such blocks, Next
     322             : // will return io.EOF. recover also marks the current reader, the one most
     323             : // recently returned by Next, as stale. If recover is called without any
     324             : // prior error, then recover is a no-op.
     325           1 : func (r *Reader) recover() {
     326           1 :         if r.err == nil {
     327           1 :                 return
     328           1 :         }
     329           1 :         r.recovering = true
     330           1 :         r.err = nil
     331           1 :         // Discard the rest of the current block.
     332           1 :         r.begin, r.end, r.last = r.n, r.n, false
     333           1 :         // Invalidate any outstanding singleReader.
     334           1 :         r.seq++
     335             : }
     336             : 
     337             : // seekRecord seeks in the underlying io.Reader such that calling r.Next
     338             : // returns the record whose first chunk header starts at the provided offset.
     339             : // Its behavior is undefined if the argument given is not such an offset, as
     340             : // the bytes at that offset may coincidentally appear to be a valid header.
     341             : //
     342             : // It returns ErrNotAnIOSeeker if the underlying io.Reader does not implement
     343             : // io.Seeker.
     344             : //
     345             : // seekRecord will fail and return an error if the Reader previously
     346             : // encountered an error, including io.EOF. Such errors can be cleared by
     347             : // calling Recover. Calling seekRecord after Recover will make calling Next
     348             : // return the record at the given offset, instead of the record at the next
     349             : // good 32KiB block as Recover normally would. Calling seekRecord before
     350             : // Recover has no effect on Recover's semantics other than changing the
     351             : // starting point for determining the next good 32KiB block.
     352             : //
     353             : // The offset is always relative to the start of the underlying io.Reader, so
     354             : // negative values will result in an error as per io.Seeker.
     355           1 : func (r *Reader) seekRecord(offset int64) error {
     356           1 :         r.seq++
     357           1 :         if r.err != nil {
     358           0 :                 return r.err
     359           0 :         }
     360             : 
     361           1 :         s, ok := r.r.(io.Seeker)
     362           1 :         if !ok {
     363           0 :                 return ErrNotAnIOSeeker
     364           0 :         }
     365             : 
     366             :         // Only seek to an exact block offset.
     367           1 :         c := int(offset & blockSizeMask)
     368           1 :         if _, r.err = s.Seek(offset&^blockSizeMask, io.SeekStart); r.err != nil {
     369           0 :                 return r.err
     370           0 :         }
     371             : 
     372             :         // Clear the state of the internal reader.
     373           1 :         r.begin, r.end, r.n = 0, 0, 0
     374           1 :         r.blockNum, r.recovering, r.last = -1, false, false
     375           1 :         if r.err = r.nextChunk(false); r.err != nil {
     376           1 :                 return r.err
     377           1 :         }
     378             : 
     379             :         // Now skip to the offset requested within the block. A subsequent
     380             :         // call to Next will return the block at the requested offset.
     381           1 :         r.begin, r.end = c, c
     382           1 : 
     383           1 :         return nil
     384             : }
     385             : 
     386             : type singleReader struct {
     387             :         r   *Reader
     388             :         seq int
     389             : }
     390             : 
     391           1 : func (x singleReader) Read(p []byte) (int, error) {
     392           1 :         r := x.r
     393           1 :         if r.seq != x.seq {
     394           1 :                 return 0, errors.New("pebble/record: stale reader")
     395           1 :         }
     396           1 :         if r.err != nil {
     397           0 :                 return 0, r.err
     398           0 :         }
     399           1 :         for r.begin == r.end {
     400           1 :                 if r.last {
     401           1 :                         return 0, io.EOF
     402           1 :                 }
     403           1 :                 if r.err = r.nextChunk(false); r.err != nil {
     404           1 :                         return 0, r.err
     405           1 :                 }
     406             :         }
     407           1 :         n := copy(p, r.buf[r.begin:r.end])
     408           1 :         r.begin += n
     409           1 :         return n, nil
     410             : }
     411             : 
     412             : // Writer writes records to an underlying io.Writer.
     413             : type Writer struct {
     414             :         // w is the underlying writer.
     415             :         w io.Writer
     416             :         // seq is the sequence number of the current record.
     417             :         seq int
     418             :         // f is w as a flusher.
     419             :         f flusher
     420             :         // buf[i:j] is the bytes that will become the current chunk.
     421             :         // The low bound, i, includes the chunk header.
     422             :         i, j int
     423             :         // buf[:written] has already been written to w.
     424             :         // written is zero unless Flush has been called.
     425             :         written int
     426             :         // baseOffset is the base offset in w at which writing started. If
     427             :         // w implements io.Seeker, it's relative to the start of w, 0 otherwise.
     428             :         baseOffset int64
     429             :         // blockNumber is the zero based block number currently held in buf.
     430             :         blockNumber int64
     431             :         // lastRecordOffset is the offset in w where the last record was
     432             :         // written (including the chunk header). It is a relative offset to
     433             :         // baseOffset, thus the absolute offset of the last record is
     434             :         // baseOffset + lastRecordOffset.
     435             :         lastRecordOffset int64
     436             :         // first is whether the current chunk is the first chunk of the record.
     437             :         first bool
     438             :         // pending is whether a chunk is buffered but not yet written.
     439             :         pending bool
     440             :         // err is any accumulated error.
     441             :         err error
     442             :         // buf is the buffer.
     443             :         buf [blockSize]byte
     444             : }
     445             : 
     446             : // NewWriter returns a new Writer.
     447           1 : func NewWriter(w io.Writer) *Writer {
     448           1 :         f, _ := w.(flusher)
     449           1 : 
     450           1 :         var o int64
     451           1 :         if s, ok := w.(io.Seeker); ok {
     452           1 :                 var err error
     453           1 :                 if o, err = s.Seek(0, io.SeekCurrent); err != nil {
     454           0 :                         o = 0
     455           0 :                 }
     456             :         }
     457           1 :         return &Writer{
     458           1 :                 w:                w,
     459           1 :                 f:                f,
     460           1 :                 baseOffset:       o,
     461           1 :                 lastRecordOffset: -1,
     462           1 :         }
     463             : }
     464             : 
     465             : // fillHeader fills in the header for the pending chunk.
     466           1 : func (w *Writer) fillHeader(last bool) {
     467           1 :         if w.i+legacyHeaderSize > w.j || w.j > blockSize {
     468           0 :                 panic("pebble/record: bad writer state")
     469             :         }
     470           1 :         if last {
     471           1 :                 if w.first {
     472           1 :                         w.buf[w.i+6] = fullChunkType
     473           1 :                 } else {
     474           1 :                         w.buf[w.i+6] = lastChunkType
     475           1 :                 }
     476           1 :         } else {
     477           1 :                 if w.first {
     478           1 :                         w.buf[w.i+6] = firstChunkType
     479           1 :                 } else {
     480           1 :                         w.buf[w.i+6] = middleChunkType
     481           1 :                 }
     482             :         }
     483           1 :         binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
     484           1 :         binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize))
     485             : }
     486             : 
     487             : // writeBlock writes the buffered block to the underlying writer, and reserves
     488             : // space for the next chunk's header.
     489           1 : func (w *Writer) writeBlock() {
     490           1 :         _, w.err = w.w.Write(w.buf[w.written:])
     491           1 :         w.i = 0
     492           1 :         w.j = legacyHeaderSize
     493           1 :         w.written = 0
     494           1 :         w.blockNumber++
     495           1 : }
     496             : 
     497             : // writePending finishes the current record and writes the buffer to the
     498             : // underlying writer.
     499           1 : func (w *Writer) writePending() {
     500           1 :         if w.err != nil {
     501           0 :                 return
     502           0 :         }
     503           1 :         if w.pending {
     504           1 :                 w.fillHeader(true)
     505           1 :                 w.pending = false
     506           1 :         }
     507           1 :         _, w.err = w.w.Write(w.buf[w.written:w.j])
     508           1 :         w.written = w.j
     509             : }
     510             : 
     511             : // Close finishes the current record and closes the writer.
     512           1 : func (w *Writer) Close() error {
     513           1 :         w.seq++
     514           1 :         w.writePending()
     515           1 :         if w.err != nil {
     516           0 :                 return w.err
     517           0 :         }
     518           1 :         w.err = errors.New("pebble/record: closed Writer")
     519           1 :         return nil
     520             : }
     521             : 
     522             : // Flush finishes the current record, writes to the underlying writer, and
     523             : // flushes it if that writer implements interface{ Flush() error }.
     524           1 : func (w *Writer) Flush() error {
     525           1 :         w.seq++
     526           1 :         w.writePending()
     527           1 :         if w.err != nil {
     528           1 :                 return w.err
     529           1 :         }
     530           1 :         if w.f != nil {
     531           1 :                 w.err = w.f.Flush()
     532           1 :                 return w.err
     533           1 :         }
     534           1 :         return nil
     535             : }
     536             : 
     537             : // Next returns a writer for the next record. The writer returned becomes stale
     538             : // after the next Close, Flush or Next call, and should no longer be used.
     539           1 : func (w *Writer) Next() (io.Writer, error) {
     540           1 :         w.seq++
     541           1 :         if w.err != nil {
     542           0 :                 return nil, w.err
     543           0 :         }
     544           1 :         if w.pending {
     545           1 :                 w.fillHeader(true)
     546           1 :         }
     547           1 :         w.i = w.j
     548           1 :         w.j = w.j + legacyHeaderSize
     549           1 :         // Check if there is room in the block for the header.
     550           1 :         if w.j > blockSize {
     551           1 :                 // Fill in the rest of the block with zeroes.
     552           1 :                 clear(w.buf[w.i:])
     553           1 :                 w.writeBlock()
     554           1 :                 if w.err != nil {
     555           0 :                         return nil, w.err
     556           0 :                 }
     557             :         }
     558           1 :         w.lastRecordOffset = w.baseOffset + w.blockNumber*blockSize + int64(w.i)
     559           1 :         w.first = true
     560           1 :         w.pending = true
     561           1 :         return singleWriter{w, w.seq}, nil
     562             : }
     563             : 
     564             : // WriteRecord writes a complete record. Returns the offset just past the end
     565             : // of the record.
     566           1 : func (w *Writer) WriteRecord(p []byte) (int64, error) {
     567           1 :         if w.err != nil {
     568           0 :                 return -1, w.err
     569           0 :         }
     570           1 :         t, err := w.Next()
     571           1 :         if err != nil {
     572           0 :                 return -1, err
     573           0 :         }
     574           1 :         if _, err := t.Write(p); err != nil {
     575           0 :                 return -1, err
     576           0 :         }
     577           1 :         w.writePending()
     578           1 :         offset := w.blockNumber*blockSize + int64(w.j)
     579           1 :         return offset, w.err
     580             : }
     581             : 
     582             : // Size returns the current size of the file.
     583           1 : func (w *Writer) Size() int64 {
     584           1 :         if w == nil {
     585           1 :                 return 0
     586           1 :         }
     587           1 :         return w.blockNumber*blockSize + int64(w.j)
     588             : }
     589             : 
     590             : // LastRecordOffset returns the offset in the underlying io.Writer of the last
     591             : // record so far - the one created by the most recent Next call. It is the
     592             : // offset of the first chunk header, suitable to pass to Reader.SeekRecord.
     593             : //
     594             : // If that io.Writer also implements io.Seeker, the return value is an absolute
     595             : // offset, in the sense of io.SeekStart, regardless of whether the io.Writer
     596             : // was initially at the zero position when passed to NewWriter. Otherwise, the
     597             : // return value is a relative offset, being the number of bytes written between
     598             : // the NewWriter call and any records written prior to the last record.
     599             : //
     600             : // If there is no last record, i.e. nothing was written, LastRecordOffset will
     601             : // return ErrNoLastRecord.
     602           1 : func (w *Writer) LastRecordOffset() (int64, error) {
     603           1 :         if w.err != nil {
     604           0 :                 return 0, w.err
     605           0 :         }
     606           1 :         if w.lastRecordOffset < 0 {
     607           1 :                 return 0, ErrNoLastRecord
     608           1 :         }
     609           1 :         return w.lastRecordOffset, nil
     610             : }
     611             : 
     612             : type singleWriter struct {
     613             :         w   *Writer
     614             :         seq int
     615             : }
     616             : 
     617           1 : func (x singleWriter) Write(p []byte) (int, error) {
     618           1 :         w := x.w
     619           1 :         if w.seq != x.seq {
     620           0 :                 return 0, errors.New("pebble/record: stale writer")
     621           0 :         }
     622           1 :         if w.err != nil {
     623           0 :                 return 0, w.err
     624           0 :         }
     625           1 :         n0 := len(p)
     626           1 :         for len(p) > 0 {
     627           1 :                 // Write a block, if it is full.
     628           1 :                 if w.j == blockSize {
     629           1 :                         w.fillHeader(false)
     630           1 :                         w.writeBlock()
     631           1 :                         if w.err != nil {
     632           0 :                                 return 0, w.err
     633           0 :                         }
     634           1 :                         w.first = false
     635             :                 }
     636             :                 // Copy bytes into the buffer.
     637           1 :                 n := copy(w.buf[w.j:], p)
     638           1 :                 w.j += n
     639           1 :                 p = p[n:]
     640             :         }
     641           1 :         return n0, nil
     642             : }

Generated by: LCOV version 1.14