Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "runtime"
11 : "slices"
12 : "sync"
13 : "sync/atomic"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
20 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/redact"
23 : )
24 :
25 : // remoteSubsystem contains the provider fields related to remote storage.
26 : // All fields remain unset if remote storage is not configured.
27 : type remoteSubsystem struct {
28 : catalog *remoteobjcat.Catalog
29 : // catalogSyncMutex is used to correctly serialize two sharedSync operations.
30 : // It must be acquired before the provider mutex.
31 : catalogSyncMutex sync.Mutex
32 :
33 : cache *sharedcache.Cache
34 :
35 : // shared contains the fields relevant to shared objects, i.e. objects that
36 : // are created by Pebble and potentially shared between Pebble instances.
37 : shared struct {
38 : // initialized guards access to the creatorID field.
39 : initialized atomic.Bool
40 : creatorID objstorage.CreatorID
41 : initOnce sync.Once
42 :
43 : // checkRefsOnOpen controls whether we check the ref marker file when opening
44 : // an object. Normally this is true when invariants are enabled (but the provider
45 : // test tweaks this field).
46 : checkRefsOnOpen bool
47 : }
48 : }
49 :
50 : type remoteLockedState struct {
51 : // catalogBatch accumulates remote object creations and deletions until
52 : // Sync is called.
53 : catalogBatch remoteobjcat.Batch
54 :
55 : storageObjects map[remote.Locator]remote.Storage
56 :
57 : externalObjects map[remote.ObjectKey][]base.DiskFileNum
58 : }
59 :
60 1 : func (rs *remoteLockedState) addExternalObject(meta objstorage.ObjectMetadata) {
61 1 : if rs.externalObjects == nil {
62 1 : rs.externalObjects = make(map[remote.ObjectKey][]base.DiskFileNum)
63 1 : }
64 1 : key := remote.MakeObjectKey(meta.Remote.Locator, meta.Remote.CustomObjectName)
65 1 : rs.externalObjects[key] = append(rs.externalObjects[key], meta.DiskFileNum)
66 : }
67 :
68 1 : func (rs *remoteLockedState) removeExternalObject(meta objstorage.ObjectMetadata) {
69 1 : key := remote.MakeObjectKey(meta.Remote.Locator, meta.Remote.CustomObjectName)
70 1 : newSlice := slices.DeleteFunc(rs.externalObjects[key], func(n base.DiskFileNum) bool {
71 1 : return n == meta.DiskFileNum
72 1 : })
73 1 : if len(newSlice) == 0 {
74 1 : delete(rs.externalObjects, key)
75 1 : } else {
76 1 : rs.externalObjects[key] = newSlice
77 1 : }
78 : }
79 :
80 : func (rs *remoteLockedState) getExternalObjects(
81 : locator remote.Locator, objName string,
82 1 : ) []base.DiskFileNum {
83 1 : key := remote.MakeObjectKey(locator, objName)
84 1 : return rs.externalObjects[key]
85 1 : }
86 :
87 : // remoteInit initializes the remote object subsystem (if configured) and finds
88 : // any remote objects.
89 1 : func (p *provider) remoteInit() error {
90 1 : if p.st.Remote.StorageFactory == nil {
91 1 : return nil
92 1 : }
93 1 : catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
94 1 : if err != nil {
95 0 : return errors.Wrapf(err, "pebble: could not open remote object catalog")
96 0 : }
97 1 : p.remote.catalog = catalog
98 1 : p.remote.shared.checkRefsOnOpen = invariants.Enabled
99 1 :
100 1 : // The creator ID may or may not be initialized yet.
101 1 : if contents.CreatorID.IsSet() {
102 1 : p.remote.initShared(contents.CreatorID)
103 1 : p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
104 1 : } else {
105 1 : p.st.Logger.Infof("remote storage configured; no creatorID yet")
106 1 : }
107 :
108 1 : if p.st.Remote.CacheSizeBytes > 0 {
109 0 : const defaultBlockSize = 32 * 1024
110 0 : blockSize := p.st.Remote.CacheBlockSize
111 0 : if blockSize == 0 {
112 0 : blockSize = defaultBlockSize
113 0 : }
114 :
115 0 : const defaultShardingBlockSize = 1024 * 1024
116 0 : shardingBlockSize := p.st.Remote.ShardingBlockSize
117 0 : if shardingBlockSize == 0 {
118 0 : shardingBlockSize = defaultShardingBlockSize
119 0 : }
120 :
121 0 : numShards := p.st.Remote.CacheShardCount
122 0 : if numShards == 0 {
123 0 : numShards = 2 * runtime.GOMAXPROCS(0)
124 0 : }
125 :
126 0 : p.remote.cache, err = sharedcache.Open(
127 0 : p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
128 0 : if err != nil {
129 0 : return errors.Wrapf(err, "pebble: could not open remote object cache")
130 0 : }
131 : }
132 :
133 1 : for _, meta := range contents.Objects {
134 1 : o := objstorage.ObjectMetadata{
135 1 : DiskFileNum: meta.FileNum,
136 1 : FileType: meta.FileType,
137 1 : }
138 1 : o.Remote.CreatorID = meta.CreatorID
139 1 : o.Remote.CreatorFileNum = meta.CreatorFileNum
140 1 : o.Remote.CleanupMethod = meta.CleanupMethod
141 1 : o.Remote.Locator = meta.Locator
142 1 : o.Remote.CustomObjectName = meta.CustomObjectName
143 1 : o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
144 1 : if err != nil {
145 0 : return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
146 0 : }
147 1 : if invariants.Enabled {
148 1 : o.AssertValid()
149 1 : }
150 1 : p.mu.knownObjects[o.DiskFileNum] = o
151 1 : if o.IsExternal() {
152 1 : p.mu.remote.addExternalObject(o)
153 1 : }
154 : }
155 1 : return nil
156 : }
157 :
158 : // initShared initializes the creator ID, allowing use of shared objects.
159 1 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
160 1 : ss.shared.initOnce.Do(func() {
161 1 : ss.shared.creatorID = creatorID
162 1 : ss.shared.initialized.Store(true)
163 1 : })
164 : }
165 :
166 1 : func (p *provider) sharedClose() error {
167 1 : if p.st.Remote.StorageFactory == nil {
168 1 : return nil
169 1 : }
170 1 : err := p.sharedSync()
171 1 : if p.remote.cache != nil {
172 0 : err = firstError(err, p.remote.cache.Close())
173 0 : p.remote.cache = nil
174 0 : }
175 1 : if p.remote.catalog != nil {
176 1 : err = firstError(err, p.remote.catalog.Close())
177 1 : p.remote.catalog = nil
178 1 : }
179 1 : return err
180 : }
181 :
182 : // SetCreatorID is part of the objstorage.Provider interface.
183 1 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
184 1 : if p.st.Remote.StorageFactory == nil {
185 0 : return base.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
186 0 : }
187 : // Note: this call is a cheap no-op if the creator ID was already set. This
188 : // call also checks if we are trying to change the ID.
189 1 : if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
190 0 : return err
191 0 : }
192 1 : if !p.remote.shared.initialized.Load() {
193 1 : p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
194 1 : p.remote.initShared(creatorID)
195 1 : }
196 1 : return nil
197 : }
198 :
199 : // IsSharedForeign is part of the objstorage.Provider interface.
200 1 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
201 1 : if !p.remote.shared.initialized.Load() {
202 0 : return false
203 0 : }
204 1 : return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
205 : }
206 :
207 1 : func (p *provider) remoteCheckInitialized() error {
208 1 : if p.st.Remote.StorageFactory == nil {
209 0 : return errors.Errorf("remote object support not configured")
210 0 : }
211 1 : return nil
212 : }
213 :
214 1 : func (p *provider) sharedCheckInitialized() error {
215 1 : if err := p.remoteCheckInitialized(); err != nil {
216 0 : return err
217 0 : }
218 1 : if !p.remote.shared.initialized.Load() {
219 0 : return errors.Errorf("remote object support not available: remote creator ID not yet set")
220 0 : }
221 1 : return nil
222 : }
223 :
224 1 : func (p *provider) sharedSync() error {
225 1 : // Serialize parallel sync operations. Note that ApplyBatch is already
226 1 : // serialized internally, but we want to make sure they get called with
227 1 : // batches in the right order.
228 1 : p.remote.catalogSyncMutex.Lock()
229 1 : defer p.remote.catalogSyncMutex.Unlock()
230 1 :
231 1 : batch := func() remoteobjcat.Batch {
232 1 : p.mu.Lock()
233 1 : defer p.mu.Unlock()
234 1 : res := p.mu.remote.catalogBatch.Copy()
235 1 : p.mu.remote.catalogBatch.Reset()
236 1 : return res
237 1 : }()
238 :
239 1 : if batch.IsEmpty() {
240 1 : return nil
241 1 : }
242 :
243 1 : if err := p.remote.catalog.ApplyBatch(batch); err != nil {
244 0 : // Put back the batch (for the next Sync), appending any operations that
245 0 : // happened in the meantime.
246 0 : p.mu.Lock()
247 0 : defer p.mu.Unlock()
248 0 : batch.Append(p.mu.remote.catalogBatch)
249 0 : p.mu.remote.catalogBatch = batch
250 0 : return err
251 0 : }
252 :
253 1 : return nil
254 : }
255 :
256 1 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
257 1 : if meta.Remote.Locator != "" {
258 1 : return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
259 1 : }
260 1 : return "remote://" + remoteObjectName(meta)
261 : }
262 :
263 : // sharedCreateRef creates a reference marker object.
264 1 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
265 1 : if err := p.sharedCheckInitialized(); err != nil {
266 0 : return err
267 0 : }
268 1 : if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
269 1 : return nil
270 1 : }
271 1 : refName := p.sharedObjectRefName(meta)
272 1 : writer, err := meta.Remote.Storage.CreateObject(refName)
273 1 : if err == nil {
274 1 : // The object is empty, just close the writer.
275 1 : err = writer.Close()
276 1 : }
277 1 : if err != nil {
278 0 : return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
279 0 : }
280 1 : return nil
281 : }
282 :
283 : func (p *provider) sharedCreate(
284 : _ context.Context,
285 : fileType base.FileType,
286 : fileNum base.DiskFileNum,
287 : locator remote.Locator,
288 : opts objstorage.CreateOptions,
289 1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
290 1 : if err := p.sharedCheckInitialized(); err != nil {
291 0 : return nil, objstorage.ObjectMetadata{}, err
292 0 : }
293 1 : storage, err := p.ensureStorage(locator)
294 1 : if err != nil {
295 0 : return nil, objstorage.ObjectMetadata{}, err
296 0 : }
297 1 : meta := objstorage.ObjectMetadata{
298 1 : DiskFileNum: fileNum,
299 1 : FileType: fileType,
300 1 : }
301 1 : meta.Remote.CreatorID = p.remote.shared.creatorID
302 1 : meta.Remote.CreatorFileNum = fileNum
303 1 : meta.Remote.CleanupMethod = opts.SharedCleanupMethod
304 1 : meta.Remote.Locator = locator
305 1 : meta.Remote.Storage = storage
306 1 :
307 1 : objName := remoteObjectName(meta)
308 1 : writer, err := storage.CreateObject(objName)
309 1 : if err != nil {
310 0 : return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
311 0 : }
312 1 : return &sharedWritable{
313 1 : p: p,
314 1 : meta: meta,
315 1 : storageWriter: writer,
316 1 : }, meta, nil
317 : }
318 :
319 : func (p *provider) remoteOpenForReading(
320 : ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
321 1 : ) (objstorage.Readable, error) {
322 1 : if err := p.remoteCheckInitialized(); err != nil {
323 0 : return nil, err
324 0 : }
325 : // Verify we have a reference on this object; for performance reasons, we only
326 : // do this in testing scenarios.
327 1 : if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
328 1 : if err := p.sharedCheckInitialized(); err != nil {
329 0 : return nil, err
330 0 : }
331 1 : refName := p.sharedObjectRefName(meta)
332 1 : if _, err := meta.Remote.Storage.Size(refName); err != nil {
333 0 : if meta.Remote.Storage.IsNotExistError(err) {
334 0 : if opts.MustExist {
335 0 : p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
336 0 : // TODO(radu): maybe list references for the object.
337 0 : }
338 0 : return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
339 : }
340 0 : return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
341 : }
342 : }
343 1 : objName := remoteObjectName(meta)
344 1 : reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
345 1 : if err != nil {
346 1 : if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
347 0 : p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
348 0 : // TODO(radu): maybe list references for the object.
349 0 : }
350 1 : return nil, err
351 : }
352 1 : return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
353 : }
354 :
355 1 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
356 1 : if err := p.remoteCheckInitialized(); err != nil {
357 0 : return 0, err
358 0 : }
359 1 : objName := remoteObjectName(meta)
360 1 : return meta.Remote.Storage.Size(objName)
361 : }
362 :
363 : // sharedUnref implements object "removal" with the remote backend. The ref
364 : // marker object is removed and the backing object is removed only if there are
365 : // no other ref markers.
366 1 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
367 1 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
368 1 : // Never delete objects in this mode.
369 1 : return nil
370 1 : }
371 1 : if p.isProtected(meta.DiskFileNum) {
372 1 : // TODO(radu): we need a mechanism to unref the object when it becomes
373 1 : // unprotected.
374 1 : return nil
375 1 : }
376 :
377 1 : refName := p.sharedObjectRefName(meta)
378 1 : // Tolerate a not-exists error.
379 1 : if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
380 0 : return err
381 0 : }
382 1 : otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
383 1 : if err != nil {
384 0 : return err
385 0 : }
386 1 : if len(otherRefs) == 0 {
387 1 : objName := remoteObjectName(meta)
388 1 : if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
389 0 : return err
390 0 : }
391 : }
392 1 : return nil
393 : }
394 :
395 : // ensureStorageLocked populates the remote.Storage object for the given
396 : // locator, if necessary. p.mu must be held.
397 1 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
398 1 : if p.mu.remote.storageObjects == nil {
399 1 : p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
400 1 : }
401 1 : if res, ok := p.mu.remote.storageObjects[locator]; ok {
402 1 : return res, nil
403 1 : }
404 1 : res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
405 1 : if err != nil {
406 1 : return nil, err
407 1 : }
408 :
409 1 : p.mu.remote.storageObjects[locator] = res
410 1 : return res, nil
411 : }
412 :
413 : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
414 1 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
415 1 : p.mu.Lock()
416 1 : defer p.mu.Unlock()
417 1 : return p.ensureStorageLocked(locator)
418 1 : }
419 :
420 : // GetExternalObjects is part of the Provider interface.
421 1 : func (p *provider) GetExternalObjects(locator remote.Locator, objName string) []base.DiskFileNum {
422 1 : p.mu.Lock()
423 1 : defer p.mu.Unlock()
424 1 : return slices.Clone(p.mu.remote.getExternalObjects(locator, objName))
425 1 : }
|