Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "io"
10 : "sync"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
15 : "github.com/cockroachdb/pebble/objstorage/remote"
16 : )
17 :
18 : // NewRemoteReadable creates an objstorage.Readable out of a remote.ObjectReader.
19 1 : func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Readable {
20 1 : return &remoteReadable{
21 1 : objReader: objReader,
22 1 : size: size,
23 1 : }
24 1 : }
25 :
// remoteMaxReadaheadSize caps the read-ahead size used for user-facing
// (non-compaction) reads.
const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */

// Number of concurrent compactions is bounded and significantly lower than
// the number of concurrent queries, and compactions consume reads from a few
// levels, so there is no risk of high memory usage due to a higher readahead
// size. So set this higher than remoteMaxReadaheadSize.
const remoteReadaheadSizeForCompaction = 8 * 1024 * 1024 /* 8MB */
33 :
// remoteReadable is a very simple implementation of Readable on top of the
// remote.ObjectReader returned by remote.Storage.ReadObject. It is stateless
// and can be called concurrently.
type remoteReadable struct {
	// objReader performs the actual remote reads.
	objReader remote.ObjectReader
	// size is the total size of the object, in bytes.
	size int64
	// fileNum identifies the object; it is the key used for shared-cache reads.
	fileNum base.DiskFileNum
	// cache, if non-nil, is consulted and populated on reads (see readInternal).
	cache *sharedcache.Cache
}

