Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websockets/streams.py: 76%
58 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:20 +0000
1from __future__ import annotations
3from typing import Generator
class StreamReader:
    """
    Generator-based stream reader.

    Data is pushed in with :meth:`feed_data` and :meth:`feed_eof`; the
    ``read_*`` generator-based coroutines ``yield`` whenever they need more
    data and return their result through ``StopIteration.value``.

    This class doesn't support concurrent calls to :meth:`read_line`,
    :meth:`read_exact`, or :meth:`read_to_eof`. Make sure calls are
    serialized.

    """

    def __init__(self) -> None:
        # Bytes received but not consumed yet.
        self.buffer = bytearray()
        # Set once by feed_eof(); no data may be fed afterwards.
        self.eof = False

    def read_line(self, m: int) -> Generator[None, None, bytes]:
        """
        Read a LF-terminated line from the stream.

        This is a generator-based coroutine.

        The return value includes the LF character.

        Args:
            m: maximum number of bytes to read; this is a security limit.

        Raises:
            EOFError: if the stream ends without a LF.
            RuntimeError: if the stream ends in more than ``m`` bytes.

        """
        p = 0  # number of bytes already scanned without finding a newline
        while True:
            # find() returns -1 when there is no LF, so n == 0 means "not found";
            # otherwise n is the length of the line including the LF.
            n = self.buffer.find(b"\n", p) + 1
            if n > 0:
                break
            # Resume the next search where this one stopped.
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            if self.eof:
                raise EOFError(f"stream ends after {p} bytes, before end of line")
            yield
        if n > m:
            raise RuntimeError(f"read {n} bytes, expected no more than {m} bytes")
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_exact(self, n: int) -> Generator[None, None, bytes]:
        """
        Read a given number of bytes from the stream.

        This is a generator-based coroutine.

        Args:
            n: how many bytes to read.

        Raises:
            EOFError: if the stream ends in less than ``n`` bytes.

        """
        assert n >= 0
        while len(self.buffer) < n:
            if self.eof:
                p = len(self.buffer)
                raise EOFError(f"stream ends after {p} bytes, expected {n} bytes")
            yield
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_to_eof(self, m: int) -> Generator[None, None, bytes]:
        """
        Read all bytes from the stream.

        This is a generator-based coroutine.

        Args:
            m: maximum number of bytes to read; this is a security limit.

        Raises:
            RuntimeError: if the stream ends in more than ``m`` bytes.

        """
        while not self.eof:
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            yield
        # The loop above never inspects data that arrived together with (or
        # before) EOF, so enforce the limit once more before returning.
        p = len(self.buffer)
        if p > m:
            raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
        r = self.buffer[:]
        del self.buffer[:]
        return r

    def at_eof(self) -> Generator[None, None, bool]:
        """
        Tell whether the stream has ended and all data was read.

        This is a generator-based coroutine.

        """
        while True:
            if self.buffer:
                return False
            if self.eof:
                return True
            # When all data was read but the stream hasn't ended, we can't
            # tell until either feed_data() or feed_eof() is called.
            yield

    def feed_data(self, data: bytes) -> None:
        """
        Write data to the stream.

        :meth:`feed_data` cannot be called after :meth:`feed_eof`.

        Args:
            data: data to write.

        Raises:
            EOFError: if the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.buffer += data

    def feed_eof(self) -> None:
        """
        End the stream.

        :meth:`feed_eof` cannot be called more than once.

        Raises:
            EOFError: if the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.eof = True

    def discard(self) -> None:
        """
        Discard all buffered data, but don't end the stream.

        """
        del self.buffer[:]