Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "io"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/objstorage"
13 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
14 : "github.com/cockroachdb/pebble/objstorage/remote"
15 : )
16 :
// remoteMaxReadaheadSize is the largest read-ahead (in bytes) a remote read
// handle will request; compaction reads always ask for this much.
const remoteMaxReadaheadSize = 1 << 20 // 1MB
18 :
// remoteReadable is a very simple implementation of objstorage.Readable on top
// of a remote.ObjectReader. Reads go either straight to the object reader or,
// when the provider has a shared cache configured, through that cache (see
// readInternal).
type remoteReadable struct {
	// objReader is the reader for the remote object; Close sets it to nil.
	objReader remote.ObjectReader
	// size is the object size in bytes, as returned by Size and used to clamp
	// read-ahead at EOF.
	size int64
	// fileNum is passed to the shared cache on reads of this object.
	fileNum base.DiskFileNum
	// provider gives access to the (optional) shared cache.
	provider *provider
}

var _ objstorage.Readable = (*remoteReadable)(nil)
29 :
30 : func (p *provider) newRemoteReadable(
31 : objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
32 1 : ) *remoteReadable {
33 1 : return &remoteReadable{
34 1 : objReader: objReader,
35 1 : size: size,
36 1 : fileNum: fileNum,
37 1 : provider: p,
38 1 : }
39 1 : }
40 :
41 : // ReadAt is part of the objstorage.Readable interface.
42 1 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
43 1 : return r.readInternal(ctx, p, offset, false /* forCompaction */)
44 1 : }
45 :
46 : // readInternal performs a read for the object, using the cache when
47 : // appropriate.
48 : func (r *remoteReadable) readInternal(
49 : ctx context.Context, p []byte, offset int64, forCompaction bool,
50 1 : ) error {
51 1 : if cache := r.provider.remote.cache; cache != nil {
52 0 : flags := sharedcache.ReadFlags{
53 0 : // Don't add data to the cache if this read is for a compaction.
54 0 : ReadOnly: forCompaction,
55 0 : }
56 0 : return r.provider.remote.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
57 0 : }
58 1 : return r.objReader.ReadAt(ctx, p, offset)
59 : }
60 :
61 1 : func (r *remoteReadable) Close() error {
62 1 : defer func() { r.objReader = nil }()
63 1 : return r.objReader.Close()
64 : }
65 :
66 1 : func (r *remoteReadable) Size() int64 {
67 1 : return r.size
68 1 : }
69 :
70 1 : func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
71 1 : // TODO(radu): use a pool.
72 1 : rh := &remoteReadHandle{readable: r}
73 1 : rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
74 1 : return rh
75 1 : }
76 :
// remoteReadHandle is an objstorage.ReadHandle over a remoteReadable that keeps
// a read-ahead buffer so that subsequent reads can be served from memory.
type remoteReadHandle struct {
	// readable is the object being read; Close sets it to nil.
	readable *remoteReadable
	// readahead holds the most recent read-ahead buffer and the state that
	// decides how large read-aheads should be.
	readahead struct {
		// state is consulted for the read-ahead size of non-compaction reads.
		state readaheadState
		// data is the buffered object contents; empty when nothing valid is
		// buffered (e.g. after a failed read-ahead).
		data []byte
		// offset is the object offset that data[0] corresponds to.
		offset int64
	}
	// forCompaction is set by SetupForCompaction. When true, maybeReadahead
	// always returns remoteMaxReadaheadSize, shared-cache reads are ReadOnly,
	// and RecordCacheHit is ignored.
	forCompaction bool
}

var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)
88 :
89 : // ReadAt is part of the objstorage.ReadHandle interface.
90 1 : func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
91 1 : readaheadSize := r.maybeReadahead(offset, len(p))
92 1 :
93 1 : // Check if we already have the data from a previous read-ahead.
94 1 : if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
95 1 : if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
96 1 : n := copy(p, r.readahead.data[offset-r.readahead.offset:])
97 1 : if n == len(p) {
98 1 : // All data was available.
99 1 : return nil
100 1 : }
101 : // Use the data that we had and do a shorter read.
102 1 : offset += int64(n)
103 1 : p = p[n:]
104 1 : readaheadSize -= n
105 : }
106 : }
107 :
108 1 : if readaheadSize > len(p) {
109 1 : // Don't try to read past EOF.
110 1 : if offset+int64(readaheadSize) > r.readable.size {
111 1 : readaheadSize = int(r.readable.size - offset)
112 1 : if readaheadSize <= 0 {
113 0 : // This shouldn't happen in practice (Pebble should never try to read
114 0 : // past EOF).
115 0 : return io.EOF
116 0 : }
117 : }
118 1 : r.readahead.offset = offset
119 1 : // TODO(radu): we need to somehow account for this memory.
120 1 : if cap(r.readahead.data) >= readaheadSize {
121 1 : r.readahead.data = r.readahead.data[:readaheadSize]
122 1 : } else {
123 1 : r.readahead.data = make([]byte, readaheadSize)
124 1 : }
125 :
126 1 : if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
127 0 : // Make sure we don't treat the data as valid next time.
128 0 : r.readahead.data = r.readahead.data[:0]
129 0 : return err
130 0 : }
131 1 : copy(p, r.readahead.data)
132 1 : return nil
133 : }
134 :
135 1 : return r.readable.readInternal(ctx, p, offset, r.forCompaction)
136 : }
137 :
138 1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
139 1 : if r.forCompaction {
140 1 : return remoteMaxReadaheadSize
141 1 : }
142 1 : return int(r.readahead.state.maybeReadahead(offset, int64(len)))
143 : }
144 :
145 : // Close is part of the objstorage.ReadHandle interface.
146 1 : func (r *remoteReadHandle) Close() error {
147 1 : r.readable = nil
148 1 : r.readahead.data = nil
149 1 : return nil
150 1 : }
151 :
152 : // SetupForCompaction is part of the objstorage.ReadHandle interface.
153 1 : func (r *remoteReadHandle) SetupForCompaction() {
154 1 : r.forCompaction = true
155 1 : }
156 :
157 : // RecordCacheHit is part of the objstorage.ReadHandle interface.
158 1 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
159 1 : if !r.forCompaction {
160 1 : r.readahead.state.recordCacheHit(offset, size)
161 1 : }
162 : }
|