// Compile-time check that remoteReadable implements objstorage.Readable.
var _ objstorage.Readable = (*remoteReadable)(nil)
45 :
// newRemoteReadable constructs a remoteReadable for the given object, wiring
// in the provider's shared cache (which may be nil when caching is disabled).
func (p *provider) newRemoteReadable(
	objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
) *remoteReadable {
	return &remoteReadable{
		objReader: objReader,
		size:      size,
		fileNum:   fileNum,
		cache:     p.remote.cache,
	}
}
56 :
// ReadAt is part of the objstorage.Readable interface. It delegates to
// readInternal with forCompaction=false, so the read may populate the shared
// cache when one is configured.
func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
	return r.readInternal(ctx, p, offset, false /* forCompaction */)
}
61 :
62 : // readInternal performs a read for the object, using the cache when
63 : // appropriate.
64 : func (r *remoteReadable) readInternal(
65 : ctx context.Context, p []byte, offset int64, forCompaction bool,
66 1 : ) error {
67 1 : if r.cache != nil {
68 0 : flags := sharedcache.ReadFlags{
69 0 : // Don't add data to the cache if this read is for a compaction.
70 0 : ReadOnly: forCompaction,
71 0 : }
72 0 : return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
73 0 : }
74 1 : return r.objReader.ReadAt(ctx, p, offset)
75 : }
76 :
77 1 : func (r *remoteReadable) Close() error {
78 1 : defer func() { r.objReader = nil }()
79 1 : return r.objReader.Close()
80 : }
81 :
// Size is part of the objstorage.Readable interface. It returns the object
// size that was captured at construction time.
func (r *remoteReadable) Size() int64 {
	return r.size
}
85 :
86 : // TODO(sumeer): both readBeforeSize and ReadHandle.SetupForCompaction are
87 : // initial configuration of a ReadHandle. So they should both be passed as
88 : // Options to NewReadHandle. But currently the latter is a separate method.
89 : // This is because of how the sstable.Reader calls setupForCompaction on the
90 : // iterators after they are constructed. Consider fixing this oddity.
91 :
92 : func (r *remoteReadable) NewReadHandle(
93 : readBeforeSize objstorage.ReadBeforeSize,
94 1 : ) objstorage.ReadHandle {
95 1 : rh := remoteReadHandlePool.Get().(*remoteReadHandle)
96 1 : *rh = remoteReadHandle{readable: r, readBeforeSize: readBeforeSize, buffered: rh.buffered}
97 1 : rh.readAheadState = makeReadaheadState(remoteMaxReadaheadSize)
98 1 : return rh
99 1 : }
100 :
101 : // TODO(sumeer): add test for remoteReadHandle.
102 :
103 : // remoteReadHandle supports doing larger reads, and buffering the additional
104 : // data, to serve future reads. It is not thread-safe. There are two kinds of
105 : // larger reads (a) read-ahead (for sequential data reads), (b) read-before,
106 : // for non-data reads.
107 : //
108 : // For both (a) and (b), the goal is to reduce the number of reads since
109 : // remote read latency and cost can be high. We have to balance this with
110 : // buffers consuming too much memory, since there can be a large number of
111 : // iterators holding remoteReadHandles open for every level.
112 : //
// For (b) we have two use-cases:
114 : //
115 : // - When a sstable.Reader is opened, it needs to read the footer, metaindex
116 : // block and meta properties block. It starts by reading the footer which is
117 : // at the end of the table and then proceeds to read the other two. Instead
118 : // of doing 3 tiny reads, we would like to do one read.
119 : //
120 : // - When a single-level or two-level iterator is opened, it reads the
121 : // (top-level) index block first. When the iterator is used, it will
// typically follow this by reading the filter block (since SeekPrefixGE is
123 : // common in CockroachDB). For a two-level iterator it will also read the
124 : // lower-level index blocks which are after the filter block and before the
125 : // top-level index block. It would be ideal if reading the top-level index
126 : // block read enough to include the filter block. And for two-level
127 : // iterators this would also include the lower-level index blocks.
128 : //
129 : // In both use-cases we want the first read from the remoteReadable to do a
130 : // larger read, and read bytes earlier than the requested read, hence
131 : // "read-before". Subsequent reads from the remoteReadable can use the usual
132 : // readahead logic (for the second use-case above, this can help with
133 : // sequential reads of the lower-level index blocks when the read-before was
134 : // insufficient to satisfy such reads). In the first use-case, the block cache
135 : // is not used. In the second use-case, the block cache is used, and if the
136 : // first read, which reads the top-level index, has a cache hit, we do not do
137 : // any read-before, since we assume that with some locality in the workload
138 : // the other reads will also have a cache hit (it is also messier code to try
139 : // to preserve some read-before).
140 : //
141 : // Note that both use-cases can often occur near each other if there is enough
142 : // locality of access, in which case file cache and block cache misses are
143 : // mainly happening for new sstables created by compactions -- in this case a
144 : // user-facing read will cause a file cache miss and a new sstable.Reader to
145 : // be created, followed by an iterator creation. We don't currently combine
146 : // the reads across the Reader and the iterator creation, since the code
147 : // structure is not simple enough, but we could consider that in the future.
type remoteReadHandle struct {
	// readable is the underlying object this handle reads from.
	readable *remoteReadable
	// readBeforeSize, when non-zero, asks the first read to also fetch bytes
	// before the requested offset; it is cleared after the first read (and on
	// a cache hit, see RecordCacheHit).
	readBeforeSize objstorage.ReadBeforeSize
	// readAheadState tracks the sequential-read heuristic used to size
	// read-ahead for non-compaction reads.
	readAheadState readaheadState
	// buffered holds extra bytes from a previous read-ahead/read-before; data
	// corresponds to object offsets starting at buffered.offset. The slice's
	// capacity is retained across pool recycles (see Close/NewReadHandle).
	buffered struct {
		data   []byte
		offset int64
	}
	// forCompaction selects the larger fixed readahead size; set via
	// SetupForCompaction.
	forCompaction bool
}
158 :
var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)

