Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/glom/streaming.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""glom's helpers for streaming use cases.
3Specifier types which yield their results incrementally so that they
4can be applied to targets which are themselves streaming (e.g. chunks
5of rows from a database, lines from a file) without excessive memory
6usage.
8glom's streaming functionality revolves around a single :class:`Iter`
9Specifier type, which has methods to transform the target stream.
10"""
12from itertools import islice, dropwhile, takewhile, chain
13from functools import partial
14try:
15 from itertools import imap, ifilter
16except ImportError:
17 # py3
18 imap = map
19 ifilter = filter
21from boltons.iterutils import split_iter, chunked_iter, windowed_iter, unique_iter, first
22from boltons.funcutils import FunctionBuilder
24from .core import glom, T, STOP, SKIP, _MISSING, Path, TargetRegistry, Call, Spec, Pipe, S, bbrepr, format_invocation
25from .matching import Check
class Iter:
    """``Iter()`` is glom's counterpart to Python's built-in :func:`iter()`
    function. Given an iterable target, ``Iter()`` yields the result
    of applying the passed spec to each element of the target, similar
    to the built-in ``[]`` spec, but streaming.

    The following turns a list of strings into integers using Iter(),
    before deduplicating and converting it to a tuple:

    >>> glom(['1', '2', '1', '3'], (Iter(int), set, tuple))
    (1, 2, 3)

    ``Iter()`` also has many useful methods which can be chained to
    compose a stream processing pipeline. The above can also be
    written as:

    >>> glom(['1', '2', '1', '3'], (Iter().map(int).unique(), tuple))
    (1, 2, 3)

    ``Iter()`` also respects glom's :data:`~glom.SKIP` and
    :data:`~glom.STOP` singletons for filtering and breaking
    iteration.

    Args:

       subspec: A subspec to be applied on each element from the iterable.
       sentinel: Keyword-only argument, which, when found in the
         iterable stream, causes the iteration to stop. Same as with the
         built-in :func:`iter`. The sentinel is carried over to the new
         specs returned by the chaining methods (``map()``, ``filter()``,
         etc.).

    """
    def __init__(self, subspec=T, **kwargs):
        self.subspec = subspec
        # List of (opname, args, callback) tuples, most recently added
        # operation first (see _add_op). Private; only passed internally.
        self._iter_stack = kwargs.pop('_iter_stack', [])

        self.sentinel = kwargs.pop('sentinel', STOP)
        if kwargs:
            raise TypeError('unexpected keyword arguments: %r' % sorted(kwargs))

    def __repr__(self):
        base_args = ()
        if self.subspec != T:
            base_args = (self.subspec,)
        base = format_invocation(self.__class__.__name__, base_args, repr=bbrepr)
        chunks = [base]
        # The stack is stored newest-first, so walk it in reverse to
        # print the chained calls in the order they were written.
        for fname, args, _ in reversed(self._iter_stack):
            meth = getattr(self, fname)
            fb = FunctionBuilder.from_func(meth)
            fb.args = fb.args[1:]  # drop self
            arg_names = fb.get_arg_names()
            # TODO: something fancier with defaults:
            kwargs = []
            if len(args) > 1 and arg_names:
                # Multiple arguments are easier to read as keywords.
                args, kwargs = (), zip(arg_names, args)
            chunks.append('.' + format_invocation(fname, args, kwargs, repr=bbrepr))
        return ''.join(chunks)

    def glomit(self, target, scope):
        """Build the iterator pipeline for *target* and return it.

        The base iterator applies ``subspec`` to each element; every
        chained operation's callback is then wrapped around it, oldest
        operation first.
        """
        iterator = self._iterate(target, scope)

        for _, _, callback in reversed(self._iter_stack):
            iterator = callback(iterator, scope)

        return iter(iterator)

    def _iterate(self, target, scope):
        """Generator yielding the result of ``subspec`` for each element
        of *target*, honoring ``SKIP``, ``STOP``, and the sentinel.

        Raises:
           TypeError: if the registered ``iterate`` handler fails on
             *target*. The original exception is attached as the cause.
        """
        iterate = scope[TargetRegistry].get_handler('iterate', target, path=scope[Path])
        try:
            iterator = iterate(target)
        except Exception as e:
            raise TypeError('failed to iterate on instance of type %r at %r (got %r)'
                            % (target.__class__.__name__, Path(*scope[Path]), e)) from e

        base_path = scope[Path]
        for i, t in enumerate(iterator):
            # Keep the scope's path pointing at the current element.
            scope[Path] = base_path + [i]
            yld = (t if self.subspec is T else scope[glom](t, self.subspec, scope))
            if yld is SKIP:
                continue
            elif yld is self.sentinel or yld is STOP:
                # NB: sentinel defaults to STOP so I was torn whether
                # to also check for STOP, and landed on the side of
                # never letting STOP through.
                return
            yield yld
        return

    def _add_op(self, opname, args, callback):
        """Return a new spec of the same type with one more operation.

        *opname* and *args* are kept for ``__repr__``; *callback* is
        called as ``callback(iterator, scope)`` by :meth:`glomit`. The
        current ``subspec`` and ``sentinel`` are both carried over, so
        chaining never silently resets a custom sentinel.
        """
        return type(self)(
            subspec=self.subspec,
            sentinel=self.sentinel,
            _iter_stack=[(opname, args, callback)] + self._iter_stack)

    def map(self, subspec):
        """Return a new :class:`Iter()` spec which will apply the provided
        *subspec* to each element of the iterable.

        >>> glom(range(5), Iter().map(lambda x: x * 2).all())
        [0, 2, 4, 6, 8]

        Because a spec can be a callable, :meth:`Iter.map()` does
        everything the built-in :func:`map` does, but with the full
        power of glom specs.

        >>> glom(['a', 'B', 'C'], Iter().map(T.islower()).all())
        [True, False, False]
        """
        # whatever validation you want goes here
        # TODO: DRY the self._add_op with a decorator?
        return self._add_op(
            'map',
            (subspec,),
            lambda iterable, scope: imap(
                lambda t: scope[glom](t, subspec, scope), iterable))

    def filter(self, key=T):
        """Return a new :class:`Iter()` spec which will include only elements matching the
        given *key*.

        >>> glom(range(6), Iter().filter(lambda x: x % 2).all())
        [1, 3, 5]

        Because a spec can be a callable, :meth:`Iter.filter()` does
        everything the built-in :func:`filter` does, but with the full
        power of glom specs. For even more power, combine,
        :meth:`Iter.filter()` with :class:`Check()`.

        >>> # PROTIP: Python's ints know how many binary digits they require, using the bit_length method
        >>> glom(range(9), Iter().filter(Check(T.bit_length(), one_of=(2, 4), default=SKIP)).all())
        [2, 3, 8]

        """
        # NB: Check's validate function defaults to bool, and
        # *default* is returned on access errors as well validation
        # errors, so the lambda passed to ifilter below works fine.
        check_spec = key if isinstance(key, Check) else Check(key, default=SKIP)
        return self._add_op(
            'filter',
            (key,),
            lambda iterable, scope: ifilter(
                lambda t: scope[glom](t, check_spec, scope) is not SKIP, iterable))

    def chunked(self, size, fill=_MISSING):
        """Return a new :class:`Iter()` spec which groups elements in the iterable
        into lists of length *size*.

        If the optional *fill* argument is provided, iterables not
        evenly divisible by *size* will be padded out by the *fill*
        constant. Otherwise, the final chunk will be shorter than *size*.

        >>> list(glom(range(10), Iter().chunked(3)))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
        >>> list(glom(range(10), Iter().chunked(3, fill=None)))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]
        """
        kw = {'size': size}
        args = size,
        # Only forward *fill* when it was explicitly given, so that
        # ``fill=None`` remains a usable padding value.
        if fill is not _MISSING:
            kw['fill'] = fill
            args += (fill,)
        return self._add_op(
            'chunked', args, lambda it, scope: chunked_iter(it, **kw))

    def windowed(self, size):
        """Return a new :class:`Iter()` spec which will yield a sliding window of
        adjacent elements in the iterable. Each tuple yielded will be
        of length *size*.

        Useful for getting adjacent pairs and triples.

        >>> list(glom(range(4), Iter().windowed(2)))
        [(0, 1), (1, 2), (2, 3)]
        """
        return self._add_op(
            'windowed', (size,), lambda it, scope: windowed_iter(it, size))

    def split(self, sep=None, maxsplit=None):
        """Return a new :class:`Iter()` spec which will lazily split an iterable based
        on a separator (or list of separators), *sep*. Like
        :meth:`str.split()`, but for all iterables.

        ``split_iter()`` yields lists of non-separator values. A separator will
        never appear in the output.

        >>> target = [1, 2, None, None, 3, None, 4, None]
        >>> list(glom(target, Iter().split()))
        [[1, 2], [3], [4]]

        Note that ``split_iter`` is based on :func:`str.split`, so if
        *sep* is ``None``, ``split()`` **groups** separators. If empty lists
        are desired between two contiguous ``None`` values, simply use
        ``sep=[None]``:

        >>> list(glom(target, Iter().split(sep=[None])))
        [[1, 2], [], [3], [4], []]

        A max number of splits may also be set:

        >>> list(glom(target, Iter().split(maxsplit=2)))
        [[1, 2], [3], [4, None]]

        """
        return self._add_op(
            'split',
            (sep, maxsplit),
            lambda it, scope: split_iter(it, sep=sep, maxsplit=maxsplit))

    def flatten(self):
        """Returns a new :class:`Iter()` instance which combines iterables into a
        single iterable.

        >>> target = [[1, 2], [3, 4], [5]]
        >>> list(glom(target, Iter().flatten()))
        [1, 2, 3, 4, 5]
        """
        return self._add_op(
            'flatten',
            (),
            lambda it, scope: chain.from_iterable(it))

    def unique(self, key=T):
        """Return a new :class:`Iter()` spec which lazily filters out duplicate
        values, i.e., only the first appearance of a value in a stream will
        be yielded.

        >>> target = list('gloMolIcious')
        >>> out = list(glom(target, Iter().unique(T.lower())))
        >>> print(''.join(out))
        gloMIcus
        """
        return self._add_op(
            'unique',
            (key,),
            lambda it, scope: unique_iter(it, key=lambda t: scope[glom](t, key, scope)))

    def slice(self, *args):
        """Returns a new :class:`Iter()` spec which trims iterables in the
        same manner as :func:`itertools.islice`.

        >>> target = [0, 1, 2, 3, 4, 5]
        >>> glom(target, Iter().slice(3).all())
        [0, 1, 2]
        >>> glom(target, Iter().slice(2, 4).all())
        [2, 3]

        This method accepts only positional arguments.

        Raises:
           TypeError: if :func:`itertools.islice` raises ``TypeError``
             for *args*.
        """
        # TODO: make a kwarg-compatible version of this (islice takes no kwargs)
        # TODO: also support slice syntax Iter()[::]
        try:
            # Validate eagerly so bad arguments fail at spec-construction
            # time rather than when the spec is eventually executed.
            islice([], *args)
        except TypeError:
            raise TypeError(f'invalid slice arguments: {args!r}')
        return self._add_op('slice', args, lambda it, scope: islice(it, *args))

    def limit(self, count):
        """A convenient alias for :meth:`~Iter.slice`, which takes a single
        argument, *count*, the max number of items to yield.
        """
        return self._add_op('limit', (count,), lambda it, scope: islice(it, count))

    def takewhile(self, key=T):
        """Returns a new :class:`Iter()` spec which stops the stream once
        *key* becomes falsy.

        >>> glom([3, 2, 0, 1], Iter().takewhile().all())
        [3, 2]

        See :func:`itertools.takewhile` for more details.
        """
        return self._add_op(
            'takewhile',
            (key,),
            lambda it, scope: takewhile(
                lambda t: scope[glom](t, key, scope), it))

    def dropwhile(self, key=T):
        """Returns a new :class:`Iter()` spec which drops stream items until
        *key* becomes falsy.

        >>> glom([0, 0, 3, 2, 0], Iter().dropwhile(lambda t: t < 1).all())
        [3, 2, 0]

        Note that while similar to :meth:`Iter.filter()`, the filter
        only applies to the beginning of the stream. In a way,
        :meth:`Iter.dropwhile` can be thought of as
        :meth:`~str.lstrip()` for streams. See
        :func:`itertools.dropwhile` for more details.

        """

        return self._add_op(
            'dropwhile',
            (key,),
            lambda it, scope: dropwhile(
                lambda t: scope[glom](t, key, scope), it))

    # Terminal methods follow

    def all(self):
        """A convenience method which returns a new spec which turns an
        iterable into a list.

        >>> glom(range(5), Iter(lambda t: t * 2).all())
        [0, 2, 4, 6, 8]

        Note that this spec will always consume the whole iterable, and as
        such, the spec returned is *not* an :class:`Iter()` instance.
        """
        return Pipe(self, list)

    def first(self, key=T, default=None):
        """A convenience method for lazily yielding a single truthy item from
        an iterable.

        >>> target = [False, 1, 2, 3]
        >>> glom(target, Iter().first())
        1

        This method takes a condition, *key*, which can also be a
        glomspec, as well as a *default*, in case nothing matches the
        condition.

        As this spec yields at most one item, and not an iterable, the
        spec returned from this method is not an :class:`Iter()` instance.
        """
        return (self, First(key=key, default=default))
class First:
    """Return the first element of an iterable target for which *key*
    matches, or *default* (``None`` unless given) when nothing does.

    >>> is_odd = lambda x: x % 2
    >>> glom([0, 1, 2, 3], First(is_odd))
    1
    >>> glom([0, 2, 4], First(is_odd, default=False))
    False
    """
    # This class mostly exists to give a readable repr. Functionally,
    # (Iter(), First()) is the equivalent of doing
    # (Iter().filter(spec), Call(first, args=(T,), kwargs={'default':
    # default}))
    __slots__ = ('_spec', '_default', '_first')

    def __init__(self, key=T, default=None):
        self._spec = key
        self._default = default

        # Build, at glom time, a one-argument predicate: the key spec's
        # bound ``glom`` method with the current scope (``S``) pre-applied.
        bound_glom = Spec(self._spec).glom
        key_spec = Spec(Call(partial, args=(bound_glom,), kwargs={'scope': S}))
        self._first = Call(
            first,
            args=(T,),
            kwargs={'default': default, 'key': key_spec},
        )

    def glomit(self, target, scope):
        # Delegate entirely to the Call spec prepared in __init__.
        return self._first.glomit(target, scope)

    def __repr__(self):
        # The default is shown only when it differs from None.
        pieces = [bbrepr(self._spec)]
        if self._default is not None:
            pieces.append(f'default={bbrepr(self._default)}')
        return f"{self.__class__.__name__}({', '.join(pieces)})"