Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/rest_streaming.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

26 statements  

1# Copyright 2021 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for server-side streaming in REST.""" 

16 

17from typing import Union 

18 

19import proto 

20import requests 

21import google.protobuf.message 

22from google.api_core._rest_streaming_base import BaseResponseIterator 

23 

24 

class ResponseIterator(BaseResponseIterator):
    """Synchronous iterator over the messages of a streamed REST response.

    Text chunks are pulled from the HTTP response body and handed to the
    parsing machinery inherited from ``BaseResponseIterator``; each call to
    ``next()`` returns whatever the base class's ``_grab()`` produces.

    Args:
        response (requests.Response): An API response object.
        response_message_cls (Union[proto.Message, google.protobuf.message.Message]):
            A response class expected to be returned from an API.

    Raises:
        ValueError:
            - If `response_message_cls` is not a subclass of `proto.Message`
              or `google.protobuf.message.Message`.
    """

    def __init__(
        self,
        response: requests.Response,
        response_message_cls: Union[proto.Message, google.protobuf.message.Message],
    ):
        # Keep the response itself so that ``cancel()`` can close it later.
        self._response = response
        # Lazy source of body chunks; ``decode_unicode=True`` asks requests to
        # decode the chunks using the response's encoding.
        self._response_itr = self._response.iter_content(decode_unicode=True)
        # The base initializer is called last, matching the original order.
        # NOTE(review): presumably it sets up the parser state read in
        # ``__next__`` -- confirm against ``BaseResponseIterator``.
        super().__init__(response_message_cls=response_message_cls)

    def cancel(self):
        """Cancel existing streaming operation by closing the HTTP response."""
        self._response.close()

    def __next__(self):
        """Return the next parsed message, reading more chunks as needed.

        Raises:
            ValueError: If the chunk source is exhausted while ``self._level``
                is still positive (reported as an unfinished stream).
            StopIteration: If the chunk source is exhausted otherwise.
        """
        while True:
            # Hand out anything already queued before touching the network.
            if self._ready_objs:
                return self._grab()
            try:
                # Both the fetch and the parse stay inside the ``try`` so a
                # ``StopIteration`` from either is treated the same way.
                piece = next(self._response_itr)
                self._process_chunk(piece)
            except StopIteration as exhausted:
                # NOTE(review): ``_level`` is maintained by the base class;
                # it looks like a nesting depth of the object currently
                # being parsed -- confirm.
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise exhausted

    def __iter__(self):
        """Return ``self``; the object is its own iterator."""
        return self