// remoteReadHandlePool recycles remoteReadHandles (and the buffer capacity
// they carry) to avoid a fresh allocation per iterator.
var remoteReadHandlePool = sync.Pool{
	New: func() interface{} {
		return &remoteReadHandle{}
	},
}
166 :
// ReadAt is part of the objstorage.ReadHandle interface. It serves the read
// from the internal buffer when possible, and otherwise issues a (possibly
// enlarged) read: a one-time "read-before" extension on the first call, or a
// heuristic read-ahead extension for sequential access patterns.
func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
	var extraBytesBefore int64
	if r.readBeforeSize > 0 {
		if int(r.readBeforeSize) > len(p) {
			// Extend the read backwards, but never before the start of the object.
			extraBytesBefore = min(int64(int(r.readBeforeSize)-len(p)), offset)
		}
		// Only first read uses read-before.
		r.readBeforeSize = 0
	}
	readaheadSize := r.maybeReadahead(offset, len(p))

	// Prefer read-before to read-ahead since only first call does read-before.
	// Also, since this is the first call, the buffer must be empty.
	if extraBytesBefore > 0 {
		r.buffered.offset = offset - extraBytesBefore
		err := r.readToBuffer(ctx, offset-extraBytesBefore, len(p)+int(extraBytesBefore))
		if err != nil {
			return err
		}
		// The requested bytes sit at the tail of the buffered read.
		copy(p, r.buffered.data[int(extraBytesBefore):])
		return nil
	}
	// Check if we already have the data from a previous read-ahead/read-before.
	if rhSize := int64(len(r.buffered.data)); rhSize > 0 {
		// We only consider the case where we have a prefix of the needed data. We
		// could enhance this to utilize a suffix of the needed data.
		if r.buffered.offset <= offset && r.buffered.offset+rhSize > offset {
			n := copy(p, r.buffered.data[offset-r.buffered.offset:])
			if n == len(p) {
				// All data was available.
				return nil
			}
			// Use the data that we had and do a shorter read.
			offset += int64(n)
			p = p[n:]
			readaheadSize -= n
		}
	}

	if readaheadSize > len(p) {
		// Don't try to read past EOF.
		if offset+int64(readaheadSize) > r.readable.size {
			readaheadSize = int(r.readable.size - offset)
			if readaheadSize <= 0 {
				// This shouldn't happen in practice (Pebble should never try to read
				// past EOF).
				return io.EOF
			}
		}
		if err := r.readToBuffer(ctx, offset, readaheadSize); err != nil {
			return err
		}
		// Serve the request from the front of the freshly buffered read; the
		// remainder stays buffered for subsequent sequential reads.
		copy(p, r.buffered.data)
		return nil
	}

	// No read-ahead: read exactly what was asked for, straight through.
	return r.readable.readInternal(ctx, p, offset, r.forCompaction)
}
226 :
227 1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
228 1 : if r.forCompaction {
229 1 : return remoteReadaheadSizeForCompaction
230 1 : }
231 1 : return int(r.readAheadState.maybeReadahead(offset, int64(len)))
232 : }
233 :
234 1 : func (r *remoteReadHandle) readToBuffer(ctx context.Context, offset int64, length int) error {
235 1 : r.buffered.offset = offset
236 1 : // TODO(radu): we need to somehow account for this memory.
237 1 : if cap(r.buffered.data) >= length {
238 1 : r.buffered.data = r.buffered.data[:length]
239 1 : } else {
240 1 : r.buffered.data = make([]byte, length)
241 1 : }
242 1 : if err := r.readable.readInternal(
243 1 : ctx, r.buffered.data, r.buffered.offset, r.forCompaction); err != nil {
244 0 : // Make sure we don't treat the data as valid next time.
245 0 : r.buffered.data = r.buffered.data[:0]
246 0 : return err
247 0 : }
248 1 : return nil
249 : }
250 :
// Close is part of the objstorage.ReadHandle interface. It zeroes the handle
// (dropping references to the readable) while retaining the buffer's
// capacity, then returns the handle to the pool for reuse.
func (r *remoteReadHandle) Close() error {
	buf := r.buffered.data[:0]
	*r = remoteReadHandle{}
	r.buffered.data = buf
	remoteReadHandlePool.Put(r)
	return nil
}
259 :
// SetupForCompaction is part of the objstorage.ReadHandle interface. It
// switches the handle to the larger, fixed compaction readahead size (see
// remoteReadaheadSizeForCompaction).
func (r *remoteReadHandle) SetupForCompaction() {
	r.forCompaction = true
}
264 :
265 : // RecordCacheHit is part of the objstorage.ReadHandle interface.
266 1 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
267 1 : if !r.forCompaction {
268 1 : r.readAheadState.recordCacheHit(offset, size)
269 1 : }
270 1 : if r.readBeforeSize > 0 {
271 1 : r.readBeforeSize = 0
272 1 : }
273 : }
|