LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote_readable.go (source / functions) Hit Total Coverage
Test: 2024-11-02 08:17Z 71bb6ba2 - tests only.lcov Lines: 110 124 88.7 %
Date: 2024-11-02 08:18:28 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "io"
      10             :         "sync"
      11             : 
      12             :         "github.com/cockroachdb/pebble/internal/base"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      15             :         "github.com/cockroachdb/pebble/objstorage/remote"
      16             : )
      17             : 
      18             : // NewRemoteReadable creates an objstorage.Readable out of a remote.ObjectReader.
      19           1 : func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Readable {
      20           1 :         return &remoteReadable{
      21           1 :                 objReader: objReader,
      22           1 :                 size:      size,
      23           1 :         }
      24           1 : }
      25             : 
// remoteMaxReadaheadSize caps the readahead performed for regular
// (non-compaction) reads through a remoteReadHandle.
const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */

// Number of concurrent compactions is bounded and significantly lower than
// the number of concurrent queries, and compactions consume reads from a few
// levels, so there is no risk of high memory usage due to a higher readahead
// size. So set this higher than remoteMaxReadaheadSize.
const remoteReadaheadSizeForCompaction = 8 * 1024 * 1024 /* 8MB */
      33             : 
// remoteReadable is a very simple implementation of Readable on top of the
// remote.ObjectReader returned by remote.Storage.ReadObject. It is stateless
// and can be called concurrently.
type remoteReadable struct {
	objReader remote.ObjectReader
	size      int64
	// fileNum identifies the object when reads go through the shared cache.
	fileNum base.DiskFileNum
	// cache, if non-nil, is consulted and populated by readInternal.
	cache *sharedcache.Cache
}

var _ objstorage.Readable = (*remoteReadable)(nil)
      45             : 
      46             : func (p *provider) newRemoteReadable(
      47             :         objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
      48           1 : ) *remoteReadable {
      49           1 :         return &remoteReadable{
      50           1 :                 objReader: objReader,
      51           1 :                 size:      size,
      52           1 :                 fileNum:   fileNum,
      53           1 :                 cache:     p.remote.cache,
      54           1 :         }
      55           1 : }
      56             : 
      57             : // ReadAt is part of the objstorage.Readable interface.
      58           1 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
      59           1 :         return r.readInternal(ctx, p, offset, false /* forCompaction */)
      60           1 : }
      61             : 
      62             : // readInternal performs a read for the object, using the cache when
      63             : // appropriate.
      64             : func (r *remoteReadable) readInternal(
      65             :         ctx context.Context, p []byte, offset int64, forCompaction bool,
      66           1 : ) error {
      67           1 :         if r.cache != nil {
      68           0 :                 flags := sharedcache.ReadFlags{
      69           0 :                         // Don't add data to the cache if this read is for a compaction.
      70           0 :                         ReadOnly: forCompaction,
      71           0 :                 }
      72           0 :                 return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
      73           0 :         }
      74           1 :         return r.objReader.ReadAt(ctx, p, offset)
      75             : }
      76             : 
      77           1 : func (r *remoteReadable) Close() error {
      78           1 :         defer func() { r.objReader = nil }()
      79           1 :         return r.objReader.Close()
      80             : }
      81             : 
      82           1 : func (r *remoteReadable) Size() int64 {
      83           1 :         return r.size
      84           1 : }
      85             : 
      86             : // TODO(sumeer): both readBeforeSize and ReadHandle.SetupForCompaction are
      87             : // initial configuration of a ReadHandle. So they should both be passed as
      88             : // Options to NewReadHandle. But currently the latter is a separate method.
      89             : // This is because of how the sstable.Reader calls setupForCompaction on the
      90             : // iterators after they are constructed. Consider fixing this oddity.
      91             : 
      92             : func (r *remoteReadable) NewReadHandle(
      93             :         readBeforeSize objstorage.ReadBeforeSize,
      94           1 : ) objstorage.ReadHandle {
      95           1 :         rh := remoteReadHandlePool.Get().(*remoteReadHandle)
      96           1 :         *rh = remoteReadHandle{readable: r, readBeforeSize: readBeforeSize, buffered: rh.buffered}
      97           1 :         rh.readAheadState = makeReadaheadState(remoteMaxReadaheadSize)
      98           1 :         return rh
      99           1 : }
     100             : 
     101             : // TODO(sumeer): add test for remoteReadHandle.
     102             : 
     103             : // remoteReadHandle supports doing larger reads, and buffering the additional
     104             : // data, to serve future reads. It is not thread-safe. There are two kinds of
     105             : // larger reads (a) read-ahead (for sequential data reads), (b) read-before,
     106             : // for non-data reads.
     107             : //
     108             : // For both (a) and (b), the goal is to reduce the number of reads since
     109             : // remote read latency and cost can be high. We have to balance this with
     110             : // buffers consuming too much memory, since there can be a large number of
     111             : // iterators holding remoteReadHandles open for every level.
     112             : //
