Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ijson/utils35.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1'''
2Python3.5+ specific utilities
3'''
4import collections
6from ijson import utils, common, compat
9class utf8reader_async(compat.utf8reader):
10 """
11 Takes a utf8-encoded string asynchronous reader and asynchronously reads
12 bytes out of it
13 """
14 async def read(self, n):
15 data = await self.str_reader.read(n)
16 return data.encode('utf-8')
18async def _get_read(f):
19 """Returns an awaitable read function that reads the requested type"""
20 if type(await f.read(0)) == bytes:
21 return f.read
22 return compat._warn_and_return(utf8reader_async(f).read)
24class sendable_deque(collections.deque):
25 '''Like utils.sendable_list, but for deque objects'''
26 send = collections.deque.append
28class async_iterable:
29 '''
30 A utility class that implements an async iterator returning values
31 dispatched by a coroutine pipeline after *it* has received values coming
32 from an async file-like object.
33 '''
35 def __init__(self, f, buf_size, *coro_pipeline):
36 self.events = sendable_deque()
37 self.coro = utils.chain(self.events, *coro_pipeline)
38 self.coro_finished = False
39 self.f = f
40 self.buf_size = buf_size
41 self.read = None
43 def __aiter__(self):
44 return self
46 async def __anext__(self):
47 if not self.read:
48 self.read = await _get_read(self.f)
49 if self.events:
50 return self.events.popleft()
51 if self.coro_finished:
52 raise StopAsyncIteration
53 while True:
54 data = await self.read(self.buf_size)
55 try:
56 self.coro.send(data)
57 if self.events:
58 return self.events.popleft()
59 except StopIteration:
60 self.coro_finished = True
61 if self.events:
62 return self.events.popleft()
63 raise StopAsyncIteration
66def _make_basic_parse_async(backend):
67 def basic_parse_async(f, buf_size=64*1024, **config):
68 return async_iterable(f, buf_size,
69 *common._basic_parse_pipeline(backend, config)
70 )
71 return basic_parse_async
73def _make_parse_async(backend):
74 def parse_async(f, buf_size=64*1024, **config):
75 return async_iterable(f, buf_size,
76 *common._parse_pipeline(backend, config)
77 )
78 return parse_async
80def _make_items_async(backend):
81 def items_async(f, prefix, map_type=None, buf_size=64*1024, **config):
82 return async_iterable(f, buf_size,
83 *common._items_pipeline(backend, prefix, map_type, config)
84 )
85 return items_async
87def _make_kvitems_async(backend):
88 def kvitems_async(f, prefix, map_type=None, buf_size=64*1024, **config):
89 return async_iterable(f, buf_size,
90 *common._kvitems_pipeline(backend, prefix, map_type, config)
91 )
92 return kvitems_async