Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "io"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/objstorage"
13 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
14 : "github.com/cockroachdb/pebble/objstorage/remote"
15 : )
16 :
17 : // NewRemoteReadable creates an objstorage.Readable out of a remote.ObjectReader.
18 2 : func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Readable {
19 2 : return &remoteReadable{
20 2 : objReader: objReader,
21 2 : size: size,
22 2 : }
23 2 : }
24 :
// remoteMaxReadaheadSize (1MB) is the size passed to makeReadaheadState for
// new read handles, and the fixed read-ahead size requested by handles set up
// for compaction.
const remoteMaxReadaheadSize = 1 << 20
26 :
// remoteReadable is a very simple implementation of objstorage.Readable on top
// of a remote.ObjectReader. When a shared cache is attached, reads are routed
// through it; otherwise they go straight to the object reader.
type remoteReadable struct {
	// objReader is the underlying object reader; Close sets it to nil.
	objReader remote.ObjectReader
	// size is the object size, returned by Size and used to clamp read-ahead
	// so it does not extend past the end of the object.
	size int64
	// fileNum is passed to the shared cache to identify this object. It is
	// only set by provider.newRemoteReadable (zero for NewRemoteReadable).
	fileNum base.DiskFileNum
	// cache is the optional shared cache; nil means reads bypass it.
	cache *sharedcache.Cache
}

var _ objstorage.Readable = (*remoteReadable)(nil)
37 :
38 : func (p *provider) newRemoteReadable(
39 : objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
40 2 : ) *remoteReadable {
41 2 : return &remoteReadable{
42 2 : objReader: objReader,
43 2 : size: size,
44 2 : fileNum: fileNum,
45 2 : cache: p.remote.cache,
46 2 : }
47 2 : }
48 :
49 : // ReadAt is part of the objstorage.Readable interface.
50 2 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
51 2 : return r.readInternal(ctx, p, offset, false /* forCompaction */)
52 2 : }
53 :
54 : // readInternal performs a read for the object, using the cache when
55 : // appropriate.
56 : func (r *remoteReadable) readInternal(
57 : ctx context.Context, p []byte, offset int64, forCompaction bool,
58 2 : ) error {
59 2 : if r.cache != nil {
60 1 : flags := sharedcache.ReadFlags{
61 1 : // Don't add data to the cache if this read is for a compaction.
62 1 : ReadOnly: forCompaction,
63 1 : }
64 1 : return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
65 1 : }
66 2 : return r.objReader.ReadAt(ctx, p, offset)
67 : }
68 :
69 2 : func (r *remoteReadable) Close() error {
70 2 : defer func() { r.objReader = nil }()
71 2 : return r.objReader.Close()
72 : }
73 :
74 2 : func (r *remoteReadable) Size() int64 {
75 2 : return r.size
76 2 : }
77 :
78 2 : func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
79 2 : // TODO(radu): use a pool.
80 2 : rh := &remoteReadHandle{readable: r}
81 2 : rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
82 2 : return rh
83 2 : }
84 :
// remoteReadHandle is an objstorage.ReadHandle over a remoteReadable. It keeps
// the most recent read-ahead buffer so later ReadAt calls can be served from
// memory.
type remoteReadHandle struct {
	// readable is the object being read; Close sets it to nil.
	readable *remoteReadable
	readahead struct {
		// state decides how much to read ahead for non-compaction reads.
		state readaheadState
		// data holds the most recent read-ahead bytes; it is emptied when the
		// read-ahead read fails so stale bytes are never served.
		data []byte
		// offset is the object offset of data[0].
		offset int64
	}
	// forCompaction is set by SetupForCompaction. Such handles always request
	// remoteMaxReadaheadSize of read-ahead and don't add data to the shared
	// cache.
	forCompaction bool
}

var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)
96 :
97 : // ReadAt is part of the objstorage.ReadHandle interface.
98 2 : func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
99 2 : readaheadSize := r.maybeReadahead(offset, len(p))
100 2 :
101 2 : // Check if we already have the data from a previous read-ahead.
102 2 : if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
103 2 : if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
104 2 : n := copy(p, r.readahead.data[offset-r.readahead.offset:])
105 2 : if n == len(p) {
106 2 : // All data was available.
107 2 : return nil
108 2 : }
109 : // Use the data that we had and do a shorter read.
110 1 : offset += int64(n)
111 1 : p = p[n:]
112 1 : readaheadSize -= n
113 : }
114 : }
115 :
116 2 : if readaheadSize > len(p) {
117 2 : // Don't try to read past EOF.
118 2 : if offset+int64(readaheadSize) > r.readable.size {
119 2 : readaheadSize = int(r.readable.size - offset)
120 2 : if readaheadSize <= 0 {
121 0 : // This shouldn't happen in practice (Pebble should never try to read
122 0 : // past EOF).
123 0 : return io.EOF
124 0 : }
125 : }
126 2 : r.readahead.offset = offset
127 2 : // TODO(radu): we need to somehow account for this memory.
128 2 : if cap(r.readahead.data) >= readaheadSize {
129 1 : r.readahead.data = r.readahead.data[:readaheadSize]
130 2 : } else {
131 2 : r.readahead.data = make([]byte, readaheadSize)
132 2 : }
133 :
134 2 : if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
135 0 : // Make sure we don't treat the data as valid next time.
136 0 : r.readahead.data = r.readahead.data[:0]
137 0 : return err
138 0 : }
139 2 : copy(p, r.readahead.data)
140 2 : return nil
141 : }
142 :
143 2 : return r.readable.readInternal(ctx, p, offset, r.forCompaction)
144 : }
145 :
146 2 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
147 2 : if r.forCompaction {
148 2 : return remoteMaxReadaheadSize
149 2 : }
150 2 : return int(r.readahead.state.maybeReadahead(offset, int64(len)))
151 : }
152 :
153 : // Close is part of the objstorage.ReadHandle interface.
154 2 : func (r *remoteReadHandle) Close() error {
155 2 : r.readable = nil
156 2 : r.readahead.data = nil
157 2 : return nil
158 2 : }
159 :
160 : // SetupForCompaction is part of the objstorage.ReadHandle interface.
161 2 : func (r *remoteReadHandle) SetupForCompaction() {
162 2 : r.forCompaction = true
163 2 : }
164 :
165 : // RecordCacheHit is part of the objstorage.ReadHandle interface.
166 2 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
167 2 : if !r.forCompaction {
168 2 : r.readahead.state.recordCacheHit(offset, size)
169 2 : }
170 : }
|