1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Configure bucket notification resources to interact with Google Cloud Pub/Sub. 
    16 
    17See [Cloud Pub/Sub Notifications for Google Cloud Storage](https://cloud.google.com/storage/docs/pubsub-notifications) 
    18""" 
    19 
    20import re 
    21 
    22from google.api_core.exceptions import NotFound 
    23 
    24from google.cloud.storage._opentelemetry_tracing import create_trace_span 
    25from google.cloud.storage.constants import _DEFAULT_TIMEOUT 
    26from google.cloud.storage.retry import DEFAULT_RETRY 
    27 
    28 
    29OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE" 
    30OBJECT_METADATA_UPDATE_EVENT_TYPE = "OBJECT_METADATA_UPDATE" 
    31OBJECT_DELETE_EVENT_TYPE = "OBJECT_DELETE" 
    32OBJECT_ARCHIVE_EVENT_TYPE = "OBJECT_ARCHIVE" 
    33 
    34JSON_API_V1_PAYLOAD_FORMAT = "JSON_API_V1" 
    35NONE_PAYLOAD_FORMAT = "NONE" 
    36 
    37_TOPIC_REF_FMT = "//pubsub.googleapis.com/projects/{}/topics/{}" 
    38_PROJECT_PATTERN = r"(?P<project>[a-z][a-z0-9-]{4,28}[a-z0-9])" 
    39_TOPIC_NAME_PATTERN = r"(?P<name>[A-Za-z](\w|[-_.~+%])+)" 
    40_TOPIC_REF_PATTERN = _TOPIC_REF_FMT.format(_PROJECT_PATTERN, _TOPIC_NAME_PATTERN) 
    41_TOPIC_REF_RE = re.compile(_TOPIC_REF_PATTERN) 
    42_BAD_TOPIC = ( 
    43    "Resource has invalid topic: {}; see " 
    44    "https://cloud.google.com/storage/docs/json_api/v1/" 
    45    "notifications/insert#topic" 
    46) 
    47 
    48 
    49class BucketNotification(object): 
    50    """Represent a single notification resource for a bucket. 
    51 
    52    See: https://cloud.google.com/storage/docs/json_api/v1/notifications 
    53 
    54    :type bucket: :class:`google.cloud.storage.bucket.Bucket` 
    55    :param bucket: Bucket to which the notification is bound. 
    56 
    57    :type topic_name: str 
    58    :param topic_name: 
    59        (Optional) Topic name to which notifications are published. 
    60 
    61    :type topic_project: str 
    62    :param topic_project: 
    63        (Optional) Project ID of topic to which notifications are published. 
    64        If not passed, uses the project ID of the bucket's client. 
    65 
    66    :type custom_attributes: dict 
    67    :param custom_attributes: 
    68        (Optional) Additional attributes passed with notification events. 
    69 
    70    :type event_types: list(str) 
    71    :param event_types: 
    72        (Optional) Event types for which notification events are published. 
    73 
    74    :type blob_name_prefix: str 
    75    :param blob_name_prefix: 
    76        (Optional) Prefix of blob names for which notification events are 
    77        published. 
    78 
    79    :type payload_format: str 
    80    :param payload_format: 
    81        (Optional) Format of payload for notification events. 
    82 
    83    :type notification_id: str 
    84    :param notification_id: 
    85        (Optional) The ID of the notification. 
    86    """ 
    87 
    88    def __init__( 
    89        self, 
    90        bucket, 
    91        topic_name=None, 
    92        topic_project=None, 
    93        custom_attributes=None, 
    94        event_types=None, 
    95        blob_name_prefix=None, 
    96        payload_format=NONE_PAYLOAD_FORMAT, 
    97        notification_id=None, 
    98    ): 
    99        self._bucket = bucket 
    100        self._topic_name = topic_name 
    101 
    102        if topic_project is None: 
    103            topic_project = bucket.client.project 
    104 
    105        if topic_project is None: 
    106            raise ValueError("Client project not set:  pass an explicit topic_project.") 
    107 
    108        self._topic_project = topic_project 
    109 
    110        self._properties = {} 
    111 
    112        if custom_attributes is not None: 
    113            self._properties["custom_attributes"] = custom_attributes 
    114 
    115        if event_types is not None: 
    116            self._properties["event_types"] = event_types 
    117 
    118        if blob_name_prefix is not None: 
    119            self._properties["object_name_prefix"] = blob_name_prefix 
    120 
    121        if notification_id is not None: 
    122            self._properties["id"] = notification_id 
    123 
    124        self._properties["payload_format"] = payload_format 
    125 
    126    @classmethod 
    127    def from_api_repr(cls, resource, bucket): 
    128        """Construct an instance from the JSON repr returned by the server. 
    129 
    130        See: https://cloud.google.com/storage/docs/json_api/v1/notifications 
    131 
    132        :type resource: dict 
    133        :param resource: JSON repr of the notification 
    134 
    135        :type bucket: :class:`google.cloud.storage.bucket.Bucket` 
    136        :param bucket: Bucket to which the notification is bound. 
    137 
    138        :rtype: :class:`BucketNotification` 
    139        :returns: the new notification instance 
    140        """ 
    141        topic_path = resource.get("topic") 
    142        if topic_path is None: 
    143            raise ValueError("Resource has no topic") 
    144 
    145        name, project = _parse_topic_path(topic_path) 
    146        instance = cls(bucket, name, topic_project=project) 
    147        instance._properties = resource 
    148 
    149        return instance 
    150 
    151    @property 
    152    def bucket(self): 
    153        """Bucket to which the notification is bound.""" 
    154        return self._bucket 
    155 
    156    @property 
    157    def topic_name(self): 
    158        """Topic name to which notifications are published.""" 
    159        return self._topic_name 
    160 
    161    @property 
    162    def topic_project(self): 
    163        """Project ID of topic to which notifications are published.""" 
    164        return self._topic_project 
    165 
    166    @property 
    167    def custom_attributes(self): 
    168        """Custom attributes passed with notification events.""" 
    169        return self._properties.get("custom_attributes") 
    170 
    171    @property 
    172    def event_types(self): 
    173        """Event types for which notification events are published.""" 
    174        return self._properties.get("event_types") 
    175 
    176    @property 
    177    def blob_name_prefix(self): 
    178        """Prefix of blob names for which notification events are published.""" 
    179        return self._properties.get("object_name_prefix") 
    180 
    181    @property 
    182    def payload_format(self): 
    183        """Format of payload of notification events.""" 
    184        return self._properties.get("payload_format") 
    185 
    186    @property 
    187    def notification_id(self): 
    188        """Server-set ID of notification resource.""" 
    189        return self._properties.get("id") 
    190 
    191    @property 
    192    def etag(self): 
    193        """Server-set ETag of notification resource.""" 
    194        return self._properties.get("etag") 
    195 
    196    @property 
    197    def self_link(self): 
    198        """Server-set ETag of notification resource.""" 
    199        return self._properties.get("selfLink") 
    200 
    201    @property 
    202    def client(self): 
    203        """The client bound to this notfication.""" 
    204        return self.bucket.client 
    205 
    206    @property 
    207    def path(self): 
    208        """The URL path for this notification.""" 
    209        return f"/b/{self.bucket.name}/notificationConfigs/{self.notification_id}" 
    210 
    211    def _require_client(self, client): 
    212        """Check client or verify over-ride. 
    213 
    214        :type client: :class:`~google.cloud.storage.client.Client` or 
    215                      ``NoneType`` 
    216        :param client: the client to use. 
    217 
    218        :rtype: :class:`google.cloud.storage.client.Client` 
    219        :returns: The client passed in or the bucket's client. 
    220        """ 
    221        if client is None: 
    222            client = self.client 
    223        return client 
    224 
    225    def _set_properties(self, response): 
    226        """Helper for :meth:`reload`. 
    227 
    228        :type response: dict 
    229        :param response: resource mapping from server 
    230        """ 
    231        self._properties.clear() 
    232        self._properties.update(response) 
    233 
    234    def create(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=None): 
    235        """API wrapper: create the notification. 
    236 
    237        See: 
    238        https://cloud.google.com/storage/docs/json_api/v1/notifications/insert 
    239 
    240        If :attr:`user_project` is set on the bucket, bills the API request 
    241        to that project. 
    242 
    243        :type client: :class:`~google.cloud.storage.client.Client` 
    244        :param client: (Optional) The client to use.  If not passed, falls back 
    245                       to the ``client`` stored on the notification's bucket. 
    246        :type timeout: float or tuple 
    247        :param timeout: 
    248            (Optional) The amount of time, in seconds, to wait 
    249            for the server response.  See: :ref:`configuring_timeouts` 
    250 
    251        :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 
    252        :param retry: 
    253            (Optional) How to retry the RPC. See: :ref:`configuring_retries` 
    254 
    255        :raises ValueError: if the notification already exists. 
    256        """ 
    257        with create_trace_span(name="Storage.BucketNotification.create"): 
    258            if self.notification_id is not None: 
    259                raise ValueError( 
    260                    f"notification_id already set to {self.notification_id}; must be None to create a Notification."  # noqa: E702 
    261                ) 
    262 
    263            client = self._require_client(client) 
    264 
    265            query_params = {} 
    266            if self.bucket.user_project is not None: 
    267                query_params["userProject"] = self.bucket.user_project 
    268 
    269            path = f"/b/{self.bucket.name}/notificationConfigs" 
    270            properties = self._properties.copy() 
    271 
    272            if self.topic_name is None: 
    273                properties["topic"] = _TOPIC_REF_FMT.format(self.topic_project, "") 
    274            else: 
    275                properties["topic"] = _TOPIC_REF_FMT.format( 
    276                    self.topic_project, self.topic_name 
    277                ) 
    278 
    279            self._properties = client._post_resource( 
    280                path, 
    281                properties, 
    282                query_params=query_params, 
    283                timeout=timeout, 
    284                retry=retry, 
    285            ) 
    286 
    287    def exists(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 
    288        """Test whether this notification exists. 
    289 
    290        See: 
    291        https://cloud.google.com/storage/docs/json_api/v1/notifications/get 
    292 
    293        If :attr:`user_project` is set on the bucket, bills the API request 
    294        to that project. 
    295 
    296        :type client: :class:`~google.cloud.storage.client.Client` or 
    297                      ``NoneType`` 
    298        :param client: (Optional) The client to use.  If not passed, falls back 
    299                       to the ``client`` stored on the current bucket. 
    300        :type timeout: float or tuple 
    301        :param timeout: 
    302            (Optional) The amount of time, in seconds, to wait 
    303            for the server response.  See: :ref:`configuring_timeouts` 
    304 
    305        :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 
    306        :param retry: 
    307            (Optional) How to retry the RPC. See: :ref:`configuring_retries` 
    308 
    309        :rtype: bool 
    310        :returns: True, if the notification exists, else False. 
    311        :raises ValueError: if the notification has no ID. 
    312        """ 
    313        with create_trace_span(name="Storage.BucketNotification.exists"): 
    314            if self.notification_id is None: 
    315                raise ValueError( 
    316                    "Notification ID not set: set an explicit notification_id" 
    317                ) 
    318 
    319            client = self._require_client(client) 
    320 
    321            query_params = {} 
    322            if self.bucket.user_project is not None: 
    323                query_params["userProject"] = self.bucket.user_project 
    324 
    325            try: 
    326                client._get_resource( 
    327                    self.path, 
    328                    query_params=query_params, 
    329                    timeout=timeout, 
    330                    retry=retry, 
    331                ) 
    332            except NotFound: 
    333                return False 
    334            else: 
    335                return True 
    336 
    337    def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 
    338        """Update this notification from the server configuration. 
    339 
    340        See: 
    341        https://cloud.google.com/storage/docs/json_api/v1/notifications/get 
    342 
    343        If :attr:`user_project` is set on the bucket, bills the API request 
    344        to that project. 
    345 
    346        :type client: :class:`~google.cloud.storage.client.Client` or 
    347                      ``NoneType`` 
    348        :param client: (Optional) The client to use.  If not passed, falls back 
    349                       to the ``client`` stored on the current bucket. 
    350        :type timeout: float or tuple 
    351        :param timeout: 
    352            (Optional) The amount of time, in seconds, to wait 
    353            for the server response.  See: :ref:`configuring_timeouts` 
    354 
    355        :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 
    356        :param retry: 
    357            (Optional) How to retry the RPC. See: :ref:`configuring_retries` 
    358 
    359 
    360        :raises ValueError: if the notification has no ID. 
    361        """ 
    362        with create_trace_span(name="Storage.BucketNotification.reload"): 
    363            if self.notification_id is None: 
    364                raise ValueError( 
    365                    "Notification ID not set: set an explicit notification_id" 
    366                ) 
    367 
    368            client = self._require_client(client) 
    369 
    370            query_params = {} 
    371            if self.bucket.user_project is not None: 
    372                query_params["userProject"] = self.bucket.user_project 
    373 
    374            response = client._get_resource( 
    375                self.path, 
    376                query_params=query_params, 
    377                timeout=timeout, 
    378                retry=retry, 
    379            ) 
    380            self._set_properties(response) 
    381 
    382    def delete(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 
    383        """Delete this notification. 
    384 
    385        See: 
    386        https://cloud.google.com/storage/docs/json_api/v1/notifications/delete 
    387 
    388        If :attr:`user_project` is set on the bucket, bills the API request 
    389        to that project. 
    390 
    391        :type client: :class:`~google.cloud.storage.client.Client` or 
    392                      ``NoneType`` 
    393        :param client: (Optional) The client to use.  If not passed, falls back 
    394                       to the ``client`` stored on the current bucket. 
    395        :type timeout: float or tuple 
    396        :param timeout: 
    397            (Optional) The amount of time, in seconds, to wait 
    398            for the server response.  See: :ref:`configuring_timeouts` 
    399 
    400        :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 
    401        :param retry: 
    402            (Optional) How to retry the RPC. See: :ref:`configuring_retries` 
    403 
    404        :raises: :class:`google.api_core.exceptions.NotFound`: 
    405            if the notification does not exist. 
    406        :raises ValueError: if the notification has no ID. 
    407        """ 
    408        with create_trace_span(name="Storage.BucketNotification.delete"): 
    409            if self.notification_id is None: 
    410                raise ValueError( 
    411                    "Notification ID not set: set an explicit notification_id" 
    412                ) 
    413 
    414            client = self._require_client(client) 
    415 
    416            query_params = {} 
    417            if self.bucket.user_project is not None: 
    418                query_params["userProject"] = self.bucket.user_project 
    419 
    420            client._delete_resource( 
    421                self.path, 
    422                query_params=query_params, 
    423                timeout=timeout, 
    424                retry=retry, 
    425            ) 
    426 
    427 
    428def _parse_topic_path(topic_path): 
    429    """Verify that a topic path is in the correct format. 
    430 
    431    Expected to be of the form: 
    432 
    433        //pubsub.googleapis.com/projects/{project}/topics/{topic} 
    434 
    435    where the ``project`` value must be "6 to 30 lowercase letters, digits, 
    436    or hyphens. It must start with a letter. Trailing hyphens are prohibited." 
    437    (see [`resource manager docs`](https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects#Project.FIELDS.project_id)) 
    438    and ``topic`` must have length at least two, 
    439    must start with a letter and may only contain alphanumeric characters or 
    440    ``-``, ``_``, ``.``, ``~``, ``+`` or ``%`` (i.e characters used for URL 
    441    encoding, see [`topic spec`](https://cloud.google.com/storage/docs/json_api/v1/notifications/insert#topic)). 
    442 
    443    Args: 
    444        topic_path (str): The topic path to be verified. 
    445 
    446    Returns: 
    447        Tuple[str, str]: The ``project`` and ``topic`` parsed from the 
    448        ``topic_path``. 
    449 
    450    Raises: 
    451        ValueError: If the topic path is invalid. 
    452    """ 
    453    match = _TOPIC_REF_RE.match(topic_path) 
    454    if match is None: 
    455        raise ValueError(_BAD_TOPIC.format(topic_path)) 
    456 
    457    return match.group("name"), match.group("project")