// For (b) we have two use-cases:
     114             : //
     115             : //   - When a sstable.Reader is opened, it needs to read the footer, metaindex
     116             : //     block and meta properties block. It starts by reading the footer which is
     117             : //     at the end of the table and then proceeds to read the other two. Instead
     118             : //     of doing 3 tiny reads, we would like to do one read.
     119             : //
     120             : //   - When a single-level or two-level iterator is opened, it reads the
     121             : //     (top-level) index block first. When the iterator is used, it will
//     typically follow this by reading the filter block (since SeekPrefixGE is
     123             : //     common in CockroachDB). For a two-level iterator it will also read the
     124             : //     lower-level index blocks which are after the filter block and before the
     125             : //     top-level index block. It would be ideal if reading the top-level index
     126             : //     block read enough to include the filter block. And for two-level
     127             : //     iterators this would also include the lower-level index blocks.
     128             : //
     129             : // In both use-cases we want the first read from the remoteReadable to do a
     130             : // larger read, and read bytes earlier than the requested read, hence
     131             : // "read-before". Subsequent reads from the remoteReadable can use the usual
     132             : // readahead logic (for the second use-case above, this can help with
     133             : // sequential reads of the lower-level index blocks when the read-before was
     134             : // insufficient to satisfy such reads). In the first use-case, the block cache
     135             : // is not used. In the second use-case, the block cache is used, and if the
     136             : // first read, which reads the top-level index, has a cache hit, we do not do
     137             : // any read-before, since we assume that with some locality in the workload
     138             : // the other reads will also have a cache hit (it is also messier code to try
     139             : // to preserve some read-before).
     140             : //
     141             : // Note that both use-cases can often occur near each other if there is enough
     142             : // locality of access, in which case table cache and block cache misses are
     143             : // mainly happening for new sstables created by compactions -- in this case a
     144             : // user-facing read will cause a table cache miss and a new sstable.Reader to
     145             : // be created, followed by an iterator creation. We don't currently combine
     146             : // the reads across the Reader and the iterator creation, since the code
     147             : // structure is not simple enough, but we could consider that in the future.
type remoteReadHandle struct {
	// readable is the underlying object; all actual reads are delegated to it.
	readable *remoteReadable
	// readBeforeSize is the read-before to apply to the first read; it is
	// zeroed after that read (or after a cache hit — see RecordCacheHit).
	readBeforeSize objstorage.ReadBeforeSize
	// readAheadState implements the sequential-read heuristic used to size
	// readahead for non-compaction reads.
	readAheadState readaheadState
	// buffered holds extra bytes fetched by the last read-ahead/read-before;
	// data is the buffer and offset is its position in the object.
	buffered       struct {
		data   []byte
		offset int64
	}
	// forCompaction is set by SetupForCompaction.
	forCompaction bool
}
     158             : 
var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)

// remoteReadHandlePool recycles remoteReadHandles (and, via Close, their
// buffers) to avoid an allocation per NewReadHandle call.
var remoteReadHandlePool = sync.Pool{
	New: func() interface{} {
		return &remoteReadHandle{}
	},
}
     166             : 
