Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
16 : "github.com/cockroachdb/pebble/objstorage/remote"
17 : )
18 :
const (
	// tagCreatorID is followed by the uvarint-encoded ID of the provider that
	// created the object.
	tagCreatorID = 1
	// tagCreatorFileNum is followed by the uvarint-encoded file number the
	// object had in the creator provider.
	tagCreatorFileNum = 2
	// tagCleanupMethod is followed by the uvarint-encoded
	// objstorage.SharedCleanupMethod of the object.
	tagCleanupMethod = 3
	// tagRefCheckID encodes the information for a ref marker that needs to be
	// checked when attaching this object to another provider. This is set to the
	// creator ID and FileNum for the provider that encodes the backing, and
	// allows the "target" provider to check that the "source" provider kept its
	// reference on the object alive.
	tagRefCheckID = 4
	// tagLocator encodes the remote.Locator; if absent the locator is "". It is
	// followed by the locator string length and the locator string.
	tagLocator = 5
	// tagCustomObjectName encodes a custom object name (if present). It is
	// followed by the custom name string length and the string.
	tagCustomObjectName = 6

	// Any new tags that don't have the tagNotSafeToIgnoreMask bit set must be
	// followed by the length of the data (so they can be skipped).

	// Any new tags that have the tagNotSafeToIgnoreMask bit set cause errors if
	// they are encountered by earlier code that doesn't know the tag.
	tagNotSafeToIgnoreMask = 64
)
43 :
44 : func (p *provider) encodeRemoteObjectBacking(
45 : meta *objstorage.ObjectMetadata,
46 2 : ) (objstorage.RemoteObjectBacking, error) {
47 2 : if !meta.IsRemote() {
48 1 : return nil, errors.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
49 1 : }
50 :
51 2 : buf := make([]byte, 0, binary.MaxVarintLen64*4)
52 2 : buf = binary.AppendUvarint(buf, tagCreatorID)
53 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorID))
54 2 : // TODO(radu): encode file type as well?
55 2 : buf = binary.AppendUvarint(buf, tagCreatorFileNum)
56 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorFileNum.FileNum()))
57 2 : buf = binary.AppendUvarint(buf, tagCleanupMethod)
58 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CleanupMethod))
59 2 : if meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
60 2 : buf = binary.AppendUvarint(buf, tagRefCheckID)
61 2 : buf = binary.AppendUvarint(buf, uint64(p.remote.shared.creatorID))
62 2 : buf = binary.AppendUvarint(buf, uint64(meta.DiskFileNum.FileNum()))
63 2 : }
64 2 : if meta.Remote.Locator != "" {
65 1 : buf = binary.AppendUvarint(buf, tagLocator)
66 1 : buf = encodeString(buf, string(meta.Remote.Locator))
67 1 : }
68 2 : if meta.Remote.CustomObjectName != "" {
69 1 : buf = binary.AppendUvarint(buf, tagCustomObjectName)
70 1 : buf = encodeString(buf, meta.Remote.CustomObjectName)
71 1 : }
72 2 : return buf, nil
73 : }
74 :
75 : type remoteObjectBackingHandle struct {
76 : backing objstorage.RemoteObjectBacking
77 : fileNum base.DiskFileNum
78 : p *provider
79 : }
80 :
81 2 : func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
82 2 : if s.backing == nil {
83 1 : return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
84 1 : }
85 2 : return s.backing, nil
86 : }
87 :
88 1 : func (s *remoteObjectBackingHandle) Close() {
89 1 : if s.backing != nil {
90 1 : s.backing = nil
91 1 : s.p.unprotectObject(s.fileNum)
92 1 : }
93 : }
94 :
95 : var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
96 :
97 : // RemoteObjectBacking is part of the objstorage.Provider interface.
98 : func (p *provider) RemoteObjectBacking(
99 : meta *objstorage.ObjectMetadata,
100 2 : ) (objstorage.RemoteObjectBackingHandle, error) {
101 2 : backing, err := p.encodeRemoteObjectBacking(meta)
102 2 : if err != nil {
103 1 : return nil, err
104 1 : }
105 2 : p.protectObject(meta.DiskFileNum)
106 2 : return &remoteObjectBackingHandle{
107 2 : backing: backing,
108 2 : fileNum: meta.DiskFileNum,
109 2 : p: p,
110 2 : }, nil
111 : }
112 :
113 : // CreateExternalObjectBacking is part of the objstorage.Provider interface.
114 : func (p *provider) CreateExternalObjectBacking(
115 : locator remote.Locator, objName string,
116 1 : ) (objstorage.RemoteObjectBacking, error) {
117 1 : var meta objstorage.ObjectMetadata
118 1 : meta.Remote.Locator = locator
119 1 : meta.Remote.CustomObjectName = objName
120 1 : meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
121 1 : return p.encodeRemoteObjectBacking(&meta)
122 1 : }
123 :
124 : type decodedBacking struct {
125 : meta objstorage.ObjectMetadata
126 : // refToCheck is set only when meta.Remote.CleanupMethod is RefTracking
127 : refToCheck struct {
128 : creatorID objstorage.CreatorID
129 : fileNum base.DiskFileNum
130 : }
131 : }
132 :
133 : // decodeRemoteObjectBacking decodes the remote object metadata.
134 : //
135 : // Note that the meta.Remote.Storage field is not set.
136 : func decodeRemoteObjectBacking(
137 : fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
138 2 : ) (decodedBacking, error) {
139 2 : var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
140 2 : var locator, customObjName string
141 2 : br := bytes.NewReader(buf)
142 2 : for {
143 2 : tag, err := binary.ReadUvarint(br)
144 2 : if err == io.EOF {
145 2 : break
146 : }
147 2 : if err != nil {
148 0 : return decodedBacking{}, err
149 0 : }
150 2 : switch tag {
151 2 : case tagCreatorID:
152 2 : creatorID, err = binary.ReadUvarint(br)
153 :
154 2 : case tagCreatorFileNum:
155 2 : creatorFileNum, err = binary.ReadUvarint(br)
156 :
157 2 : case tagCleanupMethod:
158 2 : cleanupMethod, err = binary.ReadUvarint(br)
159 :
160 2 : case tagRefCheckID:
161 2 : refCheckCreatorID, err = binary.ReadUvarint(br)
162 2 : if err == nil {
163 2 : refCheckFileNum, err = binary.ReadUvarint(br)
164 2 : }
165 :
166 1 : case tagLocator:
167 1 : locator, err = decodeString(br)
168 :
169 1 : case tagCustomObjectName:
170 1 : customObjName, err = decodeString(br)
171 :
172 1 : default:
173 1 : // Ignore unknown tags, unless they're not safe to ignore.
174 1 : if tag&tagNotSafeToIgnoreMask != 0 {
175 1 : return decodedBacking{}, errors.Newf("unknown tag %d", tag)
176 1 : }
177 1 : var dataLen uint64
178 1 : dataLen, err = binary.ReadUvarint(br)
179 1 : if err == nil {
180 1 : _, err = br.Seek(int64(dataLen), io.SeekCurrent)
181 1 : }
182 : }
183 2 : if err != nil {
184 0 : return decodedBacking{}, err
185 0 : }
186 : }
187 2 : if customObjName == "" {
188 2 : if creatorID == 0 {
189 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
190 0 : }
191 2 : if creatorFileNum == 0 {
192 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
193 0 : }
194 : }
195 2 : var res decodedBacking
196 2 : res.meta.DiskFileNum = fileNum
197 2 : res.meta.FileType = fileType
198 2 : res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
199 2 : res.meta.Remote.CreatorFileNum = base.FileNum(creatorFileNum).DiskFileNum()
200 2 : res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
201 2 : if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
202 2 : if refCheckCreatorID == 0 || refCheckFileNum == 0 {
203 0 : return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
204 0 : }
205 2 : res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
206 2 : res.refToCheck.fileNum = base.FileNum(refCheckFileNum).DiskFileNum()
207 : }
208 2 : res.meta.Remote.Locator = remote.Locator(locator)
209 2 : res.meta.Remote.CustomObjectName = customObjName
210 2 : return res, nil
211 : }
212 :
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended buffer. decodeString reverses it.
func encodeString(buf []byte, s string) []byte {
	buf = binary.AppendUvarint(buf, uint64(len(s)))
	// append accepts a string directly for a []byte destination; no
	// intermediate []byte(s) conversion is needed.
	return append(buf, s...)
}
218 :
// decodeString reads a string written by encodeString: a uvarint byte length
// followed by that many bytes. A zero length yields "".
//
// The length comes from encoded (possibly corrupt) data, so it is not trusted
// for allocation. If br reports how many bytes remain via a Len() method (as
// *bytes.Reader does), a length exceeding that is rejected with
// io.ErrUnexpectedEOF. Otherwise the up-front allocation is capped and the
// buffer grows only as bytes are actually read. Read errors from br are
// returned as-is.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}
	const maxPrealloc = 1 << 10
	capHint := length
	if lr, ok := br.(interface{ Len() int }); ok {
		if length > uint64(lr.Len()) {
			return "", io.ErrUnexpectedEOF
		}
	} else if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, capHint)
	for i := uint64(0); i < length; i++ {
		var c byte
		c, err = br.ReadByte()
		if err != nil {
			return "", err
		}
		buf = append(buf, c)
	}
	return string(buf), nil
}
233 :
234 : // AttachRemoteObjects is part of the objstorage.Provider interface.
235 : func (p *provider) AttachRemoteObjects(
236 : objs []objstorage.RemoteObjectToAttach,
237 2 : ) ([]objstorage.ObjectMetadata, error) {
238 2 : decoded := make([]decodedBacking, len(objs))
239 2 : for i, o := range objs {
240 2 : var err error
241 2 : decoded[i], err = decodeRemoteObjectBacking(o.FileType, o.FileNum, o.Backing)
242 2 : if err != nil {
243 0 : return nil, err
244 0 : }
245 2 : decoded[i].meta.Remote.Storage, err = p.ensureStorage(decoded[i].meta.Remote.Locator)
246 2 : if err != nil {
247 1 : return nil, err
248 1 : }
249 : }
250 :
251 : // Create the reference marker objects.
252 : // TODO(radu): parallelize this.
253 2 : for _, d := range decoded {
254 2 : if d.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
255 1 : continue
256 : }
257 2 : if err := p.sharedCreateRef(d.meta); err != nil {
258 0 : // TODO(radu): clean up references previously created in this loop.
259 0 : return nil, err
260 0 : }
261 : // Check the "origin"'s reference.
262 2 : refName := sharedObjectRefName(d.meta, d.refToCheck.creatorID, d.refToCheck.fileNum)
263 2 : if _, err := d.meta.Remote.Storage.Size(refName); err != nil {
264 1 : _ = p.sharedUnref(d.meta)
265 1 : // TODO(radu): clean up references previously created in this loop.
266 1 : if d.meta.Remote.Storage.IsNotExistError(err) {
267 1 : return nil, errors.Errorf("origin marker object %q does not exist;"+
268 1 : " object probably removed from the provider which created the backing", refName)
269 1 : }
270 0 : return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)
271 : }
272 : }
273 :
274 2 : func() {
275 2 : p.mu.Lock()
276 2 : defer p.mu.Unlock()
277 2 : for _, d := range decoded {
278 2 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
279 2 : FileNum: d.meta.DiskFileNum,
280 2 : FileType: d.meta.FileType,
281 2 : CreatorID: d.meta.Remote.CreatorID,
282 2 : CreatorFileNum: d.meta.Remote.CreatorFileNum,
283 2 : CleanupMethod: d.meta.Remote.CleanupMethod,
284 2 : Locator: d.meta.Remote.Locator,
285 2 : CustomObjectName: d.meta.Remote.CustomObjectName,
286 2 : })
287 2 : }
288 : }()
289 2 : if err := p.sharedSync(); err != nil {
290 0 : return nil, err
291 0 : }
292 :
293 2 : metas := make([]objstorage.ObjectMetadata, len(decoded))
294 2 : for i, d := range decoded {
295 2 : metas[i] = d.meta
296 2 : }
297 :
298 2 : p.mu.Lock()
299 2 : defer p.mu.Unlock()
300 2 : for _, meta := range metas {
301 2 : p.mu.knownObjects[meta.DiskFileNum] = meta
302 2 : }
303 2 : return metas, nil
304 : }
|