Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/glom/streaming.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1"""glom's helpers for streaming use cases. 

2 

3Specifier types which yield their results incrementally so that they 

4can be applied to targets which are themselves streaming (e.g. chunks 

5of rows from a database, lines from a file) without excessive memory 

6usage. 

7 

8glom's streaming functionality revolves around a single :class:`Iter` 

9Specifier type, which has methods to transform the target stream. 

10""" 

11 

12from itertools import islice, dropwhile, takewhile, chain 

13from functools import partial 

14try: 

15 from itertools import imap, ifilter 

16except ImportError: 

17 # py3 

18 imap = map 

19 ifilter = filter 

20 

21from boltons.iterutils import split_iter, chunked_iter, windowed_iter, unique_iter, first 

22from boltons.funcutils import FunctionBuilder 

23 

24from .core import glom, T, STOP, SKIP, _MISSING, Path, TargetRegistry, Call, Spec, Pipe, S, bbrepr, format_invocation 

25from .matching import Check 

26 

class Iter:
    """``Iter()`` is glom's counterpart to Python's built-in :func:`iter()`
    function. Given an iterable target, ``Iter()`` yields the result
    of applying the passed spec to each element of the target, similar
    to the built-in ``[]`` spec, but streaming.

    The following turns a list of strings into integers using Iter(),
    before deduplicating and converting it to a tuple:

    >>> glom(['1', '2', '1', '3'], (Iter(int), set, tuple))
    (1, 2, 3)

    ``Iter()`` also has many useful methods which can be chained to
    compose a stream processing pipeline. The above can also be
    written as:

    >>> glom(['1', '2', '1', '3'], (Iter().map(int).unique(), tuple))
    (1, 2, 3)

    ``Iter()`` also respects glom's :data:`~glom.SKIP` and
    :data:`~glom.STOP` singletons for filtering and breaking
    iteration.

    Args:

       subspec: A subspec to be applied on each element from the iterable.
       sentinel: Keyword-only argument, which, when found in the
         iterable stream, causes the iteration to stop. Same as with the
         built-in :func:`iter`.

    Raises:

       TypeError: If any keyword argument other than *sentinel* (or the
         internal ``_iter_stack``) is passed.

    """
    def __init__(self, subspec=T, **kwargs):
        self.subspec = subspec
        # Stack of (method name, args, callback) triples, newest first.
        # Each chained method returns a new Iter with one more entry.
        self._iter_stack = kwargs.pop('_iter_stack', [])

        self.sentinel = kwargs.pop('sentinel', STOP)
        if kwargs:
            raise TypeError('unexpected keyword arguments: %r' % sorted(kwargs))

    def __repr__(self):
        base_args = ()
        if self.subspec != T:
            base_args = (self.subspec,)
        # Only show the sentinel when it differs from the default, so the
        # common case keeps its short repr.
        base_kwargs = []
        if self.sentinel is not STOP:
            base_kwargs = [('sentinel', self.sentinel)]
        base = format_invocation(self.__class__.__name__, base_args, base_kwargs, repr=bbrepr)
        chunks = [base]
        # The stack is stored newest-first; reverse it to render the
        # chained calls in the order they were written.
        for fname, args, _ in reversed(self._iter_stack):
            meth = getattr(self, fname)
            fb = FunctionBuilder.from_func(meth)
            fb.args = fb.args[1:]  # drop self
            arg_names = fb.get_arg_names()
            # TODO: something fancier with defaults:
            kwargs = []
            if len(args) > 1 and arg_names:
                # Multiple arguments are rendered as keywords for clarity.
                args, kwargs = (), zip(arg_names, args)
            chunks.append('.' + format_invocation(fname, args, kwargs, repr=bbrepr))
        return ''.join(chunks)

    def glomit(self, target, scope):
        """Build the lazy pipeline for *target*: the base iteration wrapped
        by every chained operation, oldest operation innermost.
        """
        iterator = self._iterate(target, scope)

        for _, _, callback in reversed(self._iter_stack):
            iterator = callback(iterator, scope)

        return iter(iterator)

    def _iterate(self, target, scope):
        """Generator yielding each element of *target* (after applying
        ``self.subspec``), honoring ``SKIP``, ``STOP`` and the sentinel.

        Raises:
           TypeError: If the registered ``iterate`` handler fails on *target*.
        """
        iterate = scope[TargetRegistry].get_handler('iterate', target, path=scope[Path])
        try:
            iterator = iterate(target)
        except Exception as e:
            raise TypeError('failed to iterate on instance of type %r at %r (got %r)'
                            % (target.__class__.__name__, Path(*scope[Path]), e)) from e

        base_path = scope[Path]
        for i, t in enumerate(iterator):
            scope[Path] = base_path + [i]
            yld = (t if self.subspec is T else scope[glom](t, self.subspec, scope))
            if yld is SKIP:
                continue
            elif yld is self.sentinel or yld is STOP:
                # NB: sentinel defaults to STOP so I was torn whether
                # to also check for STOP, and landed on the side of
                # never letting STOP through.
                return
            yield yld

    def _add_op(self, opname, args, callback):
        """Return a new spec of the same type with one more operation
        pushed onto the front of the stack.

        The *sentinel* is carried over explicitly; otherwise chaining any
        method would silently reset it to the default ``STOP``.
        """
        return type(self)(
            subspec=self.subspec,
            sentinel=self.sentinel,
            _iter_stack=[(opname, args, callback)] + self._iter_stack)

    def map(self, subspec):
        """Return a new :class:`Iter()` spec which will apply the provided
        *subspec* to each element of the iterable.

        >>> glom(range(5), Iter().map(lambda x: x * 2).all())
        [0, 2, 4, 6, 8]

        Because a spec can be a callable, :meth:`Iter.map()` does
        everything the built-in :func:`map` does, but with the full
        power of glom specs.

        >>> glom(['a', 'B', 'C'], Iter().map(T.islower()).all())
        [True, False, False]
        """
        # whatever validation you want goes here
        # TODO: DRY the self._add_op with a decorator?
        return self._add_op(
            'map',
            (subspec,),
            lambda iterable, scope: imap(
                lambda t: scope[glom](t, subspec, scope), iterable))

    def filter(self, key=T):
        """Return a new :class:`Iter()` spec which will include only elements matching the
        given *key*.

        >>> glom(range(6), Iter().filter(lambda x: x % 2).all())
        [1, 3, 5]

        Because a spec can be a callable, :meth:`Iter.filter()` does
        everything the built-in :func:`filter` does, but with the full
        power of glom specs. For even more power, combine,
        :meth:`Iter.filter()` with :class:`Check()`.

        >>> # PROTIP: Python's ints know how many binary digits they require, using the bit_length method
        >>> glom(range(9), Iter().filter(Check(T.bit_length(), one_of=(2, 4), default=SKIP)).all())
        [2, 3, 8]

        """
        # NB: Check's validate function defaults to bool, and
        # *default* is returned on access errors as well as validation
        # errors, so the lambda passed to ifilter below works fine.
        check_spec = key if isinstance(key, Check) else Check(key, default=SKIP)
        return self._add_op(
            'filter',
            (key,),
            lambda iterable, scope: ifilter(
                lambda t: scope[glom](t, check_spec, scope) is not SKIP, iterable))

    def chunked(self, size, fill=_MISSING):
        """Return a new :class:`Iter()` spec which groups elements in the iterable
        into lists of length *size*.

        If the optional *fill* argument is provided, iterables not
        evenly divisible by *size* will be padded out by the *fill*
        constant. Otherwise, the final chunk will be shorter than *size*.

        >>> list(glom(range(10), Iter().chunked(3)))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
        >>> list(glom(range(10), Iter().chunked(3, fill=None)))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]
        """
        kw = {'size': size}
        args = (size,)
        # Only forward *fill* when the caller gave one, since None is a
        # legitimate fill value.
        if fill is not _MISSING:
            kw['fill'] = fill
            args += (fill,)
        return self._add_op(
            'chunked', args, lambda it, scope: chunked_iter(it, **kw))

    def windowed(self, size):
        """Return a new :class:`Iter()` spec which will yield a sliding window of
        adjacent elements in the iterable. Each tuple yielded will be
        of length *size*.

        Useful for getting adjacent pairs and triples.

        >>> list(glom(range(4), Iter().windowed(2)))
        [(0, 1), (1, 2), (2, 3)]
        """
        return self._add_op(
            'windowed', (size,), lambda it, scope: windowed_iter(it, size))

    def split(self, sep=None, maxsplit=None):
        """Return a new :class:`Iter()` spec which will lazily split an iterable based
        on a separator (or list of separators), *sep*. Like
        :meth:`str.split()`, but for all iterables.

        ``split_iter()`` yields lists of non-separator values. A separator will
        never appear in the output.

        >>> target = [1, 2, None, None, 3, None, 4, None]
        >>> list(glom(target, Iter().split()))
        [[1, 2], [3], [4]]

        Note that ``split_iter`` is based on :func:`str.split`, so if
        *sep* is ``None``, ``split()`` **groups** separators. If empty lists
        are desired between two contiguous ``None`` values, simply use
        ``sep=[None]``:

        >>> list(glom(target, Iter().split(sep=[None])))
        [[1, 2], [], [3], [4], []]

        A max number of splits may also be set:

        >>> list(glom(target, Iter().split(maxsplit=2)))
        [[1, 2], [3], [4, None]]

        """
        return self._add_op(
            'split',
            (sep, maxsplit),
            lambda it, scope: split_iter(it, sep=sep, maxsplit=maxsplit))

    def flatten(self):
        """Returns a new :class:`Iter()` instance which combines iterables into a
        single iterable.

        >>> target = [[1, 2], [3, 4], [5]]
        >>> list(glom(target, Iter().flatten()))
        [1, 2, 3, 4, 5]
        """
        return self._add_op(
            'flatten',
            (),
            lambda it, scope: chain.from_iterable(it))

    def unique(self, key=T):
        """Return a new :class:`Iter()` spec which lazily filters out duplicate
        values, i.e., only the first appearance of a value in a stream will
        be yielded.

        >>> target = list('gloMolIcious')
        >>> out = list(glom(target, Iter().unique(T.lower())))
        >>> print(''.join(out))
        gloMIcus
        """
        return self._add_op(
            'unique',
            (key,),
            lambda it, scope: unique_iter(it, key=lambda t: scope[glom](t, key, scope)))

    def slice(self, *args):
        """Returns a new :class:`Iter()` spec which trims iterables in the
        same manner as :func:`itertools.islice`.

        >>> target = [0, 1, 2, 3, 4, 5]
        >>> glom(target, Iter().slice(3).all())
        [0, 1, 2]
        >>> glom(target, Iter().slice(2, 4).all())
        [2, 3]

        This method accepts only positional arguments.

        Raises:
           TypeError: If *args* are not valid :func:`itertools.islice`
             arguments.
        """
        # TODO: make a kwarg-compatible version of this (islice takes no kwargs)
        # TODO: also support slice syntax Iter()[::]
        # Validate eagerly against an empty iterable so bad arguments fail
        # at spec-construction time rather than mid-stream.
        try:
            islice([], *args)
        except TypeError:
            raise TypeError(f'invalid slice arguments: {args!r}')
        return self._add_op('slice', args, lambda it, scope: islice(it, *args))

    def limit(self, count):
        """A convenient alias for :meth:`~Iter.slice`, which takes a single
        argument, *count*, the max number of items to yield.
        """
        return self._add_op('limit', (count,), lambda it, scope: islice(it, count))

    def takewhile(self, key=T):
        """Returns a new :class:`Iter()` spec which stops the stream once
        *key* becomes falsy.

        >>> glom([3, 2, 0, 1], Iter().takewhile().all())
        [3, 2]

        See :func:`itertools.takewhile` for more details.
        """
        return self._add_op(
            'takewhile',
            (key,),
            lambda it, scope: takewhile(
                lambda t: scope[glom](t, key, scope), it))

    def dropwhile(self, key=T):
        """Returns a new :class:`Iter()` spec which drops stream items until
        *key* becomes falsy.

        >>> glom([0, 0, 3, 2, 0], Iter().dropwhile(lambda t: t < 1).all())
        [3, 2, 0]

        Note that while similar to :meth:`Iter.filter()`, the filter
        only applies to the beginning of the stream. In a way,
        :meth:`Iter.dropwhile` can be thought of as
        :meth:`~str.lstrip()` for streams. See
        :func:`itertools.dropwhile` for more details.

        """
        return self._add_op(
            'dropwhile',
            (key,),
            lambda it, scope: dropwhile(
                lambda t: scope[glom](t, key, scope), it))

    # Terminal methods follow

    def all(self):
        """A convenience method which returns a new spec which turns an
        iterable into a list.

        >>> glom(range(5), Iter(lambda t: t * 2).all())
        [0, 2, 4, 6, 8]

        Note that this spec will always consume the whole iterable, and as
        such, the spec returned is *not* an :class:`Iter()` instance.
        """
        return Pipe(self, list)

    def first(self, key=T, default=None):
        """A convenience method for lazily yielding a single truthy item from
        an iterable.

        >>> target = [False, 1, 2, 3]
        >>> glom(target, Iter().first())
        1

        This method takes a condition, *key*, which can also be a
        glomspec, as well as a *default*, in case nothing matches the
        condition.

        As this spec yields at most one item, and not an iterable, the
        spec returned from this method is not an :class:`Iter()` instance.
        """
        return (self, First(key=key, default=default))

