1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for server-side streaming in REST."""
16
17from typing import Union
18
19import proto
20import requests
21import google.protobuf.message
22from google.api_core._rest_streaming_base import BaseResponseIterator
23
24
25class ResponseIterator(BaseResponseIterator):
26 """Iterator over REST API responses.
27
28 Args:
29 response (requests.Response): An API response object.
30 response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
31 class expected to be returned from an API.
32
33 Raises:
34 ValueError:
35 - If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
36 """
37
38 def __init__(
39 self,
40 response: requests.Response,
41 response_message_cls: Union[proto.Message, google.protobuf.message.Message],
42 ):
43 self._response = response
44 # Inner iterator over HTTP response's content.
45 self._response_itr = self._response.iter_content(decode_unicode=True)
46 super(ResponseIterator, self).__init__(
47 response_message_cls=response_message_cls
48 )
49
50 def cancel(self):
51 """Cancel existing streaming operation."""
52 self._response.close()
53
54 def __next__(self):
55 while not self._ready_objs:
56 try:
57 chunk = next(self._response_itr)
58 self._process_chunk(chunk)
59 except StopIteration as e:
60 if self._level > 0:
61 raise ValueError("Unfinished stream: %s" % self._obj)
62 raise e
63 return self._grab()
64
65 def __iter__(self):
66 return self