Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "runtime"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/redact"
22 : )
23 :
24 : // remoteSubsystem contains the provider fields related to remote storage.
25 : // All fields remain unset if remote storage is not configured.
26 : type remoteSubsystem struct {
27 : catalog *remoteobjcat.Catalog
28 : // catalogSyncMutex is used to correctly serialize two sharedSync operations.
29 : // It must be acquired before the provider mutex.
30 : catalogSyncMutex sync.Mutex
31 :
32 : cache *sharedcache.Cache
33 :
34 : // shared contains the fields relevant to shared objects, i.e. objects that
35 : // are created by Pebble and potentially shared between Pebble instances.
36 : shared struct {
37 : // initialized guards access to the creatorID field.
38 : initialized atomic.Bool
39 : creatorID objstorage.CreatorID
40 : initOnce sync.Once
41 :
42 : // checkRefsOnOpen controls whether we check the ref marker file when opening
43 : // an object. Normally this is true when invariants are enabled (but the provider
44 : // test tweaks this field).
45 : checkRefsOnOpen bool
46 : }
47 : }
48 :
49 : // remoteInit initializes the remote object subsystem (if configured) and finds
50 : // any remote objects.
51 1 : func (p *provider) remoteInit() error {
52 1 : if p.st.Remote.StorageFactory == nil {
53 1 : return nil
54 1 : }
55 1 : catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
56 1 : if err != nil {
57 0 : return errors.Wrapf(err, "pebble: could not open remote object catalog")
58 0 : }
59 1 : p.remote.catalog = catalog
60 1 : p.remote.shared.checkRefsOnOpen = invariants.Enabled
61 1 :
62 1 : // The creator ID may or may not be initialized yet.
63 1 : if contents.CreatorID.IsSet() {
64 1 : p.remote.initShared(contents.CreatorID)
65 1 : p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
66 1 : } else {
67 1 : p.st.Logger.Infof("remote storage configured; no creatorID yet")
68 1 : }
69 :
70 1 : if p.st.Remote.CacheSizeBytes > 0 {
71 1 : const defaultBlockSize = 32 * 1024
72 1 : blockSize := p.st.Remote.CacheBlockSize
73 1 : if blockSize == 0 {
74 1 : blockSize = defaultBlockSize
75 1 : }
76 :
77 1 : const defaultShardingBlockSize = 1024 * 1024
78 1 : shardingBlockSize := p.st.Remote.ShardingBlockSize
79 1 : if shardingBlockSize == 0 {
80 1 : shardingBlockSize = defaultShardingBlockSize
81 1 : }
82 :
83 1 : numShards := p.st.Remote.CacheShardCount
84 1 : if numShards == 0 {
85 1 : numShards = 2 * runtime.GOMAXPROCS(0)
86 1 : }
87 :
88 1 : p.remote.cache, err = sharedcache.Open(
89 1 : p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
90 1 : if err != nil {
91 0 : return errors.Wrapf(err, "pebble: could not open remote object cache")
92 0 : }
93 : }
94 :
95 1 : for _, meta := range contents.Objects {
96 1 : o := objstorage.ObjectMetadata{
97 1 : DiskFileNum: meta.FileNum,
98 1 : FileType: meta.FileType,
99 1 : }
100 1 : o.Remote.CreatorID = meta.CreatorID
101 1 : o.Remote.CreatorFileNum = meta.CreatorFileNum
102 1 : o.Remote.CleanupMethod = meta.CleanupMethod
103 1 : o.Remote.Locator = meta.Locator
104 1 : o.Remote.CustomObjectName = meta.CustomObjectName
105 1 : o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
106 1 : if err != nil {
107 0 : return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
108 0 : }
109 1 : if invariants.Enabled {
110 1 : o.AssertValid()
111 1 : }
112 1 : p.mu.knownObjects[o.DiskFileNum] = o
113 : }
114 1 : return nil
115 : }
116 :
117 : // initShared initializes the creator ID, allowing use of shared objects.
118 1 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
119 1 : ss.shared.initOnce.Do(func() {
120 1 : ss.shared.creatorID = creatorID
121 1 : ss.shared.initialized.Store(true)
122 1 : })
123 : }
124 :
125 1 : func (p *provider) sharedClose() error {
126 1 : if p.st.Remote.StorageFactory == nil {
127 1 : return nil
128 1 : }
129 1 : var err error
130 1 : if p.remote.cache != nil {
131 1 : err = p.remote.cache.Close()
132 1 : p.remote.cache = nil
133 1 : }
134 1 : if p.remote.catalog != nil {
135 1 : err = firstError(err, p.remote.catalog.Close())
136 1 : p.remote.catalog = nil
137 1 : }
138 1 : return err
139 : }
140 :
141 : // SetCreatorID is part of the objstorage.Provider interface.
142 1 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
143 1 : if p.st.Remote.StorageFactory == nil {
144 0 : return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
145 0 : }
146 : // Note: this call is a cheap no-op if the creator ID was already set. This
147 : // call also checks if we are trying to change the ID.
148 1 : if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
149 0 : return err
150 0 : }
151 1 : if !p.remote.shared.initialized.Load() {
152 1 : p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
153 1 : p.remote.initShared(creatorID)
154 1 : }
155 1 : return nil
156 : }
157 :
158 : // IsSharedForeign is part of the objstorage.Provider interface.
159 1 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
160 1 : if !p.remote.shared.initialized.Load() {
161 1 : return false
162 1 : }
163 1 : return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
164 : }
165 :
166 1 : func (p *provider) remoteCheckInitialized() error {
167 1 : if p.st.Remote.StorageFactory == nil {
168 0 : return errors.Errorf("remote object support not configured")
169 0 : }
170 1 : return nil
171 : }
172 :
173 1 : func (p *provider) sharedCheckInitialized() error {
174 1 : if err := p.remoteCheckInitialized(); err != nil {
175 0 : return err
176 0 : }
177 1 : if !p.remote.shared.initialized.Load() {
178 0 : return errors.Errorf("remote object support not available: remote creator ID not yet set")
179 0 : }
180 1 : return nil
181 : }
182 :
183 1 : func (p *provider) sharedSync() error {
184 1 : // Serialize parallel sync operations. Note that ApplyBatch is already
185 1 : // serialized internally, but we want to make sure they get called with
186 1 : // batches in the right order.
187 1 : p.remote.catalogSyncMutex.Lock()
188 1 : defer p.remote.catalogSyncMutex.Unlock()
189 1 :
190 1 : batch := func() remoteobjcat.Batch {
191 1 : p.mu.Lock()
192 1 : defer p.mu.Unlock()
193 1 : res := p.mu.remote.catalogBatch.Copy()
194 1 : p.mu.remote.catalogBatch.Reset()
195 1 : return res
196 1 : }()
197 :
198 1 : if batch.IsEmpty() {
199 1 : return nil
200 1 : }
201 :
202 1 : if err := p.remote.catalog.ApplyBatch(batch); err != nil {
203 0 : // Put back the batch (for the next Sync), appending any operations that
204 0 : // happened in the meantime.
205 0 : p.mu.Lock()
206 0 : defer p.mu.Unlock()
207 0 : batch.Append(p.mu.remote.catalogBatch)
208 0 : p.mu.remote.catalogBatch = batch
209 0 : return err
210 0 : }
211 :
212 1 : return nil
213 : }
214 :
215 1 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
216 1 : if meta.Remote.Locator != "" {
217 0 : return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
218 0 : }
219 1 : return "remote://" + remoteObjectName(meta)
220 : }
221 :
222 : // sharedCreateRef creates a reference marker object.
223 1 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
224 1 : if err := p.sharedCheckInitialized(); err != nil {
225 0 : return err
226 0 : }
227 1 : if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
228 0 : return nil
229 0 : }
230 1 : refName := p.sharedObjectRefName(meta)
231 1 : writer, err := meta.Remote.Storage.CreateObject(refName)
232 1 : if err == nil {
233 1 : // The object is empty, just close the writer.
234 1 : err = writer.Close()
235 1 : }
236 1 : if err != nil {
237 0 : return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
238 0 : }
239 1 : return nil
240 : }
241 :
242 : func (p *provider) sharedCreate(
243 : _ context.Context,
244 : fileType base.FileType,
245 : fileNum base.DiskFileNum,
246 : locator remote.Locator,
247 : opts objstorage.CreateOptions,
248 1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
249 1 : if err := p.sharedCheckInitialized(); err != nil {
250 0 : return nil, objstorage.ObjectMetadata{}, err
251 0 : }
252 1 : storage, err := p.ensureStorage(locator)
253 1 : if err != nil {
254 0 : return nil, objstorage.ObjectMetadata{}, err
255 0 : }
256 1 : meta := objstorage.ObjectMetadata{
257 1 : DiskFileNum: fileNum,
258 1 : FileType: fileType,
259 1 : }
260 1 : meta.Remote.CreatorID = p.remote.shared.creatorID
261 1 : meta.Remote.CreatorFileNum = fileNum
262 1 : meta.Remote.CleanupMethod = opts.SharedCleanupMethod
263 1 : meta.Remote.Locator = locator
264 1 : meta.Remote.Storage = storage
265 1 :
266 1 : objName := remoteObjectName(meta)
267 1 : writer, err := storage.CreateObject(objName)
268 1 : if err != nil {
269 0 : return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
270 0 : }
271 1 : return &sharedWritable{
272 1 : p: p,
273 1 : meta: meta,
274 1 : storageWriter: writer,
275 1 : }, meta, nil
276 : }
277 :
278 : func (p *provider) remoteOpenForReading(
279 : ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
280 1 : ) (objstorage.Readable, error) {
281 1 : if err := p.remoteCheckInitialized(); err != nil {
282 0 : return nil, err
283 0 : }
284 : // Verify we have a reference on this object; for performance reasons, we only
285 : // do this in testing scenarios.
286 1 : if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
287 1 : if err := p.sharedCheckInitialized(); err != nil {
288 0 : return nil, err
289 0 : }
290 1 : refName := p.sharedObjectRefName(meta)
291 1 : if _, err := meta.Remote.Storage.Size(refName); err != nil {
292 0 : if meta.Remote.Storage.IsNotExistError(err) {
293 0 : if opts.MustExist {
294 0 : p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
295 0 : // TODO(radu): maybe list references for the object.
296 0 : }
297 0 : return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
298 : }
299 0 : return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
300 : }
301 : }
302 1 : objName := remoteObjectName(meta)
303 1 : reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
304 1 : if err != nil {
305 0 : if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
306 0 : p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
307 0 : // TODO(radu): maybe list references for the object.
308 0 : }
309 0 : return nil, err
310 : }
311 1 : return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
312 : }
313 :
314 1 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
315 1 : if err := p.remoteCheckInitialized(); err != nil {
316 0 : return 0, err
317 0 : }
318 1 : objName := remoteObjectName(meta)
319 1 : return meta.Remote.Storage.Size(objName)
320 : }
321 :
322 : // sharedUnref implements object "removal" with the remote backend. The ref
323 : // marker object is removed and the backing object is removed only if there are
324 : // no other ref markers.
325 1 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
326 1 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
327 0 : // Never delete objects in this mode.
328 0 : return nil
329 0 : }
330 1 : if p.isProtected(meta.DiskFileNum) {
331 0 : // TODO(radu): we need a mechanism to unref the object when it becomes
332 0 : // unprotected.
333 0 : return nil
334 0 : }
335 :
336 1 : refName := p.sharedObjectRefName(meta)
337 1 : // Tolerate a not-exists error.
338 1 : if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
339 0 : return err
340 0 : }
341 1 : otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
342 1 : if err != nil {
343 0 : return err
344 0 : }
345 1 : if len(otherRefs) == 0 {
346 1 : objName := remoteObjectName(meta)
347 1 : if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
348 0 : return err
349 0 : }
350 : }
351 1 : return nil
352 : }
353 :
354 : // ensureStorageLocked populates the remote.Storage object for the given
355 : // locator, if necessary. p.mu must be held.
356 1 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
357 1 : if p.mu.remote.storageObjects == nil {
358 1 : p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
359 1 : }
360 1 : if res, ok := p.mu.remote.storageObjects[locator]; ok {
361 1 : return res, nil
362 1 : }
363 1 : res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
364 1 : if err != nil {
365 0 : return nil, err
366 0 : }
367 :
368 1 : p.mu.remote.storageObjects[locator] = res
369 1 : return res, nil
370 : }
371 :
372 : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
373 1 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
374 1 : p.mu.Lock()
375 1 : defer p.mu.Unlock()
376 1 : return p.ensureStorageLocked(locator)
377 1 : }
|