Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/rest_streaming.py: 18%
65 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for server-side streaming in REST."""
17from collections import deque
18import string
19from typing import Deque
21import requests
class ResponseIterator:
    """Iterator over REST API responses.

    The response body is scanned character by character as chunks arrive.
    It must be a JSON array whose elements are JSON objects. Every time a
    top-level object of that array is completed, its text (with whitespace
    outside of string values removed) is queued, and ``__next__`` converts
    the oldest queued text with ``response_message_cls.from_json``.

    Args:
        response (requests.Response): An API response object.
        response_message_cls (Callable[proto.Message]): A proto
            class expected to be returned from an API.
    """

    # The annotation is quoted so it is not evaluated when the class body is
    # executed; static type checkers still resolve it via the module import.
    def __init__(self, response: "requests.Response", response_message_cls):
        self._response = response
        self._response_message_cls = response_message_cls
        # Inner iterator over HTTP response's content.
        # NOTE(review): _process_chunk compares characters against str
        # literals, so it presumably relies on decode_unicode=True producing
        # str chunks - confirm behaviour when the response has no encoding.
        self._response_itr = self._response.iter_content(decode_unicode=True)
        # Texts of complete JSON objects ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object:
        # 0 = outside the enclosing array, 1 = inside the array but outside
        # any object, 2+ = inside an object.
        self._level = 0
        # Keeps track whether HTTP response is currently sending values
        # inside of a string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

    def cancel(self):
        """Cancel existing streaming operation by closing the response."""
        self._response.close()

    def _process_chunk(self, chunk: str):
        """Feed one chunk of the response body into the scanner.

        Updates the nesting/string/escape state and appends the text of every
        object completed by this chunk to ``self._ready_objs``.

        Args:
            chunk (str): The next piece of the response body. An empty chunk
                is ignored.

        Raises:
            ValueError: If the scanner is outside of the array (level 0) and
                the chunk does not start with "[".
        """
        if not chunk:
            # Nothing to scan. Without this guard ``chunk[0]`` below raised
            # IndexError at level 0, while the same empty chunk was silently
            # ignored at every other level.
            return
        if self._level == 0 and chunk[0] != "[":
            raise ValueError(
                "Can only parse array of JSON objects, instead got %s" % chunk
            )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about), so start a fresh buffer.
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                if not self._in_string and self._level == 1:
                    # Back at array level: the outermost object is complete.
                    self._ready_objs.append(self._obj)
            elif char == '"':
                # An escaped quote does not open or close a string value.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace is only kept inside of string values.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the enclosing array; not stored.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the enclosing array; not stored.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character unless it is itself
            # escaped; any other character clears the flag.
            self._escape_next = char == "\\" and not self._escape_next

    def __next__(self):
        """Return the next response message.

        Pulls chunks from the response until at least one complete object is
        queued, then returns the oldest one converted by ``_grab``.

        Raises:
            ValueError: If the response content ends while the array or an
                object is still open, or if ``_process_chunk`` rejects a
                chunk.
            StopIteration: If the response content is exhausted and nothing
                is left open.
        """
        while not self._ready_objs:
            try:
                chunk = next(self._response_itr)
            except StopIteration:
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise
            self._process_chunk(chunk)
        return self._grab()

    def _grab(self):
        """Pop the oldest queued JSON text and convert it with ``from_json``."""
        return self._response_message_cls.from_json(self._ready_objs.popleft())

    def __iter__(self):
        """Return the iterator itself."""
        return self