1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Configure bucket notification resources to interact with Google Cloud Pub/Sub.
16
17See [Cloud Pub/Sub Notifications for Google Cloud Storage](https://cloud.google.com/storage/docs/pubsub-notifications)
18"""
19
20import re
21
22from google.api_core.exceptions import NotFound
23
24from google.cloud.storage._opentelemetry_tracing import create_trace_span
25from google.cloud.storage.constants import _DEFAULT_TIMEOUT
26from google.cloud.storage.retry import DEFAULT_RETRY
27
28
29OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE"
30OBJECT_METADATA_UPDATE_EVENT_TYPE = "OBJECT_METADATA_UPDATE"
31OBJECT_DELETE_EVENT_TYPE = "OBJECT_DELETE"
32OBJECT_ARCHIVE_EVENT_TYPE = "OBJECT_ARCHIVE"
33
34JSON_API_V1_PAYLOAD_FORMAT = "JSON_API_V1"
35NONE_PAYLOAD_FORMAT = "NONE"
36
37_TOPIC_REF_FMT = "//pubsub.googleapis.com/projects/{}/topics/{}"
38_PROJECT_PATTERN = r"(?P<project>[a-z][a-z0-9-]{4,28}[a-z0-9])"
39_TOPIC_NAME_PATTERN = r"(?P<name>[A-Za-z](\w|[-_.~+%])+)"
40_TOPIC_REF_PATTERN = _TOPIC_REF_FMT.format(_PROJECT_PATTERN, _TOPIC_NAME_PATTERN)
41_TOPIC_REF_RE = re.compile(_TOPIC_REF_PATTERN)
42_BAD_TOPIC = (
43 "Resource has invalid topic: {}; see "
44 "https://cloud.google.com/storage/docs/json_api/v1/"
45 "notifications/insert#topic"
46)
47
48
49class BucketNotification(object):
50 """Represent a single notification resource for a bucket.
51
52 See: https://cloud.google.com/storage/docs/json_api/v1/notifications
53
54 :type bucket: :class:`google.cloud.storage.bucket.Bucket`
55 :param bucket: Bucket to which the notification is bound.
56
57 :type topic_name: str
58 :param topic_name:
59 (Optional) Topic name to which notifications are published.
60
61 :type topic_project: str
62 :param topic_project:
63 (Optional) Project ID of topic to which notifications are published.
64 If not passed, uses the project ID of the bucket's client.
65
66 :type custom_attributes: dict
67 :param custom_attributes:
68 (Optional) Additional attributes passed with notification events.
69
70 :type event_types: list(str)
71 :param event_types:
72 (Optional) Event types for which notification events are published.
73
74 :type blob_name_prefix: str
75 :param blob_name_prefix:
76 (Optional) Prefix of blob names for which notification events are
77 published.
78
79 :type payload_format: str
80 :param payload_format:
81 (Optional) Format of payload for notification events.
82
83 :type notification_id: str
84 :param notification_id:
85 (Optional) The ID of the notification.
86 """
87
88 def __init__(
89 self,
90 bucket,
91 topic_name=None,
92 topic_project=None,
93 custom_attributes=None,
94 event_types=None,
95 blob_name_prefix=None,
96 payload_format=NONE_PAYLOAD_FORMAT,
97 notification_id=None,
98 ):
99 self._bucket = bucket
100 self._topic_name = topic_name
101
102 if topic_project is None:
103 topic_project = bucket.client.project
104
105 if topic_project is None:
106 raise ValueError("Client project not set: pass an explicit topic_project.")
107
108 self._topic_project = topic_project
109
110 self._properties = {}
111
112 if custom_attributes is not None:
113 self._properties["custom_attributes"] = custom_attributes
114
115 if event_types is not None:
116 self._properties["event_types"] = event_types
117
118 if blob_name_prefix is not None:
119 self._properties["object_name_prefix"] = blob_name_prefix
120
121 if notification_id is not None:
122 self._properties["id"] = notification_id
123
124 self._properties["payload_format"] = payload_format
125
126 @classmethod
127 def from_api_repr(cls, resource, bucket):
128 """Construct an instance from the JSON repr returned by the server.
129
130 See: https://cloud.google.com/storage/docs/json_api/v1/notifications
131
132 :type resource: dict
133 :param resource: JSON repr of the notification
134
135 :type bucket: :class:`google.cloud.storage.bucket.Bucket`
136 :param bucket: Bucket to which the notification is bound.
137
138 :rtype: :class:`BucketNotification`
139 :returns: the new notification instance
140 """
141 topic_path = resource.get("topic")
142 if topic_path is None:
143 raise ValueError("Resource has no topic")
144
145 name, project = _parse_topic_path(topic_path)
146 instance = cls(bucket, name, topic_project=project)
147 instance._properties = resource
148
149 return instance
150
151 @property
152 def bucket(self):
153 """Bucket to which the notification is bound."""
154 return self._bucket
155
156 @property
157 def topic_name(self):
158 """Topic name to which notifications are published."""
159 return self._topic_name
160
161 @property
162 def topic_project(self):
163 """Project ID of topic to which notifications are published."""
164 return self._topic_project
165
166 @property
167 def custom_attributes(self):
168 """Custom attributes passed with notification events."""
169 return self._properties.get("custom_attributes")
170
171 @property
172 def event_types(self):
173 """Event types for which notification events are published."""
174 return self._properties.get("event_types")
175
176 @property
177 def blob_name_prefix(self):
178 """Prefix of blob names for which notification events are published."""
179 return self._properties.get("object_name_prefix")
180
181 @property
182 def payload_format(self):
183 """Format of payload of notification events."""
184 return self._properties.get("payload_format")
185
186 @property
187 def notification_id(self):
188 """Server-set ID of notification resource."""
189 return self._properties.get("id")
190
191 @property
192 def etag(self):
193 """Server-set ETag of notification resource."""
194 return self._properties.get("etag")
195
196 @property
197 def self_link(self):
198 """Server-set ETag of notification resource."""
199 return self._properties.get("selfLink")
200
201 @property
202 def client(self):
203 """The client bound to this notfication."""
204 return self.bucket.client
205
206 @property
207 def path(self):
208 """The URL path for this notification."""
209 return f"/b/{self.bucket.name}/notificationConfigs/{self.notification_id}"
210
211 def _require_client(self, client):
212 """Check client or verify over-ride.
213
214 :type client: :class:`~google.cloud.storage.client.Client` or
215 ``NoneType``
216 :param client: the client to use.
217
218 :rtype: :class:`google.cloud.storage.client.Client`
219 :returns: The client passed in or the bucket's client.
220 """
221 if client is None:
222 client = self.client
223 return client
224
225 def _set_properties(self, response):
226 """Helper for :meth:`reload`.
227
228 :type response: dict
229 :param response: resource mapping from server
230 """
231 self._properties.clear()
232 self._properties.update(response)
233
234 def create(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=None):
235 """API wrapper: create the notification.
236
237 See:
238 https://cloud.google.com/storage/docs/json_api/v1/notifications/insert
239
240 If :attr:`user_project` is set on the bucket, bills the API request
241 to that project.
242
243 :type client: :class:`~google.cloud.storage.client.Client`
244 :param client: (Optional) The client to use. If not passed, falls back
245 to the ``client`` stored on the notification's bucket.
246 :type timeout: float or tuple
247 :param timeout:
248 (Optional) The amount of time, in seconds, to wait
249 for the server response. See: :ref:`configuring_timeouts`
250
251 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
252 :param retry:
253 (Optional) How to retry the RPC. See: :ref:`configuring_retries`
254
255 :raises ValueError: if the notification already exists.
256 """
257 with create_trace_span(name="Storage.BucketNotification.create"):
258 if self.notification_id is not None:
259 raise ValueError(
260 f"notification_id already set to {self.notification_id}; must be None to create a Notification."
261 )
262
263 client = self._require_client(client)
264
265 query_params = {}
266 if self.bucket.user_project is not None:
267 query_params["userProject"] = self.bucket.user_project
268
269 path = f"/b/{self.bucket.name}/notificationConfigs"
270 properties = self._properties.copy()
271
272 if self.topic_name is None:
273 properties["topic"] = _TOPIC_REF_FMT.format(self.topic_project, "")
274 else:
275 properties["topic"] = _TOPIC_REF_FMT.format(
276 self.topic_project, self.topic_name
277 )
278
279 self._properties = client._post_resource(
280 path,
281 properties,
282 query_params=query_params,
283 timeout=timeout,
284 retry=retry,
285 )
286
287 def exists(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY):
288 """Test whether this notification exists.
289
290 See:
291 https://cloud.google.com/storage/docs/json_api/v1/notifications/get
292
293 If :attr:`user_project` is set on the bucket, bills the API request
294 to that project.
295
296 :type client: :class:`~google.cloud.storage.client.Client` or
297 ``NoneType``
298 :param client: (Optional) The client to use. If not passed, falls back
299 to the ``client`` stored on the current bucket.
300 :type timeout: float or tuple
301 :param timeout:
302 (Optional) The amount of time, in seconds, to wait
303 for the server response. See: :ref:`configuring_timeouts`
304
305 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
306 :param retry:
307 (Optional) How to retry the RPC. See: :ref:`configuring_retries`
308
309 :rtype: bool
310 :returns: True, if the notification exists, else False.
311 :raises ValueError: if the notification has no ID.
312 """
313 with create_trace_span(name="Storage.BucketNotification.exists"):
314 if self.notification_id is None:
315 raise ValueError(
316 "Notification ID not set: set an explicit notification_id"
317 )
318
319 client = self._require_client(client)
320
321 query_params = {}
322 if self.bucket.user_project is not None:
323 query_params["userProject"] = self.bucket.user_project
324
325 try:
326 client._get_resource(
327 self.path,
328 query_params=query_params,
329 timeout=timeout,
330 retry=retry,
331 )
332 except NotFound:
333 return False
334 else:
335 return True
336
337 def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY):
338 """Update this notification from the server configuration.
339
340 See:
341 https://cloud.google.com/storage/docs/json_api/v1/notifications/get
342
343 If :attr:`user_project` is set on the bucket, bills the API request
344 to that project.
345
346 :type client: :class:`~google.cloud.storage.client.Client` or
347 ``NoneType``
348 :param client: (Optional) The client to use. If not passed, falls back
349 to the ``client`` stored on the current bucket.
350 :type timeout: float or tuple
351 :param timeout:
352 (Optional) The amount of time, in seconds, to wait
353 for the server response. See: :ref:`configuring_timeouts`
354
355 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
356 :param retry:
357 (Optional) How to retry the RPC. See: :ref:`configuring_retries`
358
359
360 :raises ValueError: if the notification has no ID.
361 """
362 with create_trace_span(name="Storage.BucketNotification.reload"):
363 if self.notification_id is None:
364 raise ValueError(
365 "Notification ID not set: set an explicit notification_id"
366 )
367
368 client = self._require_client(client)
369
370 query_params = {}
371 if self.bucket.user_project is not None:
372 query_params["userProject"] = self.bucket.user_project
373
374 response = client._get_resource(
375 self.path,
376 query_params=query_params,
377 timeout=timeout,
378 retry=retry,
379 )
380 self._set_properties(response)
381
382 def delete(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY):
383 """Delete this notification.
384
385 See:
386 https://cloud.google.com/storage/docs/json_api/v1/notifications/delete
387
388 If :attr:`user_project` is set on the bucket, bills the API request
389 to that project.
390
391 :type client: :class:`~google.cloud.storage.client.Client` or
392 ``NoneType``
393 :param client: (Optional) The client to use. If not passed, falls back
394 to the ``client`` stored on the current bucket.
395 :type timeout: float or tuple
396 :param timeout:
397 (Optional) The amount of time, in seconds, to wait
398 for the server response. See: :ref:`configuring_timeouts`
399
400 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
401 :param retry:
402 (Optional) How to retry the RPC. See: :ref:`configuring_retries`
403
404 :raises: :class:`google.api_core.exceptions.NotFound`:
405 if the notification does not exist.
406 :raises ValueError: if the notification has no ID.
407 """
408 with create_trace_span(name="Storage.BucketNotification.delete"):
409 if self.notification_id is None:
410 raise ValueError(
411 "Notification ID not set: set an explicit notification_id"
412 )
413
414 client = self._require_client(client)
415
416 query_params = {}
417 if self.bucket.user_project is not None:
418 query_params["userProject"] = self.bucket.user_project
419
420 client._delete_resource(
421 self.path,
422 query_params=query_params,
423 timeout=timeout,
424 retry=retry,
425 )
426
427
428def _parse_topic_path(topic_path):
429 """Verify that a topic path is in the correct format.
430
431 Expected to be of the form:
432
433 //pubsub.googleapis.com/projects/{project}/topics/{topic}
434
435 where the ``project`` value must be "6 to 30 lowercase letters, digits,
436 or hyphens. It must start with a letter. Trailing hyphens are prohibited."
437 (see [`resource manager docs`](https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects#Project.FIELDS.project_id))
438 and ``topic`` must have length at least two,
439 must start with a letter and may only contain alphanumeric characters or
440 ``-``, ``_``, ``.``, ``~``, ``+`` or ``%`` (i.e characters used for URL
441 encoding, see [`topic spec`](https://cloud.google.com/storage/docs/json_api/v1/notifications/insert#topic)).
442
443 Args:
444 topic_path (str): The topic path to be verified.
445
446 Returns:
447 Tuple[str, str]: The ``project`` and ``topic`` parsed from the
448 ``topic_path``.
449
450 Raises:
451 ValueError: If the topic path is invalid.
452 """
453 match = _TOPIC_REF_RE.match(topic_path)
454 if match is None:
455 raise ValueError(_BAD_TOPIC.format(topic_path))
456
457 return match.group("name"), match.group("project")