Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/notification.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Configure bucket notification resources to interact with Google Cloud Pub/Sub. 

16 

17See [Cloud Pub/Sub Notifications for Google Cloud Storage](https://cloud.google.com/storage/docs/pubsub-notifications) 

18""" 

19 

20import re 

21 

22from google.api_core.exceptions import NotFound 

23 

24from google.cloud.storage._opentelemetry_tracing import create_trace_span 

25from google.cloud.storage.constants import _DEFAULT_TIMEOUT 

26from google.cloud.storage.retry import DEFAULT_RETRY 

27 

28 

29OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE" 

30OBJECT_METADATA_UPDATE_EVENT_TYPE = "OBJECT_METADATA_UPDATE" 

31OBJECT_DELETE_EVENT_TYPE = "OBJECT_DELETE" 

32OBJECT_ARCHIVE_EVENT_TYPE = "OBJECT_ARCHIVE" 

33 

34JSON_API_V1_PAYLOAD_FORMAT = "JSON_API_V1" 

35NONE_PAYLOAD_FORMAT = "NONE" 

36 

37_TOPIC_REF_FMT = "//pubsub.googleapis.com/projects/{}/topics/{}" 

38_PROJECT_PATTERN = r"(?P<project>[a-z][a-z0-9-]{4,28}[a-z0-9])" 

39_TOPIC_NAME_PATTERN = r"(?P<name>[A-Za-z](\w|[-_.~+%])+)" 

40_TOPIC_REF_PATTERN = _TOPIC_REF_FMT.format(_PROJECT_PATTERN, _TOPIC_NAME_PATTERN) 

41_TOPIC_REF_RE = re.compile(_TOPIC_REF_PATTERN) 

42_BAD_TOPIC = ( 

43 "Resource has invalid topic: {}; see " 

44 "https://cloud.google.com/storage/docs/json_api/v1/" 

45 "notifications/insert#topic" 

46) 

47 

48 

49class BucketNotification(object): 

50 """Represent a single notification resource for a bucket. 

51 

52 See: https://cloud.google.com/storage/docs/json_api/v1/notifications 

53 

54 :type bucket: :class:`google.cloud.storage.bucket.Bucket` 

55 :param bucket: Bucket to which the notification is bound. 

56 

57 :type topic_name: str 

58 :param topic_name: 

59 (Optional) Topic name to which notifications are published. 

60 

61 :type topic_project: str 

62 :param topic_project: 

63 (Optional) Project ID of topic to which notifications are published. 

64 If not passed, uses the project ID of the bucket's client. 

65 

66 :type custom_attributes: dict 

67 :param custom_attributes: 

68 (Optional) Additional attributes passed with notification events. 

69 

70 :type event_types: list(str) 

71 :param event_types: 

72 (Optional) Event types for which notification events are published. 

73 

74 :type blob_name_prefix: str 

75 :param blob_name_prefix: 

76 (Optional) Prefix of blob names for which notification events are 

77 published. 

78 

79 :type payload_format: str 

80 :param payload_format: 

81 (Optional) Format of payload for notification events. 

82 

83 :type notification_id: str 

84 :param notification_id: 

85 (Optional) The ID of the notification. 

86 """ 

87 

88 def __init__( 

89 self, 

90 bucket, 

91 topic_name=None, 

92 topic_project=None, 

93 custom_attributes=None, 

94 event_types=None, 

95 blob_name_prefix=None, 

96 payload_format=NONE_PAYLOAD_FORMAT, 

97 notification_id=None, 

98 ): 

99 self._bucket = bucket 

100 self._topic_name = topic_name 

101 

102 if topic_project is None: 

103 topic_project = bucket.client.project 

104 

105 if topic_project is None: 

106 raise ValueError("Client project not set: pass an explicit topic_project.") 

107 

108 self._topic_project = topic_project 

109 

110 self._properties = {} 

111 

112 if custom_attributes is not None: 

113 self._properties["custom_attributes"] = custom_attributes 

114 

115 if event_types is not None: 

116 self._properties["event_types"] = event_types 

117 

118 if blob_name_prefix is not None: 

119 self._properties["object_name_prefix"] = blob_name_prefix 

120 

121 if notification_id is not None: 

122 self._properties["id"] = notification_id 

123 

124 self._properties["payload_format"] = payload_format 

125 

126 @classmethod 

127 def from_api_repr(cls, resource, bucket): 

128 """Construct an instance from the JSON repr returned by the server. 

129 

130 See: https://cloud.google.com/storage/docs/json_api/v1/notifications 

131 

132 :type resource: dict 

133 :param resource: JSON repr of the notification 

134 

135 :type bucket: :class:`google.cloud.storage.bucket.Bucket` 

136 :param bucket: Bucket to which the notification is bound. 

137 

138 :rtype: :class:`BucketNotification` 

139 :returns: the new notification instance 

140 """ 

141 topic_path = resource.get("topic") 

142 if topic_path is None: 

143 raise ValueError("Resource has no topic") 

144 

145 name, project = _parse_topic_path(topic_path) 

146 instance = cls(bucket, name, topic_project=project) 

147 instance._properties = resource 

148 

149 return instance 

150 

151 @property 

152 def bucket(self): 

153 """Bucket to which the notification is bound.""" 

154 return self._bucket 

155 

156 @property 

157 def topic_name(self): 

158 """Topic name to which notifications are published.""" 

159 return self._topic_name 

160 

161 @property 

162 def topic_project(self): 

163 """Project ID of topic to which notifications are published.""" 

164 return self._topic_project 

165 

166 @property 

167 def custom_attributes(self): 

168 """Custom attributes passed with notification events.""" 

169 return self._properties.get("custom_attributes") 

170 

171 @property 

172 def event_types(self): 

173 """Event types for which notification events are published.""" 

174 return self._properties.get("event_types") 

175 

176 @property 

177 def blob_name_prefix(self): 

178 """Prefix of blob names for which notification events are published.""" 

179 return self._properties.get("object_name_prefix") 

180 

181 @property 

182 def payload_format(self): 

183 """Format of payload of notification events.""" 

184 return self._properties.get("payload_format") 

185 

186 @property 

187 def notification_id(self): 

188 """Server-set ID of notification resource.""" 

189 return self._properties.get("id") 

190 

191 @property 

192 def etag(self): 

193 """Server-set ETag of notification resource.""" 

194 return self._properties.get("etag") 

195 

196 @property 

197 def self_link(self): 

198 """Server-set ETag of notification resource.""" 

199 return self._properties.get("selfLink") 

200 

201 @property 

202 def client(self): 

203 """The client bound to this notfication.""" 

204 return self.bucket.client 

205 

206 @property 

207 def path(self): 

208 """The URL path for this notification.""" 

209 return f"/b/{self.bucket.name}/notificationConfigs/{self.notification_id}" 

210 

211 def _require_client(self, client): 

212 """Check client or verify over-ride. 

213 

214 :type client: :class:`~google.cloud.storage.client.Client` or 

215 ``NoneType`` 

216 :param client: the client to use. 

217 

218 :rtype: :class:`google.cloud.storage.client.Client` 

219 :returns: The client passed in or the bucket's client. 

220 """ 

221 if client is None: 

222 client = self.client 

223 return client 

224 

225 def _set_properties(self, response): 

226 """Helper for :meth:`reload`. 

227 

228 :type response: dict 

229 :param response: resource mapping from server 

230 """ 

231 self._properties.clear() 

232 self._properties.update(response) 

233 

234 def create(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=None): 

235 """API wrapper: create the notification. 

236 

237 See: 

238 https://cloud.google.com/storage/docs/json_api/v1/notifications/insert 

239 

240 If :attr:`user_project` is set on the bucket, bills the API request 

241 to that project. 

242 

243 :type client: :class:`~google.cloud.storage.client.Client` 

244 :param client: (Optional) The client to use. If not passed, falls back 

245 to the ``client`` stored on the notification's bucket. 

246 :type timeout: float or tuple 

247 :param timeout: 

248 (Optional) The amount of time, in seconds, to wait 

249 for the server response. See: :ref:`configuring_timeouts` 

250 

251 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 

252 :param retry: 

253 (Optional) How to retry the RPC. See: :ref:`configuring_retries` 

254 

255 :raises ValueError: if the notification already exists. 

256 """ 

257 with create_trace_span(name="Storage.BucketNotification.create"): 

258 if self.notification_id is not None: 

259 raise ValueError( 

260 f"notification_id already set to {self.notification_id}; must be None to create a Notification." 

261 ) 

262 

263 client = self._require_client(client) 

264 

265 query_params = {} 

266 if self.bucket.user_project is not None: 

267 query_params["userProject"] = self.bucket.user_project 

268 

269 path = f"/b/{self.bucket.name}/notificationConfigs" 

270 properties = self._properties.copy() 

271 

272 if self.topic_name is None: 

273 properties["topic"] = _TOPIC_REF_FMT.format(self.topic_project, "") 

274 else: 

275 properties["topic"] = _TOPIC_REF_FMT.format( 

276 self.topic_project, self.topic_name 

277 ) 

278 

279 self._properties = client._post_resource( 

280 path, 

281 properties, 

282 query_params=query_params, 

283 timeout=timeout, 

284 retry=retry, 

285 ) 

286 

287 def exists(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 

288 """Test whether this notification exists. 

289 

290 See: 

291 https://cloud.google.com/storage/docs/json_api/v1/notifications/get 

292 

293 If :attr:`user_project` is set on the bucket, bills the API request 

294 to that project. 

295 

296 :type client: :class:`~google.cloud.storage.client.Client` or 

297 ``NoneType`` 

298 :param client: (Optional) The client to use. If not passed, falls back 

299 to the ``client`` stored on the current bucket. 

300 :type timeout: float or tuple 

301 :param timeout: 

302 (Optional) The amount of time, in seconds, to wait 

303 for the server response. See: :ref:`configuring_timeouts` 

304 

305 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 

306 :param retry: 

307 (Optional) How to retry the RPC. See: :ref:`configuring_retries` 

308 

309 :rtype: bool 

310 :returns: True, if the notification exists, else False. 

311 :raises ValueError: if the notification has no ID. 

312 """ 

313 with create_trace_span(name="Storage.BucketNotification.exists"): 

314 if self.notification_id is None: 

315 raise ValueError( 

316 "Notification ID not set: set an explicit notification_id" 

317 ) 

318 

319 client = self._require_client(client) 

320 

321 query_params = {} 

322 if self.bucket.user_project is not None: 

323 query_params["userProject"] = self.bucket.user_project 

324 

325 try: 

326 client._get_resource( 

327 self.path, 

328 query_params=query_params, 

329 timeout=timeout, 

330 retry=retry, 

331 ) 

332 except NotFound: 

333 return False 

334 else: 

335 return True 

336 

337 def reload(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 

338 """Update this notification from the server configuration. 

339 

340 See: 

341 https://cloud.google.com/storage/docs/json_api/v1/notifications/get 

342 

343 If :attr:`user_project` is set on the bucket, bills the API request 

344 to that project. 

345 

346 :type client: :class:`~google.cloud.storage.client.Client` or 

347 ``NoneType`` 

348 :param client: (Optional) The client to use. If not passed, falls back 

349 to the ``client`` stored on the current bucket. 

350 :type timeout: float or tuple 

351 :param timeout: 

352 (Optional) The amount of time, in seconds, to wait 

353 for the server response. See: :ref:`configuring_timeouts` 

354 

355 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 

356 :param retry: 

357 (Optional) How to retry the RPC. See: :ref:`configuring_retries` 

358 

359 

360 :raises ValueError: if the notification has no ID. 

361 """ 

362 with create_trace_span(name="Storage.BucketNotification.reload"): 

363 if self.notification_id is None: 

364 raise ValueError( 

365 "Notification ID not set: set an explicit notification_id" 

366 ) 

367 

368 client = self._require_client(client) 

369 

370 query_params = {} 

371 if self.bucket.user_project is not None: 

372 query_params["userProject"] = self.bucket.user_project 

373 

374 response = client._get_resource( 

375 self.path, 

376 query_params=query_params, 

377 timeout=timeout, 

378 retry=retry, 

379 ) 

380 self._set_properties(response) 

381 

382 def delete(self, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY): 

383 """Delete this notification. 

384 

385 See: 

386 https://cloud.google.com/storage/docs/json_api/v1/notifications/delete 

387 

388 If :attr:`user_project` is set on the bucket, bills the API request 

389 to that project. 

390 

391 :type client: :class:`~google.cloud.storage.client.Client` or 

392 ``NoneType`` 

393 :param client: (Optional) The client to use. If not passed, falls back 

394 to the ``client`` stored on the current bucket. 

395 :type timeout: float or tuple 

396 :param timeout: 

397 (Optional) The amount of time, in seconds, to wait 

398 for the server response. See: :ref:`configuring_timeouts` 

399 

400 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy 

401 :param retry: 

402 (Optional) How to retry the RPC. See: :ref:`configuring_retries` 

403 

404 :raises: :class:`google.api_core.exceptions.NotFound`: 

405 if the notification does not exist. 

406 :raises ValueError: if the notification has no ID. 

407 """ 

408 with create_trace_span(name="Storage.BucketNotification.delete"): 

409 if self.notification_id is None: 

410 raise ValueError( 

411 "Notification ID not set: set an explicit notification_id" 

412 ) 

413 

414 client = self._require_client(client) 

415 

416 query_params = {} 

417 if self.bucket.user_project is not None: 

418 query_params["userProject"] = self.bucket.user_project 

419 

420 client._delete_resource( 

421 self.path, 

422 query_params=query_params, 

423 timeout=timeout, 

424 retry=retry, 

425 ) 

426 

427 

428def _parse_topic_path(topic_path): 

429 """Verify that a topic path is in the correct format. 

430 

431 Expected to be of the form: 

432 

433 //pubsub.googleapis.com/projects/{project}/topics/{topic} 

434 

435 where the ``project`` value must be "6 to 30 lowercase letters, digits, 

436 or hyphens. It must start with a letter. Trailing hyphens are prohibited." 

437 (see [`resource manager docs`](https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects#Project.FIELDS.project_id)) 

438 and ``topic`` must have length at least two, 

439 must start with a letter and may only contain alphanumeric characters or 

440 ``-``, ``_``, ``.``, ``~``, ``+`` or ``%`` (i.e characters used for URL 

441 encoding, see [`topic spec`](https://cloud.google.com/storage/docs/json_api/v1/notifications/insert#topic)). 

442 

443 Args: 

444 topic_path (str): The topic path to be verified. 

445 

446 Returns: 

447 Tuple[str, str]: The ``project`` and ``topic`` parsed from the 

448 ``topic_path``. 

449 

450 Raises: 

451 ValueError: If the topic path is invalid. 

452 """ 

453 match = _TOPIC_REF_RE.match(topic_path) 

454 if match is None: 

455 raise ValueError(_BAD_TOPIC.format(topic_path)) 

456 

457 return match.group("name"), match.group("project")