Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "runtime"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : )
22 :
23 : // remoteSubsystem contains the provider fields related to remote storage.
24 : // All fields remain unset if remote storage is not configured.
25 : type remoteSubsystem struct {
26 : catalog *remoteobjcat.Catalog
27 : // catalogSyncMutex is used to correctly serialize two sharedSync operations.
28 : // It must be acquired before the provider mutex.
29 : catalogSyncMutex sync.Mutex
30 :
31 : cache *sharedcache.Cache
32 :
33 : // shared contains the fields relevant to shared objects, i.e. objects that
34 : // are created by Pebble and potentially shared between Pebble instances.
35 : shared struct {
36 : // initialized guards access to the creatorID field.
37 : initialized atomic.Bool
38 : creatorID objstorage.CreatorID
39 : initOnce sync.Once
40 :
41 : // checkRefsOnOpen controls whether we check the ref marker file when opening
42 : // an object. Normally this is true when invariants are enabled (but the provider
43 : // test tweaks this field).
44 : checkRefsOnOpen bool
45 : }
46 : }
47 :
48 : // remoteInit initializes the remote object subsystem (if configured) and finds
49 : // any remote objects.
50 2 : func (p *provider) remoteInit() error {
51 2 : if p.st.Remote.StorageFactory == nil {
52 2 : return nil
53 2 : }
54 2 : catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
55 2 : if err != nil {
56 0 : return errors.Wrapf(err, "pebble: could not open remote object catalog")
57 0 : }
58 2 : p.remote.catalog = catalog
59 2 : p.remote.shared.checkRefsOnOpen = invariants.Enabled
60 2 :
61 2 : // The creator ID may or may not be initialized yet.
62 2 : if contents.CreatorID.IsSet() {
63 2 : p.remote.initShared(contents.CreatorID)
64 2 : p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
65 2 : } else {
66 2 : p.st.Logger.Infof("remote storage configured; no creatorID yet")
67 2 : }
68 :
69 2 : if p.st.Remote.CacheSizeBytes > 0 {
70 1 : const defaultBlockSize = 32 * 1024
71 1 : blockSize := p.st.Remote.CacheBlockSize
72 1 : if blockSize == 0 {
73 1 : blockSize = defaultBlockSize
74 1 : }
75 :
76 1 : const defaultShardingBlockSize = 1024 * 1024
77 1 : shardingBlockSize := p.st.Remote.ShardingBlockSize
78 1 : if shardingBlockSize == 0 {
79 1 : shardingBlockSize = defaultShardingBlockSize
80 1 : }
81 :
82 1 : numShards := p.st.Remote.CacheShardCount
83 1 : if numShards == 0 {
84 1 : numShards = 2 * runtime.GOMAXPROCS(0)
85 1 : }
86 :
87 1 : p.remote.cache, err = sharedcache.Open(
88 1 : p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
89 1 : if err != nil {
90 0 : return errors.Wrapf(err, "pebble: could not open remote object cache")
91 0 : }
92 : }
93 :
94 2 : for _, meta := range contents.Objects {
95 2 : o := objstorage.ObjectMetadata{
96 2 : DiskFileNum: meta.FileNum,
97 2 : FileType: meta.FileType,
98 2 : }
99 2 : o.Remote.CreatorID = meta.CreatorID
100 2 : o.Remote.CreatorFileNum = meta.CreatorFileNum
101 2 : o.Remote.CleanupMethod = meta.CleanupMethod
102 2 : o.Remote.Locator = meta.Locator
103 2 : o.Remote.CustomObjectName = meta.CustomObjectName
104 2 : o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
105 2 : if err != nil {
106 0 : return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
107 0 : }
108 2 : if invariants.Enabled {
109 0 : o.AssertValid()
110 0 : }
111 2 : p.mu.knownObjects[o.DiskFileNum] = o
112 : }
113 2 : return nil
114 : }
115 :
116 : // initShared initializes the creator ID, allowing use of shared objects.
117 2 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
118 2 : ss.shared.initOnce.Do(func() {
119 2 : ss.shared.creatorID = creatorID
120 2 : ss.shared.initialized.Store(true)
121 2 : })
122 : }
123 :
124 2 : func (p *provider) sharedClose() error {
125 2 : if p.st.Remote.StorageFactory == nil {
126 2 : return nil
127 2 : }
128 2 : var err error
129 2 : if p.remote.cache != nil {
130 1 : err = p.remote.cache.Close()
131 1 : p.remote.cache = nil
132 1 : }
133 2 : if p.remote.catalog != nil {
134 2 : err = firstError(err, p.remote.catalog.Close())
135 2 : p.remote.catalog = nil
136 2 : }
137 2 : return err
138 : }
139 :
140 : // SetCreatorID is part of the objstorage.Provider interface.
141 2 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
142 2 : if p.st.Remote.StorageFactory == nil {
143 0 : return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
144 0 : }
145 : // Note: this call is a cheap no-op if the creator ID was already set. This
146 : // call also checks if we are trying to change the ID.
147 2 : if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
148 0 : return err
149 0 : }
150 2 : if !p.remote.shared.initialized.Load() {
151 2 : p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
152 2 : p.remote.initShared(creatorID)
153 2 : }
154 2 : return nil
155 : }
156 :
157 : // IsSharedForeign is part of the objstorage.Provider interface.
158 2 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
159 2 : if !p.remote.shared.initialized.Load() {
160 2 : return false
161 2 : }
162 2 : return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
163 : }
164 :
165 2 : func (p *provider) remoteCheckInitialized() error {
166 2 : if p.st.Remote.StorageFactory == nil {
167 0 : return errors.Errorf("remote object support not configured")
168 0 : }
169 2 : return nil
170 : }
171 :
172 2 : func (p *provider) sharedCheckInitialized() error {
173 2 : if err := p.remoteCheckInitialized(); err != nil {
174 0 : return err
175 0 : }
176 2 : if !p.remote.shared.initialized.Load() {
177 0 : return errors.Errorf("remote object support not available: remote creator ID not yet set")
178 0 : }
179 2 : return nil
180 : }
181 :
182 2 : func (p *provider) sharedSync() error {
183 2 : // Serialize parallel sync operations. Note that ApplyBatch is already
184 2 : // serialized internally, but we want to make sure they get called with
185 2 : // batches in the right order.
186 2 : p.remote.catalogSyncMutex.Lock()
187 2 : defer p.remote.catalogSyncMutex.Unlock()
188 2 :
189 2 : batch := func() remoteobjcat.Batch {
190 2 : p.mu.Lock()
191 2 : defer p.mu.Unlock()
192 2 : res := p.mu.remote.catalogBatch.Copy()
193 2 : p.mu.remote.catalogBatch.Reset()
194 2 : return res
195 2 : }()
196 :
197 2 : if batch.IsEmpty() {
198 2 : return nil
199 2 : }
200 :
201 2 : if err := p.remote.catalog.ApplyBatch(batch); err != nil {
202 0 : // Put back the batch (for the next Sync), appending any operations that
203 0 : // happened in the meantime.
204 0 : p.mu.Lock()
205 0 : defer p.mu.Unlock()
206 0 : batch.Append(p.mu.remote.catalogBatch)
207 0 : p.mu.remote.catalogBatch = batch
208 0 : return err
209 0 : }
210 :
211 2 : return nil
212 : }
213 :
214 2 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
215 2 : if meta.Remote.Locator != "" {
216 1 : return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
217 1 : }
218 2 : return "remote://" + remoteObjectName(meta)
219 : }
220 :
221 : // sharedCreateRef creates a reference marker object.
222 2 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
223 2 : if err := p.sharedCheckInitialized(); err != nil {
224 0 : return err
225 0 : }
226 2 : if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
227 1 : return nil
228 1 : }
229 2 : refName := p.sharedObjectRefName(meta)
230 2 : writer, err := meta.Remote.Storage.CreateObject(refName)
231 2 : if err == nil {
232 2 : // The object is empty, just close the writer.
233 2 : err = writer.Close()
234 2 : }
235 2 : if err != nil {
236 0 : return errors.Wrapf(err, "creating marker object %q", refName)
237 0 : }
238 2 : return nil
239 : }
240 :
241 : func (p *provider) sharedCreate(
242 : _ context.Context,
243 : fileType base.FileType,
244 : fileNum base.DiskFileNum,
245 : locator remote.Locator,
246 : opts objstorage.CreateOptions,
247 2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
248 2 : if err := p.sharedCheckInitialized(); err != nil {
249 0 : return nil, objstorage.ObjectMetadata{}, err
250 0 : }
251 2 : storage, err := p.ensureStorage(locator)
252 2 : if err != nil {
253 0 : return nil, objstorage.ObjectMetadata{}, err
254 0 : }
255 2 : meta := objstorage.ObjectMetadata{
256 2 : DiskFileNum: fileNum,
257 2 : FileType: fileType,
258 2 : }
259 2 : meta.Remote.CreatorID = p.remote.shared.creatorID
260 2 : meta.Remote.CreatorFileNum = fileNum
261 2 : meta.Remote.CleanupMethod = opts.SharedCleanupMethod
262 2 : meta.Remote.Locator = locator
263 2 : meta.Remote.Storage = storage
264 2 :
265 2 : objName := remoteObjectName(meta)
266 2 : writer, err := storage.CreateObject(objName)
267 2 : if err != nil {
268 0 : return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", objName)
269 0 : }
270 2 : return &sharedWritable{
271 2 : p: p,
272 2 : meta: meta,
273 2 : storageWriter: writer,
274 2 : }, meta, nil
275 : }
276 :
277 : func (p *provider) remoteOpenForReading(
278 : ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
279 2 : ) (objstorage.Readable, error) {
280 2 : if err := p.remoteCheckInitialized(); err != nil {
281 0 : return nil, err
282 0 : }
283 : // Verify we have a reference on this object; for performance reasons, we only
284 : // do this in testing scenarios.
285 2 : if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
286 1 : if err := p.sharedCheckInitialized(); err != nil {
287 0 : return nil, err
288 0 : }
289 1 : refName := p.sharedObjectRefName(meta)
290 1 : if _, err := meta.Remote.Storage.Size(refName); err != nil {
291 0 : if meta.Remote.Storage.IsNotExistError(err) {
292 0 : if opts.MustExist {
293 0 : p.st.Logger.Fatalf("marker object %q does not exist", refName)
294 0 : // TODO(radu): maybe list references for the object.
295 0 : }
296 0 : return nil, errors.Errorf("marker object %q does not exist", refName)
297 : }
298 0 : return nil, errors.Wrapf(err, "checking marker object %q", refName)
299 : }
300 : }
301 2 : objName := remoteObjectName(meta)
302 2 : reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
303 2 : if err != nil {
304 1 : if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
305 0 : p.st.Logger.Fatalf("object %q does not exist", objName)
306 0 : // TODO(radu): maybe list references for the object.
307 0 : }
308 1 : return nil, err
309 : }
310 2 : return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
311 : }
312 :
313 2 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
314 2 : if err := p.remoteCheckInitialized(); err != nil {
315 0 : return 0, err
316 0 : }
317 2 : objName := remoteObjectName(meta)
318 2 : return meta.Remote.Storage.Size(objName)
319 : }
320 :
321 : // sharedUnref implements object "removal" with the remote backend. The ref
322 : // marker object is removed and the backing object is removed only if there are
323 : // no other ref markers.
324 2 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
325 2 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
326 1 : // Never delete objects in this mode.
327 1 : return nil
328 1 : }
329 2 : if p.isProtected(meta.DiskFileNum) {
330 1 : // TODO(radu): we need a mechanism to unref the object when it becomes
331 1 : // unprotected.
332 1 : return nil
333 1 : }
334 :
335 2 : refName := p.sharedObjectRefName(meta)
336 2 : // Tolerate a not-exists error.
337 2 : if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
338 0 : return err
339 0 : }
340 2 : otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
341 2 : if err != nil {
342 0 : return err
343 0 : }
344 2 : if len(otherRefs) == 0 {
345 2 : objName := remoteObjectName(meta)
346 2 : if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
347 0 : return err
348 0 : }
349 : }
350 2 : return nil
351 : }
352 :
353 : // ensureStorageLocked populates the remote.Storage object for the given
354 : // locator, if necessary. p.mu must be held.
355 2 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
356 2 : if p.mu.remote.storageObjects == nil {
357 2 : p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
358 2 : }
359 2 : if res, ok := p.mu.remote.storageObjects[locator]; ok {
360 2 : return res, nil
361 2 : }
362 2 : res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
363 2 : if err != nil {
364 1 : return nil, err
365 1 : }
366 :
367 2 : p.mu.remote.storageObjects[locator] = res
368 2 : return res, nil
369 : }
370 :
371 : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
372 2 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
373 2 : p.mu.Lock()
374 2 : defer p.mu.Unlock()
375 2 : return p.ensureStorageLocked(locator)
376 2 : }
|