1import collections.abc
2import functools
3import inspect
4import itertools
5import operator
6import time
7import types
8import warnings
9
10import more_itertools
11
12
13def compose(*funcs):
14 """
15 Compose any number of unary functions into a single unary function.
16
17 Comparable to
18 `function composition <https://en.wikipedia.org/wiki/Function_composition>`_
19 in mathematics:
20
21 ``h = g ∘ f`` implies ``h(x) = g(f(x))``.
22
23 In Python, ``h = compose(g, f)``.
24
25 >>> import textwrap
26 >>> expected = str.strip(textwrap.dedent(compose.__doc__))
27 >>> strip_and_dedent = compose(str.strip, textwrap.dedent)
28 >>> strip_and_dedent(compose.__doc__) == expected
29 True
30
31 Compose also allows the innermost function to take arbitrary arguments.
32
33 >>> round_three = lambda x: round(x, ndigits=3)
34 >>> f = compose(round_three, int.__truediv__)
35 >>> [f(3*x, x+1) for x in range(1,10)]
36 [1.5, 2.0, 2.25, 2.4, 2.5, 2.571, 2.625, 2.667, 2.7]
37 """
38
39 def compose_two(f1, f2):
40 return lambda *args, **kwargs: f1(f2(*args, **kwargs))
41
42 return functools.reduce(compose_two, funcs)
43
44
45def once(func):
46 """
47 Decorate func so it's only ever called the first time.
48
49 This decorator can ensure that an expensive or non-idempotent function
50 will not be expensive on subsequent calls and is idempotent.
51
52 >>> add_three = once(lambda a: a+3)
53 >>> add_three(3)
54 6
55 >>> add_three(9)
56 6
57 >>> add_three('12')
58 6
59
60 To reset the stored value, simply clear the property ``saved_result``.
61
62 >>> del add_three.saved_result
63 >>> add_three(9)
64 12
65 >>> add_three(8)
66 12
67
68 Or invoke 'reset()' on it.
69
70 >>> add_three.reset()
71 >>> add_three(-3)
72 0
73 >>> add_three(0)
74 0
75 """
76
77 @functools.wraps(func)
78 def wrapper(*args, **kwargs):
79 if not hasattr(wrapper, 'saved_result'):
80 wrapper.saved_result = func(*args, **kwargs)
81 return wrapper.saved_result
82
83 wrapper.reset = lambda: vars(wrapper).__delitem__('saved_result')
84 return wrapper
85
86
87def method_cache(method, cache_wrapper=functools.lru_cache()):
88 """
89 Wrap lru_cache to support storing the cache data in the object instances.
90
91 Abstracts the common paradigm where the method explicitly saves an
92 underscore-prefixed protected property on first call and returns that
93 subsequently.
94
95 >>> class MyClass:
96 ... calls = 0
97 ...
98 ... @method_cache
99 ... def method(self, value):
100 ... self.calls += 1
101 ... return value
102
103 >>> a = MyClass()
104 >>> a.method(3)
105 3
106 >>> for x in range(75):
107 ... res = a.method(x)
108 >>> a.calls
109 75
110
111 Note that the apparent behavior will be exactly like that of lru_cache
112 except that the cache is stored on each instance, so values in one
113 instance will not flush values from another, and when an instance is
114 deleted, so are the cached values for that instance.
115
116 >>> b = MyClass()
117 >>> for x in range(35):
118 ... res = b.method(x)
119 >>> b.calls
120 35
121 >>> a.method(0)
122 0
123 >>> a.calls
124 75
125
126 Note that if method had been decorated with ``functools.lru_cache()``,
127 a.calls would have been 76 (due to the cached value of 0 having been
128 flushed by the 'b' instance).
129
130 Clear the cache with ``.cache_clear()``
131
132 >>> a.method.cache_clear()
133
134 Same for a method that hasn't yet been called.
135
136 >>> c = MyClass()
137 >>> c.method.cache_clear()
138
139 Another cache wrapper may be supplied:
140
141 >>> cache = functools.lru_cache(maxsize=2)
142 >>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache)
143 >>> a = MyClass()
144 >>> a.method2()
145 3
146
147 Caution - do not subsequently wrap the method with another decorator, such
148 as ``@property``, which changes the semantics of the function.
149
150 See also
151 http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/
152 for another implementation and additional justification.
153 """
154
155 def wrapper(self, *args, **kwargs):
156 # it's the first call, replace the method with a cached, bound method
157 bound_method = types.MethodType(method, self)
158 cached_method = cache_wrapper(bound_method)
159 setattr(self, method.__name__, cached_method)
160 return cached_method(*args, **kwargs)
161
162 # Support cache clear even before cache has been created.
163 wrapper.cache_clear = lambda: None
164
165 return _special_method_cache(method, cache_wrapper) or wrapper
166
167
168def _special_method_cache(method, cache_wrapper):
169 """
170 Because Python treats special methods differently, it's not
171 possible to use instance attributes to implement the cached
172 methods.
173
174 Instead, install the wrapper method under a different name
175 and return a simple proxy to that wrapper.
176
177 https://github.com/jaraco/jaraco.functools/issues/5
178 """
179 name = method.__name__
180 special_names = '__getattr__', '__getitem__'
181
182 if name not in special_names:
183 return None
184
185 wrapper_name = '__cached' + name
186
187 def proxy(self, /, *args, **kwargs):
188 if wrapper_name not in vars(self):
189 bound = types.MethodType(method, self)
190 cache = cache_wrapper(bound)
191 setattr(self, wrapper_name, cache)
192 else:
193 cache = getattr(self, wrapper_name)
194 return cache(*args, **kwargs)
195
196 return proxy
197
198
199def apply(transform):
200 """
201 Decorate a function with a transform function that is
202 invoked on results returned from the decorated function.
203
204 >>> @apply(reversed)
205 ... def get_numbers(start):
206 ... "doc for get_numbers"
207 ... return range(start, start+3)
208 >>> list(get_numbers(4))
209 [6, 5, 4]
210 >>> get_numbers.__doc__
211 'doc for get_numbers'
212 """
213
214 def wrap(func):
215 return functools.wraps(func)(compose(transform, func))
216
217 return wrap
218
219
220def result_invoke(action):
221 r"""
222 Decorate a function with an action function that is
223 invoked on the results returned from the decorated
224 function (for its side effect), then return the original
225 result.
226
227 >>> @result_invoke(print)
228 ... def add_two(a, b):
229 ... return a + b
230 >>> x = add_two(2, 3)
231 5
232 >>> x
233 5
234 """
235
236 def wrap(func):
237 @functools.wraps(func)
238 def wrapper(*args, **kwargs):
239 result = func(*args, **kwargs)
240 action(result)
241 return result
242
243 return wrapper
244
245 return wrap
246
247
248def invoke(f, /, *args, **kwargs):
249 """
250 Call a function for its side effect after initialization.
251
252 The benefit of using the decorator instead of simply invoking a function
253 after defining it is that it makes explicit the author's intent for the
254 function to be called immediately. Whereas if one simply calls the
255 function immediately, it's less obvious if that was intentional or
256 incidental. It also avoids repeating the name - the two actions, defining
257 the function and calling it immediately are modeled separately, but linked
258 by the decorator construct.
259
260 The benefit of having a function construct (opposed to just invoking some
261 behavior inline) is to serve as a scope in which the behavior occurs. It
262 avoids polluting the global namespace with local variables, provides an
263 anchor on which to attach documentation (docstring), keeps the behavior
264 logically separated (instead of conceptually separated or not separated at
265 all), and provides potential to re-use the behavior for testing or other
266 purposes.
267
268 This function is named as a pithy way to communicate, "call this function
269 primarily for its side effect", or "while defining this function, also
270 take it aside and call it". It exists because there's no Python construct
271 for "define and call" (nor should there be, as decorators serve this need
272 just fine). The behavior happens immediately and synchronously.
273
274 >>> @invoke
275 ... def func(): print("called")
276 called
277 >>> func()
278 called
279
280 Use functools.partial to pass parameters to the initial call
281
282 >>> @functools.partial(invoke, name='bingo')
283 ... def func(name): print('called with', name)
284 called with bingo
285 """
286 f(*args, **kwargs)
287 return f
288
289
290class Throttler:
291 """Rate-limit a function (or other callable)."""
292
293 def __init__(self, func, max_rate=float('Inf')):
294 if isinstance(func, Throttler):
295 func = func.func
296 self.func = func
297 self.max_rate = max_rate
298 self.reset()
299
300 def reset(self):
301 self.last_called = 0
302
303 def __call__(self, *args, **kwargs):
304 self._wait()
305 return self.func(*args, **kwargs)
306
307 def _wait(self):
308 """Ensure at least 1/max_rate seconds from last call."""
309 elapsed = time.time() - self.last_called
310 must_wait = 1 / self.max_rate - elapsed
311 time.sleep(max(0, must_wait))
312 self.last_called = time.time()
313
314 def __get__(self, obj, owner=None):
315 return first_invoke(self._wait, functools.partial(self.func, obj))
316
317
318def first_invoke(func1, func2):
319 """
320 Return a function that when invoked will invoke func1 without
321 any parameters (for its side effect) and then invoke func2
322 with whatever parameters were passed, returning its result.
323 """
324
325 def wrapper(*args, **kwargs):
326 func1()
327 return func2(*args, **kwargs)
328
329 return wrapper
330
331
332method_caller = first_invoke(
333 lambda: warnings.warn(
334 '`jaraco.functools.method_caller` is deprecated, '
335 'use `operator.methodcaller` instead',
336 DeprecationWarning,
337 stacklevel=3,
338 ),
339 operator.methodcaller,
340)
341
342
343def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
344 """
345 Given a callable func, trap the indicated exceptions
346 for up to 'retries' times, invoking cleanup on the
347 exception. On the final attempt, allow any exceptions
348 to propagate.
349 """
350 attempts = itertools.count() if retries == float('inf') else range(retries)
351 for _ in attempts:
352 try:
353 return func()
354 except trap:
355 cleanup()
356
357 return func()
358
359
360def retry(*r_args, **r_kwargs):
361 """
362 Decorator wrapper for retry_call. Accepts arguments to retry_call
363 except func and then returns a decorator for the decorated function.
364
365 Ex:
366
367 >>> @retry(retries=3)
368 ... def my_func(a, b):
369 ... "this is my funk"
370 ... print(a, b)
371 >>> my_func.__doc__
372 'this is my funk'
373 """
374
375 def decorate(func):
376 @functools.wraps(func)
377 def wrapper(*f_args, **f_kwargs):
378 bound = functools.partial(func, *f_args, **f_kwargs)
379 return retry_call(bound, *r_args, **r_kwargs)
380
381 return wrapper
382
383 return decorate
384
385
386def print_yielded(func):
387 """
388 Convert a generator into a function that prints all yielded elements.
389
390 >>> @print_yielded
391 ... def x():
392 ... yield 3; yield None
393 >>> x()
394 3
395 None
396 """
397 print_all = functools.partial(map, print)
398 print_results = compose(more_itertools.consume, print_all, func)
399 return functools.wraps(func)(print_results)
400
401
402def pass_none(func):
403 """
404 Wrap func so it's not called if its first param is None.
405
406 >>> print_text = pass_none(print)
407 >>> print_text('text')
408 text
409 >>> print_text(None)
410 """
411
412 @functools.wraps(func)
413 def wrapper(param, /, *args, **kwargs):
414 if param is not None:
415 return func(param, *args, **kwargs)
416 return None
417
418 return wrapper
419
420
421def assign_params(func, namespace):
422 """
423 Assign parameters from namespace where func solicits.
424
425 >>> def func(x, y=3):
426 ... print(x, y)
427 >>> assigned = assign_params(func, dict(x=2, z=4))
428 >>> assigned()
429 2 3
430
431 The usual errors are raised if a function doesn't receive
432 its required parameters:
433
434 >>> assigned = assign_params(func, dict(y=3, z=4))
435 >>> assigned()
436 Traceback (most recent call last):
437 TypeError: func() ...argument...
438
439 It even works on methods:
440
441 >>> class Handler:
442 ... def meth(self, arg):
443 ... print(arg)
444 >>> assign_params(Handler().meth, dict(arg='crystal', foo='clear'))()
445 crystal
446 """
447 sig = inspect.signature(func)
448 params = sig.parameters.keys()
449 call_ns = {k: namespace[k] for k in params if k in namespace}
450 return functools.partial(func, **call_ns)
451
452
453def save_method_args(method):
454 """
455 Wrap a method such that when it is called, the args and kwargs are
456 saved on the method.
457
458 >>> class MyClass:
459 ... @save_method_args
460 ... def method(self, a, b):
461 ... print(a, b)
462 >>> my_ob = MyClass()
463 >>> my_ob.method(1, 2)
464 1 2
465 >>> my_ob._saved_method.args
466 (1, 2)
467 >>> my_ob._saved_method.kwargs
468 {}
469 >>> my_ob.method(a=3, b='foo')
470 3 foo
471 >>> my_ob._saved_method.args
472 ()
473 >>> my_ob._saved_method.kwargs == dict(a=3, b='foo')
474 True
475
476 The arguments are stored on the instance, allowing for
477 different instance to save different args.
478
479 >>> your_ob = MyClass()
480 >>> your_ob.method({str('x'): 3}, b=[4])
481 {'x': 3} [4]
482 >>> your_ob._saved_method.args
483 ({'x': 3},)
484 >>> my_ob._saved_method.args
485 ()
486 """
487 args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')
488
489 @functools.wraps(method)
490 def wrapper(self, /, *args, **kwargs):
491 attr_name = '_saved_' + method.__name__
492 attr = args_and_kwargs(args, kwargs)
493 setattr(self, attr_name, attr)
494 return method(self, *args, **kwargs)
495
496 return wrapper
497
498
499def except_(*exceptions, replace=None, use=None):
500 """
501 Replace the indicated exceptions, if raised, with the indicated
502 literal replacement or evaluated expression (if present).
503
504 >>> safe_int = except_(ValueError)(int)
505 >>> safe_int('five')
506 >>> safe_int('5')
507 5
508
509 Specify a literal replacement with ``replace``.
510
511 >>> safe_int_r = except_(ValueError, replace=0)(int)
512 >>> safe_int_r('five')
513 0
514
515 Provide an expression to ``use`` to pass through particular parameters.
516
517 >>> safe_int_pt = except_(ValueError, use='args[0]')(int)
518 >>> safe_int_pt('five')
519 'five'
520
521 """
522
523 def decorate(func):
524 @functools.wraps(func)
525 def wrapper(*args, **kwargs):
526 try:
527 return func(*args, **kwargs)
528 except exceptions:
529 try:
530 return eval(use)
531 except TypeError:
532 return replace
533
534 return wrapper
535
536 return decorate
537
538
539def identity(x):
540 """
541 Return the argument.
542
543 >>> o = object()
544 >>> identity(o) is o
545 True
546 """
547 return x
548
549
550def bypass_when(check, *, _op=identity):
551 """
552 Decorate a function to return its parameter when ``check``.
553
554 >>> bypassed = [] # False
555
556 >>> @bypass_when(bypassed)
557 ... def double(x):
558 ... return x * 2
559 >>> double(2)
560 4
561 >>> bypassed[:] = [object()] # True
562 >>> double(2)
563 2
564 """
565
566 def decorate(func):
567 @functools.wraps(func)
568 def wrapper(param, /):
569 return param if _op(check) else func(param)
570
571 return wrapper
572
573 return decorate
574
575
576def bypass_unless(check):
577 """
578 Decorate a function to return its parameter unless ``check``.
579
580 >>> enabled = [object()] # True
581
582 >>> @bypass_unless(enabled)
583 ... def double(x):
584 ... return x * 2
585 >>> double(2)
586 4
587 >>> del enabled[:] # False
588 >>> double(2)
589 2
590 """
591 return bypass_when(check, _op=operator.not_)
592
593
594@functools.singledispatch
595def _splat_inner(args, func):
596 """Splat args to func."""
597 return func(*args)
598
599
600@_splat_inner.register
601def _(args: collections.abc.Mapping, func):
602 """Splat kargs to func as kwargs."""
603 return func(**args)
604
605
606def splat(func):
607 """
608 Wrap func to expect its parameters to be passed positionally in a tuple.
609
610 Has a similar effect to that of ``itertools.starmap`` over
611 simple ``map``.
612
613 >>> pairs = [(-1, 1), (0, 2)]
614 >>> more_itertools.consume(itertools.starmap(print, pairs))
615 -1 1
616 0 2
617 >>> more_itertools.consume(map(splat(print), pairs))
618 -1 1
619 0 2
620
621 The approach generalizes to other iterators that don't have a "star"
622 equivalent, such as a "starfilter".
623
624 >>> list(filter(splat(operator.add), pairs))
625 [(0, 2)]
626
627 Splat also accepts a mapping argument.
628
629 >>> def is_nice(msg, code):
630 ... return "smile" in msg or code == 0
631 >>> msgs = [
632 ... dict(msg='smile!', code=20),
633 ... dict(msg='error :(', code=1),
634 ... dict(msg='unknown', code=0),
635 ... ]
636 >>> for msg in filter(splat(is_nice), msgs):
637 ... print(msg)
638 {'msg': 'smile!', 'code': 20}
639 {'msg': 'unknown', 'code': 0}
640 """
641 return functools.wraps(func)(functools.partial(_splat_inner, func=func))