Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "io"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/objstorage"
13 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
14 : "github.com/cockroachdb/pebble/objstorage/remote"
15 : )
16 :
17 : // NewRemoteReadable creates an objstorage.Readable out of a remote.ObjectReader.
18 1 : func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Readable {
19 1 : return &remoteReadable{
20 1 : objReader: objReader,
21 1 : size: size,
22 1 : }
23 1 : }
24 :
// remoteMaxReadaheadSize is the largest read-ahead window (1 MiB) used by
// remoteReadHandle; compaction handles always request this much.
const remoteMaxReadaheadSize = 1 << 20
26 :
// remoteReadable is a very simple implementation of Readable on top of a
// remote.ObjectReader, optionally routing reads through a shared cache.
type remoteReadable struct {
	// objReader is the underlying object reader; Close sets it to nil.
	objReader remote.ObjectReader
	// size is the object size reported by Size; it is also passed to the
	// shared cache and used by read handles to clamp read-ahead at EOF.
	size int64
	// fileNum identifies the object when reading through the shared cache.
	fileNum base.DiskFileNum
	// cache, when non-nil, serves reads via the shared cache; when nil, reads
	// go directly to objReader.
	cache *sharedcache.Cache
}

// Compile-time check that remoteReadable implements objstorage.Readable.
var _ objstorage.Readable = (*remoteReadable)(nil)
37 :
38 : func (p *provider) newRemoteReadable(
39 : objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
40 1 : ) *remoteReadable {
41 1 : return &remoteReadable{
42 1 : objReader: objReader,
43 1 : size: size,
44 1 : fileNum: fileNum,
45 1 : cache: p.remote.cache,
46 1 : }
47 1 : }
48 :
49 : // ReadAt is part of the objstorage.Readable interface.
50 1 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
51 1 : return r.readInternal(ctx, p, offset, false /* forCompaction */)
52 1 : }
53 :
54 : // readInternal performs a read for the object, using the cache when
55 : // appropriate.
56 : func (r *remoteReadable) readInternal(
57 : ctx context.Context, p []byte, offset int64, forCompaction bool,
58 1 : ) error {
59 1 : if r.cache != nil {
60 1 : flags := sharedcache.ReadFlags{
61 1 : // Don't add data to the cache if this read is for a compaction.
62 1 : ReadOnly: forCompaction,
63 1 : }
64 1 : return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
65 1 : }
66 1 : return r.objReader.ReadAt(ctx, p, offset)
67 : }
68 :
69 1 : func (r *remoteReadable) Close() error {
70 1 : defer func() { r.objReader = nil }()
71 1 : return r.objReader.Close()
72 : }
73 :
74 1 : func (r *remoteReadable) Size() int64 {
75 1 : return r.size
76 1 : }
77 :
78 1 : func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
79 1 : // TODO(radu): use a pool.
80 1 : rh := &remoteReadHandle{readable: r}
81 1 : rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
82 1 : return rh
83 1 : }
84 :
// remoteReadHandle is the objstorage.ReadHandle created by
// remoteReadable.NewReadHandle. It keeps a single read-ahead buffer.
type remoteReadHandle struct {
	// readable is the object being read; Close sets it to nil.
	readable *remoteReadable
	readahead struct {
		// state drives the read-ahead heuristic for non-compaction reads.
		state readaheadState
		// data holds the most recent read-ahead result; it is emptied when a
		// read-ahead fails and dropped on Close.
		data []byte
		// offset is the object offset at which data starts.
		offset int64
	}
	// forCompaction is set by SetupForCompaction. Such handles request a
	// remoteMaxReadaheadSize read-ahead (clamped at EOF) and read with
	// ReadOnly cache flags.
	forCompaction bool
}

// Compile-time check that remoteReadHandle implements objstorage.ReadHandle.
var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)
96 :
// ReadAt is part of the objstorage.ReadHandle interface.
//
// The read is served, as far as possible, from the handle's read-ahead buffer.
// When maybeReadahead asks for more than len(p) bytes, a larger read (clamped
// at the end of the object) is issued at offset. Its result is kept in the
// buffer for later calls and the requested prefix is copied into p. Otherwise
// the read goes directly to the underlying remoteReadable.
func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
	readaheadSize := r.maybeReadahead(offset, len(p))

	// Check if we already have the data from a previous read-ahead.
	if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
		if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
			n := copy(p, r.readahead.data[offset-r.readahead.offset:])
			if n == len(p) {
				// All data was available.
				return nil
			}
			// Use the data that we had and do a shorter read.
			offset += int64(n)
			p = p[n:]
			readaheadSize -= n
		}
	}

	if readaheadSize > len(p) {
		// Don't try to read past EOF.
		if offset+int64(readaheadSize) > r.readable.size {
			readaheadSize = int(r.readable.size - offset)
			if readaheadSize <= 0 {
				// This shouldn't happen in practice (Pebble should never try to read
				// past EOF).
				return io.EOF
			}
		}
		if readaheadSize < len(p) {
			// The request itself extends past EOF, so the clamped read-ahead buffer
			// is shorter than p. Copying it would leave the tail of p unfilled while
			// reporting success. Issue the request as-is instead, exactly like a
			// read without read-ahead, so the underlying reader decides the outcome.
			// NOTE(review): assumes objReader / the shared cache report short reads
			// as errors (io.ReaderAt-style contract) — confirm.
			return r.readable.readInternal(ctx, p, offset, r.forCompaction)
		}
		r.readahead.offset = offset
		// TODO(radu): we need to somehow account for this memory.
		if cap(r.readahead.data) >= readaheadSize {
			r.readahead.data = r.readahead.data[:readaheadSize]
		} else {
			r.readahead.data = make([]byte, readaheadSize)
		}

		if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
			// Make sure we don't treat the data as valid next time.
			r.readahead.data = r.readahead.data[:0]
			return err
		}
		// readaheadSize >= len(p) here, so p is filled completely.
		copy(p, r.readahead.data)
		return nil
	}

	return r.readable.readInternal(ctx, p, offset, r.forCompaction)
}
145 :
146 1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
147 1 : if r.forCompaction {
148 1 : return remoteMaxReadaheadSize
149 1 : }
150 1 : return int(r.readahead.state.maybeReadahead(offset, int64(len)))
151 : }
152 :
153 : // Close is part of the objstorage.ReadHandle interface.
154 1 : func (r *remoteReadHandle) Close() error {
155 1 : r.readable = nil
156 1 : r.readahead.data = nil
157 1 : return nil
158 1 : }
159 :
160 : // SetupForCompaction is part of the objstorage.ReadHandle interface.
161 1 : func (r *remoteReadHandle) SetupForCompaction() {
162 1 : r.forCompaction = true
163 1 : }
164 :
// RecordCacheHit is part of the objstorage.ReadHandle interface. For
// non-compaction handles it forwards the hit's offset and size to the
// read-ahead state. Compaction handles ignore it.
func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
	if r.forCompaction {
		return
	}
	r.readahead.state.recordCacheHit(offset, size)
}
|