Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websockets/streams.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1from __future__ import annotations 

2 

3from collections.abc import Generator 

4 

5 

class StreamReader:
    """
    Generator-based stream reader.

    Data is pushed into the reader with :meth:`feed_data` and
    :meth:`feed_eof`; it is pulled out by driving the generator-based
    coroutines :meth:`read_line`, :meth:`read_exact`, :meth:`read_to_eof`,
    and :meth:`at_eof`. Each of them yields (without a value) whenever it
    needs more input and returns its result via ``StopIteration.value``.

    This class doesn't support concurrent calls to :meth:`read_line`,
    :meth:`read_exact`, or :meth:`read_to_eof`. Make sure calls are
    serialized.

    """

    def __init__(self) -> None:
        # Bytes fed to the stream and not consumed by a reader yet.
        self.buffer = bytearray()
        # Set once by feed_eof(); no data may be fed afterwards.
        self.eof = False

    def read_line(self, m: int) -> Generator[None, None, bytes]:
        """
        Read a LF-terminated line from the stream.

        This is a generator-based coroutine.

        The return value includes the LF character.

        Args:
            m: Maximum number of bytes to read; this is a security limit.

        Returns:
            The line, including its trailing LF, removed from the buffer.

        Raises:
            EOFError: If the stream ends without a LF.
            RuntimeError: If the stream ends in more than ``m`` bytes.

        """
        p = 0  # number of bytes already scanned without finding a newline
        while True:
            # find() returns -1 when there is no LF, which makes n == 0.
            # Starting at p avoids rescanning bytes checked on earlier passes.
            n = self.buffer.find(b"\n", p) + 1  # number of bytes to read
            if n > 0:
                break
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            if self.eof:
                raise EOFError(f"stream ends after {p} bytes, before end of line")
            yield
        # A LF was found, but the line itself may still exceed the limit.
        if n > m:
            raise RuntimeError(f"read {n} bytes, expected no more than {m} bytes")
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_exact(self, n: int) -> Generator[None, None, bytes]:
        """
        Read a given number of bytes from the stream.

        This is a generator-based coroutine.

        Args:
            n: How many bytes to read.

        Returns:
            The first ``n`` bytes of the stream, removed from the buffer.

        Raises:
            EOFError: If the stream ends in less than ``n`` bytes.

        """
        assert n >= 0
        while len(self.buffer) < n:
            if self.eof:
                p = len(self.buffer)
                raise EOFError(f"stream ends after {p} bytes, expected {n} bytes")
            yield
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_to_eof(self, m: int) -> Generator[None, None, bytes]:
        """
        Read all bytes from the stream.

        This is a generator-based coroutine.

        Args:
            m: Maximum number of bytes to read; this is a security limit.

        Returns:
            All remaining bytes of the stream, removed from the buffer.

        Raises:
            RuntimeError: If the stream ends in more than ``m`` bytes.

        """
        while True:
            # Check the limit on every pass, including the last one: data
            # and EOF may both have been fed since the previous check, and
            # the limit must still hold for what is about to be returned.
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            if self.eof:
                break
            yield
        r = self.buffer[:]
        del self.buffer[:]
        return r

    def at_eof(self) -> Generator[None, None, bool]:
        """
        Tell whether the stream has ended and all data was read.

        This is a generator-based coroutine.

        Returns:
            :obj:`False` if buffered data remains; :obj:`True` if the buffer
            is empty and the stream has ended.

        """
        while True:
            if self.buffer:
                return False
            if self.eof:
                return True
            # When all data was read but the stream hasn't ended, we can't
            # tell until either feed_data() or feed_eof() is called.
            yield

    def feed_data(self, data: bytes) -> None:
        """
        Write data to the stream.

        :meth:`feed_data` cannot be called after :meth:`feed_eof`.

        Args:
            data: Data to write.

        Raises:
            EOFError: If the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.buffer += data

    def feed_eof(self) -> None:
        """
        End the stream.

        :meth:`feed_eof` cannot be called more than once.

        Raises:
            EOFError: If the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.eof = True

    def discard(self) -> None:
        """
        Discard all buffered data, but don't end the stream.

        """
        del self.buffer[:]