353 

354 

355class First: 

356 """Get the first element of an iterable which matches *key*, if there 

357 is one, otherwise return *default* (``None`` if unset). 

358 

359 >>> is_odd = lambda x: x % 2 

360 >>> glom([0, 1, 2, 3], First(is_odd)) 

361 1 

362 >>> glom([0, 2, 4], First(is_odd, default=False)) 

363 False 

364 """ 

365 # The impl of this ain't pretty and basically just exists for a 

366 # nicer-looking repr. (Iter(), First()) is the equivalent of doing 

367 # (Iter().filter(spec), Call(first, args=(T,), kwargs={'default': 

368 # default})) 

369 __slots__ = ('_spec', '_default', '_first') 

370 

371 def __init__(self, key=T, default=None): 

372 self._spec = key 

373 self._default = default 

374 

375 spec_glom = Spec(Call(partial, args=(Spec(self._spec).glom,), kwargs={'scope': S})) 

376 self._first = Call(first, args=(T,), kwargs={'default': default, 'key': spec_glom}) 

377 

378 def glomit(self, target, scope): 

379 return self._first.glomit(target, scope) 

380 

381 def __repr__(self): 

382 cn = self.__class__.__name__ 

383 if self._default is None: 

384 return f'{cn}({bbrepr(self._spec)})' 

385 return f'{cn}({bbrepr(self._spec)}, default={bbrepr(self._default)})'