Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ijson/utils35.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1''' 

2Python3.5+ specific utilities 

3''' 

4import collections 

5 

6from ijson import utils, common, compat 

7 

8 

class utf8reader_async(compat.utf8reader):
    """
    Asynchronous counterpart of ``compat.utf8reader``: awaits text from the
    wrapped reader and hands it back as utf-8 encoded bytes.
    """

    async def read(self, n):
        # self.str_reader is not assigned in this class; presumably the base
        # class stores the wrapped reader there -- confirm in compat.
        text = await self.str_reader.read(n)
        encoded = text.encode('utf-8')
        return encoded

17 

async def _get_read(f):
    """
    Returns an awaitable read function that reads the requested type.

    A zero-length read is awaited on *f* to probe what it produces. If the
    result is a ``bytes`` object (subclasses included), ``f.read`` is
    returned unchanged. Otherwise *f* is wrapped in ``utf8reader_async`` and
    the wrapper's ``read`` is passed through ``compat._warn_and_return``.
    """
    probe = await f.read(0)
    # isinstance rather than an exact type comparison: a bytes subclass is
    # still bytes and must not be routed to the str-encoding wrapper (bytes
    # objects have no .encode()).
    if isinstance(probe, bytes):
        return f.read
    return compat._warn_and_return(utf8reader_async(f).read)

23 

class sendable_deque(collections.deque):
    '''
    A deque that can be used where an object with ``send`` is expected,
    in the same spirit as utils.sendable_list.
    '''

    # send() is a plain alias of deque.append: each sent value is appended
    # on the right-hand side of the deque.
    send = collections.deque.append

27 

class async_iterable:
    '''
    Async iterator over the values that a coroutine pipeline dispatches
    while it is being fed with data read from an async file-like object.

    Data is pulled from the file ``buf_size`` units at a time and sent into
    the pipeline; whatever the pipeline pushes into ``self.events`` is then
    handed out one value per iteration step.
    '''

    def __init__(self, f, buf_size, *coro_pipeline):
        self.f = f
        self.buf_size = buf_size
        # Resolved lazily on the first __anext__ call, because finding the
        # right read function requires awaiting on the file.
        self.read = None
        self.coro_finished = False
        # The events container is created first, since it is handed to
        # utils.chain together with the pipeline stages.
        self.events = sendable_deque()
        self.coro = utils.chain(self.events, *coro_pipeline)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.read:
            self.read = await _get_read(self.f)
        # Keep feeding the pipeline until it has produced at least one value.
        while not self.events:
            if self.coro_finished:
                raise StopAsyncIteration
            chunk = await self.read(self.buf_size)
            try:
                self.coro.send(chunk)
            except StopIteration:
                # The pipeline is exhausted; values it produced on its way
                # out (if any) are still delivered before stopping.
                self.coro_finished = True
                if not self.events:
                    raise StopAsyncIteration
        return self.events.popleft()

64 

65 

def _make_basic_parse_async(backend):
    """
    Creates the ``basic_parse_async`` function for the given backend.

    The returned function builds an ``async_iterable`` over *f*, reading
    ``buf_size`` units at a time and driving the pipeline obtained from
    ``common._basic_parse_pipeline(backend, config)``.
    """
    def basic_parse_async(f, buf_size=64 * 1024, **config):
        pipeline = common._basic_parse_pipeline(backend, config)
        return async_iterable(f, buf_size, *pipeline)

    return basic_parse_async

72 

def _make_parse_async(backend):
    """
    Creates the ``parse_async`` function for the given backend.

    The returned function builds an ``async_iterable`` over *f*, reading
    ``buf_size`` units at a time and driving the pipeline obtained from
    ``common._parse_pipeline(backend, config)``.
    """
    def parse_async(f, buf_size=64 * 1024, **config):
        pipeline = common._parse_pipeline(backend, config)
        return async_iterable(f, buf_size, *pipeline)

    return parse_async

79 

def _make_items_async(backend):
    """
    Creates the ``items_async`` function for the given backend.

    The returned function builds an ``async_iterable`` over *f*, reading
    ``buf_size`` units at a time and driving the pipeline obtained from
    ``common._items_pipeline(backend, prefix, map_type, config)``.
    """
    def items_async(f, prefix, map_type=None, buf_size=64 * 1024, **config):
        pipeline = common._items_pipeline(backend, prefix, map_type, config)
        return async_iterable(f, buf_size, *pipeline)

    return items_async

86 

def _make_kvitems_async(backend):
    """
    Creates the ``kvitems_async`` function for the given backend.

    The returned function builds an ``async_iterable`` over *f*, reading
    ``buf_size`` units at a time and driving the pipeline obtained from
    ``common._kvitems_pipeline(backend, prefix, map_type, config)``.
    """
    def kvitems_async(f, prefix, map_type=None, buf_size=64 * 1024, **config):
        pipeline = common._kvitems_pipeline(backend, prefix, map_type, config)
        return async_iterable(f, buf_size, *pipeline)

    return kvitems_async