// ReadAt is part of the objstorage.ReadHandle interface.
//
// The read is served from the internal buffer when the buffered range covers
// a prefix of the request; otherwise a (possibly enlarged) read is issued to
// the underlying readable. The very first read may be extended backward
// (read-before); later reads may be extended forward (readahead).
func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
	var extraBytesBefore int64
	if r.readBeforeSize > 0 {
		if int(r.readBeforeSize) > len(p) {
			// Clamp to offset so we never read before the start of the object.
			extraBytesBefore = min(int64(int(r.readBeforeSize)-len(p)), offset)
		}
		// Only first read uses read-before.
		r.readBeforeSize = 0
	}
	readaheadSize := r.maybeReadahead(offset, len(p))

	// Prefer read-before to read-ahead since only first call does read-before.
	// Also, since this is the first call, the buffer must be empty.
	if extraBytesBefore > 0 {
		r.buffered.offset = offset - extraBytesBefore
		err := r.readToBuffer(ctx, offset-extraBytesBefore, len(p)+int(extraBytesBefore))
		if err != nil {
			return err
		}
		// The requested bytes are the tail of the buffered read.
		copy(p, r.buffered.data[int(extraBytesBefore):])
		return nil
	}
	// Check if we already have the data from a previous read-ahead/read-before.
	if rhSize := int64(len(r.buffered.data)); rhSize > 0 {
		// We only consider the case where we have a prefix of the needed data. We
		// could enhance this to utilize a suffix of the needed data.
		if r.buffered.offset <= offset && r.buffered.offset+rhSize > offset {
			n := copy(p, r.buffered.data[offset-r.buffered.offset:])
			if n == len(p) {
				// All data was available.
				return nil
			}
			// Use the data that we had and do a shorter read.
			offset += int64(n)
			p = p[n:]
			readaheadSize -= n
		}
	}

	if readaheadSize > len(p) {
		// Don't try to read past EOF.
		if offset+int64(readaheadSize) > r.readable.size {
			readaheadSize = int(r.readable.size - offset)
			if readaheadSize <= 0 {
				// This shouldn't happen in practice (Pebble should never try to read
				// past EOF).
				return io.EOF
			}
		}
		if err := r.readToBuffer(ctx, offset, readaheadSize); err != nil {
			return err
		}
		copy(p, r.buffered.data)
		return nil
	}

	// No readahead: read exactly the requested bytes, bypassing the buffer.
	return r.readable.readInternal(ctx, p, offset, r.forCompaction)
}
     226             : 
     227           1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
     228           1 :         if r.forCompaction {
     229           1 :                 return remoteReadaheadSizeForCompaction
     230           1 :         }
     231           1 :         return int(r.readAheadState.maybeReadahead(offset, int64(len)))
     232             : }
     233             : 
     234           1 : func (r *remoteReadHandle) readToBuffer(ctx context.Context, offset int64, length int) error {
     235           1 :         r.buffered.offset = offset
     236           1 :         // TODO(radu): we need to somehow account for this memory.
     237           1 :         if cap(r.buffered.data) >= length {
     238           1 :                 r.buffered.data = r.buffered.data[:length]
     239           1 :         } else {
     240           1 :                 r.buffered.data = make([]byte, length)
     241           1 :         }
     242           1 :         if err := r.readable.readInternal(
     243           1 :                 ctx, r.buffered.data, r.buffered.offset, r.forCompaction); err != nil {
     244           0 :                 // Make sure we don't treat the data as valid next time.
     245           0 :                 r.buffered.data = r.buffered.data[:0]
     246           0 :                 return err
     247           0 :         }
     248           1 :         return nil
     249             : }
     250             : 
     251             : // Close is part of the objstorage.ReadHandle interface.
     252           1 : func (r *remoteReadHandle) Close() error {
     253           1 :         buf := r.buffered.data[:0]
     254           1 :         *r = remoteReadHandle{}
     255           1 :         r.buffered.data = buf
     256           1 :         remoteReadHandlePool.Put(r)
     257           1 :         return nil
     258           1 : }
     259             : 
// SetupForCompaction is part of the objstorage.ReadHandle interface.
//
// Compaction reads use a larger fixed readahead (see maybeReadahead) and do
// not populate the shared cache (see readInternal).
func (r *remoteReadHandle) SetupForCompaction() {
	r.forCompaction = true
}
     264             : 
     265             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     266           1 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     267           1 :         if !r.forCompaction {
     268           1 :                 r.readAheadState.recordCacheHit(offset, size)
     269           1 :         }
     270           1 :         if r.readBeforeSize > 0 {
     271           1 :                 r.readBeforeSize = 0
     272           1 :         }
     273             : }

Generated by: LCOV version 1.14