Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 29%
1697 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
7__all__ = (
8 'Mock',
9 'MagicMock',
10 'patch',
11 'sentinel',
12 'DEFAULT',
13 'ANY',
14 'call',
15 'create_autospec',
16 'AsyncMock',
17 'ThreadingMock',
18 'FILTER_DIR',
19 'NonCallableMock',
20 'NonCallableMagicMock',
21 'mock_open',
22 'PropertyMock',
23 'seal',
24)
27import asyncio
28import contextlib
29import io
30import inspect
31import pprint
32import sys
33import threading
34import builtins
35from types import CodeType, ModuleType, MethodType
36from unittest.util import safe_repr
37from functools import wraps, partial
38from threading import RLock
41from mock import IS_PYPY
42from .backports import iscoroutinefunction
45class InvalidSpecError(Exception):
46 """Indicates that an invalid value was used as a mock spec."""
49_builtins = {name for name in dir(builtins) if not name.startswith('_')}
51FILTER_DIR = True
53# Workaround for issue #12370
54# Without this, the __class__ properties wouldn't be set correctly
55_safe_super = super
57def _is_async_obj(obj):
58 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
59 return False
60 if hasattr(obj, '__func__'):
61 obj = getattr(obj, '__func__')
62 return iscoroutinefunction(obj) or inspect.isawaitable(obj)
65def _is_async_func(func):
66 if getattr(func, '__code__', None):
67 return iscoroutinefunction(func)
68 else:
69 return False
72def _is_instance_mock(obj):
73 # can't use isinstance on Mock objects because they override __class__
74 # The base class for all mocks is NonCallableMock
75 return issubclass(type(obj), NonCallableMock)
78def _is_exception(obj):
79 return (
80 isinstance(obj, BaseException) or
81 isinstance(obj, type) and issubclass(obj, BaseException)
82 )
85def _extract_mock(obj):
86 # Autospecced functions will return a FunctionType with "mock" attribute
87 # which is the actual mock object that needs to be used.
88 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
89 return obj.mock
90 else:
91 return obj
94def _get_signature_object(func, as_instance, eat_self):
95 """
96 Given an arbitrary, possibly callable object, try to create a suitable
97 signature object.
98 Return a (reduced func, signature) tuple, or None.
99 """
100 if isinstance(func, type) and not as_instance:
101 # If it's a type and should be modelled as a type, use __init__.
102 func = func.__init__
103 # Skip the `self` argument in __init__
104 eat_self = True
105 elif isinstance(func, (classmethod, staticmethod)):
106 if isinstance(func, classmethod):
107 # Skip the `cls` argument of a class method
108 eat_self = True
109 # Use the original decorated method to extract the correct function signature
110 func = func.__func__
111 elif not isinstance(func, FunctionTypes):
112 # If we really want to model an instance of the passed type,
113 # __call__ should be looked up, not __init__.
114 try:
115 func = func.__call__
116 except AttributeError:
117 return None
118 if eat_self:
119 sig_func = partial(func, None)
120 else:
121 sig_func = func
122 try:
123 return func, inspect.signature(sig_func)
124 except ValueError:
125 # Certain callable types are not supported by inspect.signature()
126 return None
129def _check_signature(func, mock, skipfirst, instance=False):
130 sig = _get_signature_object(func, instance, skipfirst)
131 if sig is None:
132 return
133 func, sig = sig
134 def checksig(_mock_self, *args, **kwargs):
135 sig.bind(*args, **kwargs)
136 _copy_func_details(func, checksig)
137 type(mock)._mock_check_sig = checksig
138 type(mock).__signature__ = sig
141def _copy_func_details(func, funcopy):
142 # we explicitly don't copy func.__dict__ into this copy as it would
143 # expose original attributes that should be mocked
144 for attribute in (
145 '__name__', '__doc__', '__text_signature__',
146 '__module__', '__defaults__', '__kwdefaults__',
147 ):
148 try:
149 setattr(funcopy, attribute, getattr(func, attribute))
150 except AttributeError:
151 pass
154def _callable(obj):
155 if isinstance(obj, type):
156 return True
157 if isinstance(obj, (staticmethod, classmethod, MethodType)):
158 return _callable(obj.__func__)
159 if getattr(obj, '__call__', None) is not None:
160 return True
161 return False
164def _is_list(obj):
165 # checks for list or tuples
166 # XXXX badly named!
167 return type(obj) in (list, tuple)
170def _instance_callable(obj):
171 """Given an object, return True if the object is callable.
172 For classes, return True if instances would be callable."""
173 if not isinstance(obj, type):
174 # already an instance
175 return getattr(obj, '__call__', None) is not None
177 # *could* be broken by a class overriding __mro__ or __dict__ via
178 # a metaclass
179 for base in (obj,) + obj.__mro__:
180 if base.__dict__.get('__call__') is not None:
181 return True
182 return False
185def _set_signature(mock, original, instance=False):
186 # creates a function with signature (*args, **kwargs) that delegates to a
187 # mock. It still does signature checking by calling a lambda with the same
188 # signature as the original.
190 skipfirst = isinstance(original, type)
191 result = _get_signature_object(original, instance, skipfirst)
192 if result is None:
193 return mock
194 func, sig = result
195 def checksig(*args, **kwargs):
196 sig.bind(*args, **kwargs)
197 _copy_func_details(func, checksig)
199 name = original.__name__
200 if not name.isidentifier():
201 name = 'funcopy'
202 context = {'_checksig_': checksig, 'mock': mock}
203 src = """def %s(*args, **kwargs):
204 _checksig_(*args, **kwargs)
205 return mock(*args, **kwargs)""" % name
206 exec (src, context)
207 funcopy = context[name]
208 _setup_func(funcopy, mock, sig)
209 return funcopy
211def _set_async_signature(mock, original, instance=False, is_async_mock=False):
212 # creates an async function with signature (*args, **kwargs) that delegates to a
213 # mock. It still does signature checking by calling a lambda with the same
214 # signature as the original.
216 skipfirst = isinstance(original, type)
217 func, sig = _get_signature_object(original, instance, skipfirst)
218 def checksig(*args, **kwargs):
219 sig.bind(*args, **kwargs)
220 _copy_func_details(func, checksig)
222 name = original.__name__
223 context = {'_checksig_': checksig, 'mock': mock}
224 src = """async def %s(*args, **kwargs):
225 _checksig_(*args, **kwargs)
226 return await mock(*args, **kwargs)""" % name
227 exec (src, context)
228 funcopy = context[name]
229 _setup_func(funcopy, mock, sig)
230 _setup_async_mock(funcopy)
231 return funcopy
234def _setup_func(funcopy, mock, sig):
235 funcopy.mock = mock
237 def assert_called_with(*args, **kwargs):
238 return mock.assert_called_with(*args, **kwargs)
239 def assert_called(*args, **kwargs):
240 return mock.assert_called(*args, **kwargs)
241 def assert_not_called(*args, **kwargs):
242 return mock.assert_not_called(*args, **kwargs)
243 def assert_called_once(*args, **kwargs):
244 return mock.assert_called_once(*args, **kwargs)
245 def assert_called_once_with(*args, **kwargs):
246 return mock.assert_called_once_with(*args, **kwargs)
247 def assert_has_calls(*args, **kwargs):
248 return mock.assert_has_calls(*args, **kwargs)
249 def assert_any_call(*args, **kwargs):
250 return mock.assert_any_call(*args, **kwargs)
251 def reset_mock():
252 funcopy.method_calls = _CallList()
253 funcopy.mock_calls = _CallList()
254 mock.reset_mock()
255 ret = funcopy.return_value
256 if _is_instance_mock(ret) and not ret is mock:
257 ret.reset_mock()
259 funcopy.called = False
260 funcopy.call_count = 0
261 funcopy.call_args = None
262 funcopy.call_args_list = _CallList()
263 funcopy.method_calls = _CallList()
264 funcopy.mock_calls = _CallList()
266 funcopy.return_value = mock.return_value
267 funcopy.side_effect = mock.side_effect
268 funcopy._mock_children = mock._mock_children
270 funcopy.assert_called_with = assert_called_with
271 funcopy.assert_called_once_with = assert_called_once_with
272 funcopy.assert_has_calls = assert_has_calls
273 funcopy.assert_any_call = assert_any_call
274 funcopy.reset_mock = reset_mock
275 funcopy.assert_called = assert_called
276 funcopy.assert_not_called = assert_not_called
277 funcopy.assert_called_once = assert_called_once
278 funcopy.__signature__ = sig
280 mock._mock_delegate = funcopy
283def _setup_async_mock(mock):
284 mock._is_coroutine = asyncio.coroutines._is_coroutine
285 mock.await_count = 0
286 mock.await_args = None
287 mock.await_args_list = _CallList()
289 # Mock is not configured yet so the attributes are set
290 # to a function and then the corresponding mock helper function
291 # is called when the helper is accessed similar to _setup_func.
292 def wrapper(attr, *args, **kwargs):
293 return getattr(mock.mock, attr)(*args, **kwargs)
295 for attribute in ('assert_awaited',
296 'assert_awaited_once',
297 'assert_awaited_with',
298 'assert_awaited_once_with',
299 'assert_any_await',
300 'assert_has_awaits',
301 'assert_not_awaited'):
303 # setattr(mock, attribute, wrapper) causes late binding
304 # hence attribute will always be the last value in the loop
305 # Use partial(wrapper, attribute) to ensure the attribute is bound
306 # correctly.
307 setattr(mock, attribute, partial(wrapper, attribute))
310def _is_magic(name):
311 return '__%s__' % name[2:-2] == name
314class _SentinelObject(object):
315 "A unique, named, sentinel object."
316 def __init__(self, name):
317 self.name = name
319 def __repr__(self):
320 return 'sentinel.%s' % self.name
322 def __reduce__(self):
323 return 'sentinel.%s' % self.name
326class _Sentinel(object):
327 """Access attributes to return a named object, usable as a sentinel."""
328 def __init__(self):
329 self._sentinels = {}
331 def __getattr__(self, name):
332 if name == '__bases__':
333 # Without this help(unittest.mock) raises an exception
334 raise AttributeError
335 return self._sentinels.setdefault(name, _SentinelObject(name))
337 def __reduce__(self):
338 return 'sentinel'
341sentinel = _Sentinel()
343DEFAULT = sentinel.DEFAULT
344_missing = sentinel.MISSING
345_deleted = sentinel.DELETED
348_allowed_names = {
349 'return_value', '_mock_return_value', 'side_effect',
350 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
351 '_mock_name', '_mock_new_name'
352}
355def _delegating_property(name):
356 _allowed_names.add(name)
357 _the_name = '_mock_' + name
358 def _get(self, name=name, _the_name=_the_name):
359 sig = self._mock_delegate
360 if sig is None:
361 return getattr(self, _the_name)
362 return getattr(sig, name)
363 def _set(self, value, name=name, _the_name=_the_name):
364 sig = self._mock_delegate
365 if sig is None:
366 self.__dict__[_the_name] = value
367 else:
368 setattr(sig, name, value)
370 return property(_get, _set)
374class _CallList(list):
376 def __contains__(self, value):
377 if not isinstance(value, list):
378 return list.__contains__(self, value)
379 len_value = len(value)
380 len_self = len(self)
381 if len_value > len_self:
382 return False
384 for i in range(0, len_self - len_value + 1):
385 sub_list = self[i:i+len_value]
386 if sub_list == value:
387 return True
388 return False
390 def __repr__(self):
391 return pprint.pformat(list(self))
394def _check_and_set_parent(parent, value, name, new_name):
395 value = _extract_mock(value)
397 if not _is_instance_mock(value):
398 return False
399 if ((value._mock_name or value._mock_new_name) or
400 (value._mock_parent is not None) or
401 (value._mock_new_parent is not None)):
402 return False
404 _parent = parent
405 while _parent is not None:
406 # setting a mock (value) as a child or return value of itself
407 # should not modify the mock
408 if _parent is value:
409 return False
410 _parent = _parent._mock_new_parent
412 if new_name:
413 value._mock_new_parent = parent
414 value._mock_new_name = new_name
415 if name:
416 value._mock_parent = parent
417 value._mock_name = name
418 return True
420# Internal class to identify if we wrapped an iterator object or not.
421class _MockIter(object):
422 def __init__(self, obj):
423 self.obj = iter(obj)
424 def __next__(self):
425 return next(self.obj)
427class Base(object):
428 _mock_return_value = DEFAULT
429 _mock_side_effect = None
430 def __init__(self, *args, **kwargs):
431 pass
435class NonCallableMock(Base):
436 """A non-callable version of `Mock`"""
438 # Store a mutex as a class attribute in order to protect concurrent access
439 # to mock attributes. Using a class attribute allows all NonCallableMock
440 # instances to share the mutex for simplicity.
441 #
442 # See https://github.com/python/cpython/issues/98624 for why this is
443 # necessary.
444 _lock = RLock()
446 def __new__(
447 cls, spec=None, wraps=None, name=None, spec_set=None,
448 parent=None, _spec_state=None, _new_name='', _new_parent=None,
449 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
450 ):
451 # every instance has its own class
452 # so we can create magic methods on the
453 # class without stomping on other mocks
454 bases = (cls,)
455 if not issubclass(cls, AsyncMockMixin):
456 # Check if spec is an async object or function
457 spec_arg = spec_set or spec
458 if spec_arg is not None and _is_async_obj(spec_arg):
459 bases = (AsyncMockMixin, cls)
460 new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
461 instance = _safe_super(NonCallableMock, cls).__new__(new)
462 return instance
465 def __init__(
466 self, spec=None, wraps=None, name=None, spec_set=None,
467 parent=None, _spec_state=None, _new_name='', _new_parent=None,
468 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
469 ):
470 if _new_parent is None:
471 _new_parent = parent
473 __dict__ = self.__dict__
474 __dict__['_mock_parent'] = parent
475 __dict__['_mock_name'] = name
476 __dict__['_mock_new_name'] = _new_name
477 __dict__['_mock_new_parent'] = _new_parent
478 __dict__['_mock_sealed'] = False
480 if spec_set is not None:
481 spec = spec_set
482 spec_set = True
483 if _eat_self is None:
484 _eat_self = parent is not None
486 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
488 __dict__['_mock_children'] = {}
489 __dict__['_mock_wraps'] = wraps
490 __dict__['_mock_delegate'] = None
492 __dict__['_mock_called'] = False
493 __dict__['_mock_call_args'] = None
494 __dict__['_mock_call_count'] = 0
495 __dict__['_mock_call_args_list'] = _CallList()
496 __dict__['_mock_mock_calls'] = _CallList()
498 __dict__['method_calls'] = _CallList()
499 __dict__['_mock_unsafe'] = unsafe
501 if kwargs:
502 self.configure_mock(**kwargs)
504 _safe_super(NonCallableMock, self).__init__(
505 spec, wraps, name, spec_set, parent,
506 _spec_state
507 )
510 def attach_mock(self, mock, attribute):
511 """
512 Attach a mock as an attribute of this one, replacing its name and
513 parent. Calls to the attached mock will be recorded in the
514 `method_calls` and `mock_calls` attributes of this one."""
515 inner_mock = _extract_mock(mock)
517 inner_mock._mock_parent = None
518 inner_mock._mock_new_parent = None
519 inner_mock._mock_name = ''
520 inner_mock._mock_new_name = None
522 setattr(self, attribute, mock)
525 def mock_add_spec(self, spec, spec_set=False):
526 """Add a spec to a mock. `spec` can either be an object or a
527 list of strings. Only attributes on the `spec` can be fetched as
528 attributes from the mock.
530 If `spec_set` is True then only attributes on the spec can be set."""
531 self._mock_add_spec(spec, spec_set)
534 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
535 _eat_self=False):
536 if _is_instance_mock(spec):
537 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')
539 _spec_class = None
540 _spec_signature = None
541 _spec_asyncs = []
543 if spec is not None and not _is_list(spec):
544 if isinstance(spec, type):
545 _spec_class = spec
546 else:
547 _spec_class = type(spec)
548 res = _get_signature_object(spec,
549 _spec_as_instance, _eat_self)
550 _spec_signature = res and res[1]
552 spec_list = dir(spec)
554 for attr in spec_list:
555 static_attr = inspect.getattr_static(spec, attr, None)
556 unwrapped_attr = static_attr
557 try:
558 unwrapped_attr = inspect.unwrap(unwrapped_attr)
559 except ValueError:
560 pass
561 if iscoroutinefunction(unwrapped_attr):
562 _spec_asyncs.append(attr)
564 spec = spec_list
566 __dict__ = self.__dict__
567 __dict__['_spec_class'] = _spec_class
568 __dict__['_spec_set'] = spec_set
569 __dict__['_spec_signature'] = _spec_signature
570 __dict__['_mock_methods'] = spec
571 __dict__['_spec_asyncs'] = _spec_asyncs
573 def __get_return_value(self):
574 ret = self._mock_return_value
575 if self._mock_delegate is not None:
576 ret = self._mock_delegate.return_value
578 if ret is DEFAULT:
579 ret = self._get_child_mock(
580 _new_parent=self, _new_name='()'
581 )
582 self.return_value = ret
583 return ret
586 def __set_return_value(self, value):
587 if self._mock_delegate is not None:
588 self._mock_delegate.return_value = value
589 else:
590 self._mock_return_value = value
591 _check_and_set_parent(self, value, None, '()')
593 __return_value_doc = "The value to be returned when the mock is called."
594 return_value = property(__get_return_value, __set_return_value,
595 __return_value_doc)
598 @property
599 def __class__(self):
600 if self._spec_class is None:
601 return type(self)
602 return self._spec_class
604 called = _delegating_property('called')
605 call_count = _delegating_property('call_count')
606 call_args = _delegating_property('call_args')
607 call_args_list = _delegating_property('call_args_list')
608 mock_calls = _delegating_property('mock_calls')
611 def __get_side_effect(self):
612 delegated = self._mock_delegate
613 if delegated is None:
614 return self._mock_side_effect
615 sf = delegated.side_effect
616 if (sf is not None and not callable(sf)
617 and not isinstance(sf, _MockIter) and not _is_exception(sf)):
618 sf = _MockIter(sf)
619 delegated.side_effect = sf
620 return sf
622 def __set_side_effect(self, value):
623 value = _try_iter(value)
624 delegated = self._mock_delegate
625 if delegated is None:
626 self._mock_side_effect = value
627 else:
628 delegated.side_effect = value
630 side_effect = property(__get_side_effect, __set_side_effect)
633 def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
634 "Restore the mock object to its initial state."
635 if visited is None:
636 visited = []
637 if id(self) in visited:
638 return
639 visited.append(id(self))
641 self.called = False
642 self.call_args = None
643 self.call_count = 0
644 self.mock_calls = _CallList()
645 self.call_args_list = _CallList()
646 self.method_calls = _CallList()
648 if return_value:
649 self._mock_return_value = DEFAULT
650 if side_effect:
651 self._mock_side_effect = None
653 for child in self._mock_children.values():
654 if isinstance(child, _SpecState) or child is _deleted:
655 continue
656 child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
658 ret = self._mock_return_value
659 if _is_instance_mock(ret) and ret is not self:
660 ret.reset_mock(visited)
663 def configure_mock(self, **kwargs):
664 """Set attributes on the mock through keyword arguments.
666 Attributes plus return values and side effects can be set on child
667 mocks using standard dot notation and unpacking a dictionary in the
668 method call:
670 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
671 >>> mock.configure_mock(**attrs)"""
672 for arg, val in sorted(kwargs.items(),
673 # we sort on the number of dots so that
674 # attributes are set before we set attributes on
675 # attributes
676 key=lambda entry: entry[0].count('.')):
677 args = arg.split('.')
678 final = args.pop()
679 obj = self
680 for entry in args:
681 obj = getattr(obj, entry)
682 setattr(obj, final, val)
685 def __getattr__(self, name):
686 if name in {'_mock_methods', '_mock_unsafe'}:
687 raise AttributeError(name)
688 elif self._mock_methods is not None:
689 if name not in self._mock_methods or name in _all_magics:
690 raise AttributeError("Mock object has no attribute %r" % name)
691 elif _is_magic(name):
692 raise AttributeError(name)
693 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
694 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
695 raise AttributeError(
696 f"{name!r} is not a valid assertion. Use a spec "
697 f"for the mock if {name!r} is meant to be an attribute.")
699 with NonCallableMock._lock:
700 result = self._mock_children.get(name)
701 if result is _deleted:
702 raise AttributeError(name)
703 elif result is None:
704 wraps = None
705 if self._mock_wraps is not None:
706 # XXXX should we get the attribute without triggering code
707 # execution?
708 wraps = getattr(self._mock_wraps, name)
710 result = self._get_child_mock(
711 parent=self, name=name, wraps=wraps, _new_name=name,
712 _new_parent=self
713 )
714 self._mock_children[name] = result
716 elif isinstance(result, _SpecState):
717 try:
718 result = create_autospec(
719 result.spec, result.spec_set, result.instance,
720 result.parent, result.name
721 )
722 except InvalidSpecError:
723 target_name = self.__dict__['_mock_name'] or self
724 raise InvalidSpecError(
725 f'Cannot autospec attr {name!r} from target '
726 f'{target_name!r} as it has already been mocked out. '
727 f'[target={self!r}, attr={result.spec!r}]')
728 self._mock_children[name] = result
730 return result
733 def _extract_mock_name(self):
734 _name_list = [self._mock_new_name]
735 _parent = self._mock_new_parent
736 last = self
738 dot = '.'
739 if _name_list == ['()']:
740 dot = ''
742 while _parent is not None:
743 last = _parent
745 _name_list.append(_parent._mock_new_name + dot)
746 dot = '.'
747 if _parent._mock_new_name == '()':
748 dot = ''
750 _parent = _parent._mock_new_parent
752 _name_list = list(reversed(_name_list))
753 _first = last._mock_name or 'mock'
754 if len(_name_list) > 1:
755 if _name_list[1] not in ('()', '().'):
756 _first += '.'
757 _name_list[0] = _first
758 return ''.join(_name_list)
760 def __repr__(self):
761 name = self._extract_mock_name()
763 name_string = ''
764 if name not in ('mock', 'mock.'):
765 name_string = ' name=%r' % name
767 spec_string = ''
768 if self._spec_class is not None:
769 spec_string = ' spec=%r'
770 if self._spec_set:
771 spec_string = ' spec_set=%r'
772 spec_string = spec_string % self._spec_class.__name__
773 return "<%s%s%s id='%s'>" % (
774 type(self).__name__,
775 name_string,
776 spec_string,
777 id(self)
778 )
781 def __dir__(self):
782 """Filter the output of `dir(mock)` to only useful members."""
783 if not FILTER_DIR:
784 return object.__dir__(self)
786 extras = self._mock_methods or []
787 from_type = dir(type(self))
788 from_dict = list(self.__dict__)
789 from_child_mocks = [
790 m_name for m_name, m_value in self._mock_children.items()
791 if m_value is not _deleted]
793 from_type = [e for e in from_type if not e.startswith('_')]
794 from_dict = [e for e in from_dict if not e.startswith('_') or
795 _is_magic(e)]
796 return sorted(set(extras + from_type + from_dict + from_child_mocks))
799 def __setattr__(self, name, value):
800 if name in _allowed_names:
801 # property setters go through here
802 return object.__setattr__(self, name, value)
803 elif (self._spec_set and self._mock_methods is not None and
804 name not in self._mock_methods and
805 name not in self.__dict__):
806 raise AttributeError("Mock object has no attribute '%s'" % name)
807 elif name in _unsupported_magics:
808 msg = 'Attempting to set unsupported magic method %r.' % name
809 raise AttributeError(msg)
810 elif name in _all_magics:
811 if self._mock_methods is not None and name not in self._mock_methods:
812 raise AttributeError("Mock object has no attribute '%s'" % name)
814 if not _is_instance_mock(value):
815 setattr(type(self), name, _get_method(name, value))
816 original = value
817 value = lambda *args, **kw: original(self, *args, **kw)
818 else:
819 # only set _new_name and not name so that mock_calls is tracked
820 # but not method calls
821 _check_and_set_parent(self, value, None, name)
822 setattr(type(self), name, value)
823 self._mock_children[name] = value
824 elif name == '__class__':
825 self._spec_class = value
826 return
827 else:
828 if _check_and_set_parent(self, value, name, name):
829 self._mock_children[name] = value
831 if self._mock_sealed and not hasattr(self, name):
832 mock_name = f'{self._extract_mock_name()}.{name}'
833 raise AttributeError(f'Cannot set {mock_name}')
835 return object.__setattr__(self, name, value)
838 def __delattr__(self, name):
839 if name in _all_magics and name in type(self).__dict__:
840 delattr(type(self), name)
841 if name not in self.__dict__:
842 # for magic methods that are still MagicProxy objects and
843 # not set on the instance itself
844 return
846 obj = self._mock_children.get(name, _missing)
847 if name in self.__dict__:
848 _safe_super(NonCallableMock, self).__delattr__(name)
849 elif obj is _deleted:
850 raise AttributeError(name)
851 if obj is not _missing:
852 del self._mock_children[name]
853 self._mock_children[name] = _deleted
856 def _format_mock_call_signature(self, args, kwargs):
857 name = self._mock_name or 'mock'
858 return _format_call_signature(name, args, kwargs)
861 def _format_mock_failure_message(self, args, kwargs, action='call'):
862 message = 'expected %s not found.\nExpected: %s\nActual: %s'
863 expected_string = self._format_mock_call_signature(args, kwargs)
864 call_args = self.call_args
865 actual_string = self._format_mock_call_signature(*call_args)
866 return message % (action, expected_string, actual_string)
869 def _get_call_signature_from_name(self, name):
870 """
871 * If call objects are asserted against a method/function like obj.meth1
872 then there could be no name for the call object to lookup. Hence just
873 return the spec_signature of the method/function being asserted against.
874 * If the name is not empty then remove () and split by '.' to get
875 list of names to iterate through the children until a potential
876 match is found. A child mock is created only during attribute access
877 so if we get a _SpecState then no attributes of the spec were accessed
878 and can be safely exited.
879 """
880 if not name:
881 return self._spec_signature
883 sig = None
884 names = name.replace('()', '').split('.')
885 children = self._mock_children
887 for name in names:
888 child = children.get(name)
889 if child is None or isinstance(child, _SpecState):
890 break
891 else:
892 # If an autospecced object is attached using attach_mock the
893 # child would be a function with mock object as attribute from
894 # which signature has to be derived.
895 child = _extract_mock(child)
896 children = child._mock_children
897 sig = child._spec_signature
899 return sig
902 def _call_matcher(self, _call):
903 """
904 Given a call (or simply an (args, kwargs) tuple), return a
905 comparison key suitable for matching with other calls.
906 This is a best effort method which relies on the spec's signature,
907 if available, or falls back on the arguments themselves.
908 """
910 if isinstance(_call, tuple) and len(_call) > 2:
911 sig = self._get_call_signature_from_name(_call[0])
912 else:
913 sig = self._spec_signature
915 if sig is not None:
916 if len(_call) == 2:
917 name = ''
918 args, kwargs = _call
919 else:
920 name, args, kwargs = _call
921 try:
922 bound_call = sig.bind(*args, **kwargs)
923 return call(name, bound_call.args, bound_call.kwargs)
924 except TypeError as e:
925 return e.with_traceback(None)
926 else:
927 return _call
929 def assert_not_called(_mock_self):
930 """assert that the mock was never called.
931 """
932 self = _mock_self
933 if self.call_count != 0:
934 msg = ("Expected '%s' to not have been called. Called %s times.%s"
935 % (self._mock_name or 'mock',
936 self.call_count,
937 self._calls_repr()))
938 raise AssertionError(msg)
940 def assert_called(_mock_self):
941 """assert that the mock was called at least once
942 """
943 self = _mock_self
944 if self.call_count == 0:
945 msg = ("Expected '%s' to have been called." %
946 (self._mock_name or 'mock'))
947 raise AssertionError(msg)
949 def assert_called_once(_mock_self):
950 """assert that the mock was called only once.
951 """
952 self = _mock_self
953 if not self.call_count == 1:
954 msg = ("Expected '%s' to have been called once. Called %s times.%s"
955 % (self._mock_name or 'mock',
956 self.call_count,
957 self._calls_repr()))
958 raise AssertionError(msg)
960 def assert_called_with(_mock_self, *args, **kwargs):
961 """assert that the last call was made with the specified arguments.
963 Raises an AssertionError if the args and keyword args passed in are
964 different to the last call to the mock."""
965 self = _mock_self
966 if self.call_args is None:
967 expected = self._format_mock_call_signature(args, kwargs)
968 actual = 'not called.'
969 error_message = ('expected call not found.\nExpected: %s\nActual: %s'
970 % (expected, actual))
971 raise AssertionError(error_message)
973 def _error_message():
974 msg = self._format_mock_failure_message(args, kwargs)
975 return msg
976 expected = self._call_matcher(_Call((args, kwargs), two=True))
977 actual = self._call_matcher(self.call_args)
978 if actual != expected:
979 cause = expected if isinstance(expected, Exception) else None
980 raise AssertionError(_error_message()) from cause
983 def assert_called_once_with(_mock_self, *args, **kwargs):
984 """assert that the mock was called exactly once and that that call was
985 with the specified arguments."""
986 self = _mock_self
987 if not self.call_count == 1:
988 msg = ("Expected '%s' to be called once. Called %s times.%s"
989 % (self._mock_name or 'mock',
990 self.call_count,
991 self._calls_repr()))
992 raise AssertionError(msg)
993 return self.assert_called_with(*args, **kwargs)
996 def assert_has_calls(self, calls, any_order=False):
997 """assert the mock has been called with the specified calls.
998 The `mock_calls` list is checked for the calls.
1000 If `any_order` is False (the default) then the calls must be
1001 sequential. There can be extra calls before or after the
1002 specified calls.
1004 If `any_order` is True then the calls can be in any order, but
1005 they must all appear in `mock_calls`."""
1006 expected = [self._call_matcher(c) for c in calls]
1007 cause = next((e for e in expected if isinstance(e, Exception)), None)
1008 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
1009 if not any_order:
1010 if expected not in all_calls:
1011 if cause is None:
1012 problem = 'Calls not found.'
1013 else:
1014 problem = ('Error processing expected calls.\n'
1015 'Errors: {}').format(
1016 [e if isinstance(e, Exception) else None
1017 for e in expected])
1018 raise AssertionError(
1019 f'{problem}\n'
1020 f'Expected: {_CallList(calls)}'
1021 f'{self._calls_repr(prefix="Actual").rstrip(".")}'
1022 ) from cause
1023 return
1025 all_calls = list(all_calls)
1027 not_found = []
1028 for kall in expected:
1029 try:
1030 all_calls.remove(kall)
1031 except ValueError:
1032 not_found.append(kall)
1033 if not_found:
1034 raise AssertionError(
1035 '%r does not contain all of %r in its call list, '
1036 'found %r instead' % (self._mock_name or 'mock',
1037 tuple(not_found), all_calls)
1038 ) from cause
1041 def assert_any_call(self, *args, **kwargs):
1042 """assert the mock has been called with the specified arguments.
1044 The assert passes if the mock has *ever* been called, unlike
1045 `assert_called_with` and `assert_called_once_with` that only pass if
1046 the call is the most recent one."""
1047 expected = self._call_matcher(_Call((args, kwargs), two=True))
1048 cause = expected if isinstance(expected, Exception) else None
1049 actual = [self._call_matcher(c) for c in self.call_args_list]
1050 if cause or expected not in _AnyComparer(actual):
1051 expected_string = self._format_mock_call_signature(args, kwargs)
1052 raise AssertionError(
1053 '%s call not found' % expected_string
1054 ) from cause
1057 def _get_child_mock(self, **kw):
1058 """Create the child mocks for attributes and return value.
1059 By default child mocks will be the same type as the parent.
1060 Subclasses of Mock may want to override this to customize the way
1061 child mocks are made.
1063 For non-callable mocks the callable variant will be used (rather than
1064 any custom subclass)."""
1065 if self._mock_sealed:
1066 attribute = f".{kw['name']}" if "name" in kw else "()"
1067 mock_name = self._extract_mock_name() + attribute
1068 raise AttributeError(mock_name)
1070 _new_name = kw.get("_new_name")
1071 if _new_name in self.__dict__['_spec_asyncs']:
1072 return AsyncMock(**kw)
1074 _type = type(self)
1075 if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
1076 # Any asynchronous magic becomes an AsyncMock
1077 klass = AsyncMock
1078 elif issubclass(_type, AsyncMockMixin):
1079 if (_new_name in _all_sync_magics or
1080 self._mock_methods and _new_name in self._mock_methods):
1081 # Any synchronous method on AsyncMock becomes a MagicMock
1082 klass = MagicMock
1083 else:
1084 klass = AsyncMock
1085 elif not issubclass(_type, CallableMixin):
1086 if issubclass(_type, NonCallableMagicMock):
1087 klass = MagicMock
1088 elif issubclass(_type, NonCallableMock):
1089 klass = Mock
1090 else:
1091 klass = _type.__mro__[1]
1092 return klass(**kw)
1095 def _calls_repr(self, prefix="Calls"):
1096 """Renders self.mock_calls as a string.
1098 Example: "\nCalls: [call(1), call(2)]."
1100 If self.mock_calls is empty, an empty string is returned. The
1101 output will be truncated if very long.
1102 """
1103 if not self.mock_calls:
1104 return ""
1105 return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1108try:
1109 removeprefix = str.removeprefix
1110except AttributeError:
1111 # Py 3.8 and earlier:
1112 def removeprefix(name, prefix):
1113 return name[len(prefix):]
1115# Denylist for forbidden attribute names in safe mode
1116_ATTRIB_DENY_LIST = frozenset({
1117 removeprefix(name, "assert_")
1118 for name in dir(NonCallableMock)
1119 if name.startswith("assert_")
1120})
1123class _AnyComparer(list):
1124 """A list which checks if it contains a call which may have an
1125 argument of ANY, flipping the components of item and self from
1126 their traditional locations so that ANY is guaranteed to be on
1127 the left."""
1128 def __contains__(self, item):
1129 for _call in self:
1130 assert len(item) == len(_call)
1131 if all([
1132 expected == actual
1133 for expected, actual in zip(item, _call)
1134 ]):
1135 return True
1136 return False
1139def _try_iter(obj):
1140 if obj is None:
1141 return obj
1142 if _is_exception(obj):
1143 return obj
1144 if _callable(obj):
1145 return obj
1146 try:
1147 return iter(obj)
1148 except TypeError:
1149 # XXXX backwards compatibility
1150 # but this will blow up on first call - so maybe we should fail early?
1151 return obj
1154class CallableMixin(Base):
1156 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
1157 wraps=None, name=None, spec_set=None, parent=None,
1158 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
1159 self.__dict__['_mock_return_value'] = return_value
1160 _safe_super(CallableMixin, self).__init__(
1161 spec, wraps, name, spec_set, parent,
1162 _spec_state, _new_name, _new_parent, **kwargs
1163 )
1165 self.side_effect = side_effect
1168 def _mock_check_sig(self, *args, **kwargs):
1169 # stub method that can be replaced with one with a specific signature
1170 pass
1173 def __call__(_mock_self, *args, **kwargs):
1174 # can't use self in-case a function / method we are mocking uses self
1175 # in the signature
1176 _mock_self._mock_check_sig(*args, **kwargs)
1177 _mock_self._increment_mock_call(*args, **kwargs)
1178 return _mock_self._mock_call(*args, **kwargs)
1181 def _mock_call(_mock_self, *args, **kwargs):
1182 return _mock_self._execute_mock_call(*args, **kwargs)
1184 def _increment_mock_call(_mock_self, *args, **kwargs):
1185 self = _mock_self
1186 self.called = True
1187 self.call_count += 1
1189 # handle call_args
1190 # needs to be set here so assertions on call arguments pass before
1191 # execution in the case of awaited calls
1192 _call = _Call((args, kwargs), two=True)
1193 self.call_args = _call
1194 self.call_args_list.append(_call)
1196 # initial stuff for method_calls:
1197 do_method_calls = self._mock_parent is not None
1198 method_call_name = self._mock_name
1200 # initial stuff for mock_calls:
1201 mock_call_name = self._mock_new_name
1202 is_a_call = mock_call_name == '()'
1203 self.mock_calls.append(_Call(('', args, kwargs)))
1205 # follow up the chain of mocks:
1206 _new_parent = self._mock_new_parent
1207 while _new_parent is not None:
1209 # handle method_calls:
1210 if do_method_calls:
1211 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
1212 do_method_calls = _new_parent._mock_parent is not None
1213 if do_method_calls:
1214 method_call_name = _new_parent._mock_name + '.' + method_call_name
1216 # handle mock_calls:
1217 this_mock_call = _Call((mock_call_name, args, kwargs))
1218 _new_parent.mock_calls.append(this_mock_call)
1220 if _new_parent._mock_new_name:
1221 if is_a_call:
1222 dot = ''
1223 else:
1224 dot = '.'
1225 is_a_call = _new_parent._mock_new_name == '()'
1226 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name
1228 # follow the parental chain:
1229 _new_parent = _new_parent._mock_new_parent
1231 def _execute_mock_call(_mock_self, *args, **kwargs):
1232 self = _mock_self
1233 # separate from _increment_mock_call so that awaited functions are
1234 # executed separately from their call, also AsyncMock overrides this method
1236 effect = self.side_effect
1237 if effect is not None:
1238 if _is_exception(effect):
1239 raise effect
1240 elif not _callable(effect):
1241 result = next(effect)
1242 if _is_exception(result):
1243 raise result
1244 else:
1245 result = effect(*args, **kwargs)
1247 if result is not DEFAULT:
1248 return result
1250 if self._mock_return_value is not DEFAULT:
1251 return self.return_value
1253 if self._mock_wraps is not None:
1254 return self._mock_wraps(*args, **kwargs)
1256 return self.return_value
1260class Mock(CallableMixin, NonCallableMock):
1261 """
1262 Create a new `Mock` object. `Mock` takes several optional arguments
1263 that specify the behaviour of the Mock object:
1265 * `spec`: This can be either a list of strings or an existing object (a
1266 class or instance) that acts as the specification for the mock object. If
1267 you pass in an object then a list of strings is formed by calling dir on
1268 the object (excluding unsupported magic attributes and methods). Accessing
1269 any attribute not in this list will raise an `AttributeError`.
1271 If `spec` is an object (rather than a list of strings) then
1272 `mock.__class__` returns the class of the spec object. This allows mocks
1273 to pass `isinstance` tests.
1275 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
1276 or get an attribute on the mock that isn't on the object passed as
1277 `spec_set` will raise an `AttributeError`.
1279 * `side_effect`: A function to be called whenever the Mock is called. See
1280 the `side_effect` attribute. Useful for raising exceptions or
1281 dynamically changing return values. The function is called with the same
1282 arguments as the mock, and unless it returns `DEFAULT`, the return
1283 value of this function is used as the return value.
1285 If `side_effect` is an iterable then each call to the mock will return
1286 the next value from the iterable. If any of the members of the iterable
1287 are exceptions they will be raised instead of returned.
1289 * `return_value`: The value returned when the mock is called. By default
1290 this is a new Mock (created on first access). See the
1291 `return_value` attribute.
1293 * `unsafe`: By default, accessing any attribute whose name starts with
1294 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
1295 Additionally, an AttributeError is raised when accessing
1296 attributes that match the name of an assertion method without the prefix
1297 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
1298 Passing `unsafe=True` will allow access to these attributes.
1300 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
1301 calling the Mock will pass the call through to the wrapped object
1302 (returning the real result). Attribute access on the mock will return a
1303 Mock object that wraps the corresponding attribute of the wrapped object
1304 (so attempting to access an attribute that doesn't exist will raise an
1305 `AttributeError`).
1307 If the mock has an explicit `return_value` set then calls are not passed
1308 to the wrapped object and the `return_value` is returned instead.
1310 * `name`: If the mock has a name then it will be used in the repr of the
1311 mock. This can be useful for debugging. The name is propagated to child
1312 mocks.
1314 Mocks can also be called with arbitrary keyword arguments. These will be
1315 used to set attributes on the mock after it is created.
1316 """
1319def _dot_lookup(thing, comp, import_path):
1320 try:
1321 return getattr(thing, comp)
1322 except AttributeError:
1323 __import__(import_path)
1324 return getattr(thing, comp)
1327def _importer(target):
1328 components = target.split('.')
1329 import_path = components.pop(0)
1330 thing = __import__(import_path)
1332 for comp in components:
1333 import_path += ".%s" % comp
1334 thing = _dot_lookup(thing, comp, import_path)
1335 return thing
1338# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1339# they don't contain common misspellings of arguments related to autospeccing.
1340def _check_spec_arg_typos(kwargs_to_check):
1341 typos = ("autospect", "auto_spec", "set_spec")
1342 for typo in typos:
1343 if typo in kwargs_to_check:
1344 raise RuntimeError(
1345 f"{typo!r} might be a typo; use unsafe=True if this is intended"
1346 )
1349class _patch(object):
1351 attribute_name = None
1352 _active_patches = []
1354 def __init__(
1355 self, getter, attribute, new, spec, create,
1356 spec_set, autospec, new_callable, kwargs, *, unsafe=False
1357 ):
1358 if new_callable is not None:
1359 if new is not DEFAULT:
1360 raise ValueError(
1361 "Cannot use 'new' and 'new_callable' together"
1362 )
1363 if autospec is not None:
1364 raise ValueError(
1365 "Cannot use 'autospec' and 'new_callable' together"
1366 )
1367 if not unsafe:
1368 _check_spec_arg_typos(kwargs)
1369 if _is_instance_mock(spec):
1370 raise InvalidSpecError(
1371 f'Cannot spec attr {attribute!r} as the spec '
1372 f'has already been mocked out. [spec={spec!r}]')
1373 if _is_instance_mock(spec_set):
1374 raise InvalidSpecError(
1375 f'Cannot spec attr {attribute!r} as the spec_set '
1376 f'target has already been mocked out. [spec_set={spec_set!r}]')
1378 self.getter = getter
1379 self.attribute = attribute
1380 self.new = new
1381 self.new_callable = new_callable
1382 self.spec = spec
1383 self.create = create
1384 self.has_local = False
1385 self.spec_set = spec_set
1386 self.autospec = autospec
1387 self.kwargs = kwargs
1388 self.additional_patchers = []
1391 def copy(self):
1392 patcher = _patch(
1393 self.getter, self.attribute, self.new, self.spec,
1394 self.create, self.spec_set,
1395 self.autospec, self.new_callable, self.kwargs
1396 )
1397 patcher.attribute_name = self.attribute_name
1398 patcher.additional_patchers = [
1399 p.copy() for p in self.additional_patchers
1400 ]
1401 return patcher
1404 def __call__(self, func):
1405 if isinstance(func, type):
1406 return self.decorate_class(func)
1407 if inspect.iscoroutinefunction(func):
1408 return self.decorate_async_callable(func)
1409 return self.decorate_callable(func)
1412 def decorate_class(self, klass):
1413 for attr in dir(klass):
1414 if not attr.startswith(patch.TEST_PREFIX):
1415 continue
1417 attr_value = getattr(klass, attr)
1418 if not hasattr(attr_value, "__call__"):
1419 continue
1421 patcher = self.copy()
1422 setattr(klass, attr, patcher(attr_value))
1423 return klass
1426 @contextlib.contextmanager
1427 def decoration_helper(self, patched, args, keywargs):
1428 extra_args = []
1429 with contextlib.ExitStack() as exit_stack:
1430 for patching in patched.patchings:
1431 arg = exit_stack.enter_context(patching)
1432 if patching.attribute_name is not None:
1433 keywargs.update(arg)
1434 elif patching.new is DEFAULT:
1435 extra_args.append(arg)
1437 args += tuple(extra_args)
1438 yield (args, keywargs)
1441 def decorate_callable(self, func):
1442 # NB. Keep the method in sync with decorate_async_callable()
1443 if hasattr(func, 'patchings'):
1444 func.patchings.append(self)
1445 return func
1447 @wraps(func)
1448 def patched(*args, **keywargs):
1449 with self.decoration_helper(patched,
1450 args,
1451 keywargs) as (newargs, newkeywargs):
1452 return func(*newargs, **newkeywargs)
1454 patched.patchings = [self]
1455 return patched
1458 def decorate_async_callable(self, func):
1459 # NB. Keep the method in sync with decorate_callable()
1460 if hasattr(func, 'patchings'):
1461 func.patchings.append(self)
1462 return func
1464 @wraps(func)
1465 async def patched(*args, **keywargs):
1466 with self.decoration_helper(patched,
1467 args,
1468 keywargs) as (newargs, newkeywargs):
1469 return await func(*newargs, **newkeywargs)
1471 patched.patchings = [self]
1472 return patched
1475 def get_original(self):
1476 target = self.getter()
1477 name = self.attribute
1479 original = DEFAULT
1480 local = False
1482 try:
1483 original = target.__dict__[name]
1484 except (AttributeError, KeyError):
1485 original = getattr(target, name, DEFAULT)
1486 else:
1487 local = True
1489 if name in _builtins and isinstance(target, ModuleType):
1490 self.create = True
1492 if not self.create and original is DEFAULT:
1493 raise AttributeError(
1494 "%s does not have the attribute %r" % (target, name)
1495 )
1496 return original, local
1499 def __enter__(self):
1500 """Perform the patch."""
1501 new, spec, spec_set = self.new, self.spec, self.spec_set
1502 autospec, kwargs = self.autospec, self.kwargs
1503 new_callable = self.new_callable
1504 self.target = self.getter()
1506 # normalise False to None
1507 if spec is False:
1508 spec = None
1509 if spec_set is False:
1510 spec_set = None
1511 if autospec is False:
1512 autospec = None
1514 if spec is not None and autospec is not None:
1515 raise TypeError("Can't specify spec and autospec")
1516 if ((spec is not None or autospec is not None) and
1517 spec_set not in (True, None)):
1518 raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1520 original, local = self.get_original()
1522 if new is DEFAULT and autospec is None:
1523 inherit = False
1524 if spec is True:
1525 # set spec to the object we are replacing
1526 spec = original
1527 if spec_set is True:
1528 spec_set = original
1529 spec = None
1530 elif spec is not None:
1531 if spec_set is True:
1532 spec_set = spec
1533 spec = None
1534 elif spec_set is True:
1535 spec_set = original
1537 if spec is not None or spec_set is not None:
1538 if original is DEFAULT:
1539 raise TypeError("Can't use 'spec' with create=True")
1540 if isinstance(original, type):
1541 # If we're patching out a class and there is a spec
1542 inherit = True
1543 if spec is None and _is_async_obj(original):
1544 Klass = AsyncMock
1545 else:
1546 Klass = MagicMock
1547 _kwargs = {}
1548 if new_callable is not None:
1549 Klass = new_callable
1550 elif spec is not None or spec_set is not None:
1551 this_spec = spec
1552 if spec_set is not None:
1553 this_spec = spec_set
1554 if _is_list(this_spec):
1555 not_callable = '__call__' not in this_spec
1556 else:
1557 not_callable = not callable(this_spec)
1558 if _is_async_obj(this_spec):
1559 Klass = AsyncMock
1560 elif not_callable:
1561 Klass = NonCallableMagicMock
1563 if spec is not None:
1564 _kwargs['spec'] = spec
1565 if spec_set is not None:
1566 _kwargs['spec_set'] = spec_set
1568 # add a name to mocks
1569 if (isinstance(Klass, type) and
1570 issubclass(Klass, NonCallableMock) and self.attribute):
1571 _kwargs['name'] = self.attribute
1573 _kwargs.update(kwargs)
1574 new = Klass(**_kwargs)
1576 if inherit and _is_instance_mock(new):
1577 # we can only tell if the instance should be callable if the
1578 # spec is not a list
1579 this_spec = spec
1580 if spec_set is not None:
1581 this_spec = spec_set
1582 if (not _is_list(this_spec) and not
1583 _instance_callable(this_spec)):
1584 Klass = NonCallableMagicMock
1586 _kwargs.pop('name')
1587 new.return_value = Klass(_new_parent=new, _new_name='()',
1588 **_kwargs)
1589 elif autospec is not None:
1590 # spec is ignored, new *must* be default, spec_set is treated
1591 # as a boolean. Should we check spec is not None and that spec_set
1592 # is a bool?
1593 if new is not DEFAULT:
1594 raise TypeError(
1595 "autospec creates the mock for you. Can't specify "
1596 "autospec and new."
1597 )
1598 if original is DEFAULT:
1599 raise TypeError("Can't use 'autospec' with create=True")
1600 spec_set = bool(spec_set)
1601 if autospec is True:
1602 autospec = original
1604 if _is_instance_mock(self.target):
1605 raise InvalidSpecError(
1606 f'Cannot autospec attr {self.attribute!r} as the patch '
1607 f'target has already been mocked out. '
1608 f'[target={self.target!r}, attr={autospec!r}]')
1609 if _is_instance_mock(autospec):
1610 target_name = getattr(self.target, '__name__', self.target)
1611 raise InvalidSpecError(
1612 f'Cannot autospec attr {self.attribute!r} from target '
1613 f'{target_name!r} as it has already been mocked out. '
1614 f'[target={self.target!r}, attr={autospec!r}]')
1616 new = create_autospec(autospec, spec_set=spec_set,
1617 _name=self.attribute, **kwargs)
1618 elif kwargs:
1619 # can't set keyword args when we aren't creating the mock
1620 # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1621 raise TypeError("Can't pass kwargs to a mock we aren't creating")
1623 new_attr = new
1625 self.temp_original = original
1626 self.is_local = local
1627 self._exit_stack = contextlib.ExitStack()
1628 try:
1629 setattr(self.target, self.attribute, new_attr)
1630 if self.attribute_name is not None:
1631 extra_args = {}
1632 if self.new is DEFAULT:
1633 extra_args[self.attribute_name] = new
1634 for patching in self.additional_patchers:
1635 arg = self._exit_stack.enter_context(patching)
1636 if patching.new is DEFAULT:
1637 extra_args.update(arg)
1638 return extra_args
1640 return new
1641 except:
1642 if not self.__exit__(*sys.exc_info()):
1643 raise
1645 def __exit__(self, *exc_info):
1646 """Undo the patch."""
1647 if self.is_local and self.temp_original is not DEFAULT:
1648 setattr(self.target, self.attribute, self.temp_original)
1649 else:
1650 delattr(self.target, self.attribute)
1651 if not self.create and (not hasattr(self.target, self.attribute) or
1652 self.attribute in ('__doc__', '__module__',
1653 '__defaults__', '__annotations__',
1654 '__kwdefaults__')):
1655 # needed for proxy objects like django settings
1656 setattr(self.target, self.attribute, self.temp_original)
1658 del self.temp_original
1659 del self.is_local
1660 del self.target
1661 exit_stack = self._exit_stack
1662 del self._exit_stack
1663 return exit_stack.__exit__(*exc_info)
1666 def start(self):
1667 """Activate a patch, returning any created mock."""
1668 result = self.__enter__()
1669 self._active_patches.append(self)
1670 return result
1673 def stop(self):
1674 """Stop an active patch."""
1675 try:
1676 self._active_patches.remove(self)
1677 except ValueError:
1678 # If the patch hasn't been started this will fail
1679 return None
1681 return self.__exit__(None, None, None)
1685def _get_target(target):
1686 try:
1687 target, attribute = target.rsplit('.', 1)
1688 except (TypeError, ValueError, AttributeError):
1689 raise TypeError(
1690 f"Need a valid target to patch. You supplied: {target!r}")
1691 getter = lambda: _importer(target)
1692 return getter, attribute
1695def _patch_object(
1696 target, attribute, new=DEFAULT, spec=None,
1697 create=False, spec_set=None, autospec=None,
1698 new_callable=None, *, unsafe=False, **kwargs
1699 ):
1700 """
1701 patch the named member (`attribute`) on an object (`target`) with a mock
1702 object.
1704 `patch.object` can be used as a decorator, class decorator or a context
1705 manager. Arguments `new`, `spec`, `create`, `spec_set`,
1706 `autospec` and `new_callable` have the same meaning as for `patch`. Like
1707 `patch`, `patch.object` takes arbitrary keyword arguments for configuring
1708 the mock object it creates.
1710 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
1711 for choosing which methods to wrap.
1712 """
1713 if type(target) is str:
1714 raise TypeError(
1715 f"{target!r} must be the actual object to be patched, not a str"
1716 )
1717 getter = lambda: target
1718 return _patch(
1719 getter, attribute, new, spec, create,
1720 spec_set, autospec, new_callable, kwargs, unsafe=unsafe
1721 )
1724def _patch_multiple(target, spec=None, create=False, spec_set=None,
1725 autospec=None, new_callable=None, **kwargs):
1726 """Perform multiple patches in a single call. It takes the object to be
1727 patched (either as an object or a string to fetch the object by importing)
1728 and keyword arguments for the patches::
1730 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1731 ...
1733 Use `DEFAULT` as the value if you want `patch.multiple` to create
1734 mocks for you. In this case the created mocks are passed into a decorated
1735 function by keyword, and a dictionary is returned when `patch.multiple` is
1736 used as a context manager.
1738 `patch.multiple` can be used as a decorator, class decorator or a context
1739 manager. The arguments `spec`, `spec_set`, `create`,
1740 `autospec` and `new_callable` have the same meaning as for `patch`. These
1741 arguments will be applied to *all* patches done by `patch.multiple`.
1743 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1744 for choosing which methods to wrap.
1745 """
1746 if type(target) is str:
1747 getter = lambda: _importer(target)
1748 else:
1749 getter = lambda: target
1751 if not kwargs:
1752 raise ValueError(
1753 'Must supply at least one keyword argument with patch.multiple'
1754 )
1755 # need to wrap in a list for python 3, where items is a view
1756 items = list(kwargs.items())
1757 attribute, new = items[0]
1758 patcher = _patch(
1759 getter, attribute, new, spec, create, spec_set,
1760 autospec, new_callable, {}
1761 )
1762 patcher.attribute_name = attribute
1763 for attribute, new in items[1:]:
1764 this_patcher = _patch(
1765 getter, attribute, new, spec, create, spec_set,
1766 autospec, new_callable, {}
1767 )
1768 this_patcher.attribute_name = attribute
1769 patcher.additional_patchers.append(this_patcher)
1770 return patcher
1773def patch(
1774 target, new=DEFAULT, spec=None, create=False,
1775 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
1776 ):
1777 """
1778 `patch` acts as a function decorator, class decorator or a context
1779 manager. Inside the body of the function or with statement, the `target`
1780 is patched with a `new` object. When the function/with statement exits
1781 the patch is undone.
1783 If `new` is omitted, then the target is replaced with an
1784 `AsyncMock if the patched object is an async function or a
1785 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
1786 omitted, the created mock is passed in as an extra argument to the
1787 decorated function. If `patch` is used as a context manager the created
1788 mock is returned by the context manager.
1790 `target` should be a string in the form `'package.module.ClassName'`. The
1791 `target` is imported and the specified object replaced with the `new`
1792 object, so the `target` must be importable from the environment you are
1793 calling `patch` from. The target is imported when the decorated function
1794 is executed, not at decoration time.
1796 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
1797 if patch is creating one for you.
1799 In addition you can pass `spec=True` or `spec_set=True`, which causes
1800 patch to pass in the object being mocked as the spec/spec_set object.
1802 `new_callable` allows you to specify a different class, or callable object,
1803 that will be called to create the `new` object. By default `AsyncMock` is
1804 used for async functions and `MagicMock` for the rest.
1806 A more powerful form of `spec` is `autospec`. If you set `autospec=True`
1807 then the mock will be created with a spec from the object being replaced.
1808 All attributes of the mock will also have the spec of the corresponding
1809 attribute of the object being replaced. Methods and functions being
1810 mocked will have their arguments checked and will raise a `TypeError` if
1811 they are called with the wrong signature. For mocks replacing a class,
1812 their return value (the 'instance') will have the same spec as the class.
1814 Instead of `autospec=True` you can pass `autospec=some_object` to use an
1815 arbitrary object as the spec instead of the one being replaced.
1817 By default `patch` will fail to replace attributes that don't exist. If
1818 you pass in `create=True`, and the attribute doesn't exist, patch will
1819 create the attribute for you when the patched function is called, and
1820 delete it again afterwards. This is useful for writing tests against
1821 attributes that your production code creates at runtime. It is off by
1822 default because it can be dangerous. With it switched on you can write
1823 passing tests against APIs that don't actually exist!
1825 Patch can be used as a `TestCase` class decorator. It works by
1826 decorating each test method in the class. This reduces the boilerplate
1827 code when your test methods share a common patchings set. `patch` finds
1828 tests by looking for method names that start with `patch.TEST_PREFIX`.
1829 By default this is `test`, which matches the way `unittest` finds tests.
1830 You can specify an alternative prefix by setting `patch.TEST_PREFIX`.
1832 Patch can be used as a context manager, with the with statement. Here the
1833 patching applies to the indented block after the with statement. If you
1834 use "as" then the patched object will be bound to the name after the
1835 "as"; very useful if `patch` is creating a mock object for you.
1837 Patch will raise a `RuntimeError` if passed some common misspellings of
1838 the arguments autospec and spec_set. Pass the argument `unsafe` with the
1839 value True to disable that check.
1841 `patch` takes arbitrary keyword arguments. These will be passed to
1842 `AsyncMock` if the patched object is asynchronous, to `MagicMock`
1843 otherwise or to `new_callable` if specified.
1845 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
1846 available for alternate use-cases.
1847 """
1848 getter, attribute = _get_target(target)
1849 return _patch(
1850 getter, attribute, new, spec, create,
1851 spec_set, autospec, new_callable, kwargs, unsafe=unsafe
1852 )
1855class _patch_dict(object):
1856 """
1857 Patch a dictionary, or dictionary like object, and restore the dictionary
1858 to its original state after the test.
1860 `in_dict` can be a dictionary or a mapping like container. If it is a
1861 mapping then it must at least support getting, setting and deleting items
1862 plus iterating over keys.
1864 `in_dict` can also be a string specifying the name of the dictionary, which
1865 will then be fetched by importing it.
1867 `values` can be a dictionary of values to set in the dictionary. `values`
1868 can also be an iterable of `(key, value)` pairs.
1870 If `clear` is True then the dictionary will be cleared before the new
1871 values are set.
1873 `patch.dict` can also be called with arbitrary keyword arguments to set
1874 values in the dictionary::
1876 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1877 ...
1879 `patch.dict` can be used as a context manager, decorator or class
1880 decorator. When used as a class decorator `patch.dict` honours
1881 `patch.TEST_PREFIX` for choosing which methods to wrap.
1882 """
1884 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1885 self.in_dict = in_dict
1886 # support any argument supported by dict(...) constructor
1887 self.values = dict(values)
1888 self.values.update(kwargs)
1889 self.clear = clear
1890 self._original = None
1893 def __call__(self, f):
1894 if isinstance(f, type):
1895 return self.decorate_class(f)
1896 if inspect.iscoroutinefunction(f):
1897 return self.decorate_async_callable(f)
1898 return self.decorate_callable(f)
1901 def decorate_callable(self, f):
1902 @wraps(f)
1903 def _inner(*args, **kw):
1904 self._patch_dict()
1905 try:
1906 return f(*args, **kw)
1907 finally:
1908 self._unpatch_dict()
1910 return _inner
1913 def decorate_async_callable(self, f):
1914 @wraps(f)
1915 async def _inner(*args, **kw):
1916 self._patch_dict()
1917 try:
1918 return await f(*args, **kw)
1919 finally:
1920 self._unpatch_dict()
1922 return _inner
1925 def decorate_class(self, klass):
1926 for attr in dir(klass):
1927 attr_value = getattr(klass, attr)
1928 if (attr.startswith(patch.TEST_PREFIX) and
1929 hasattr(attr_value, "__call__")):
1930 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1931 decorated = decorator(attr_value)
1932 setattr(klass, attr, decorated)
1933 return klass
1936 def __enter__(self):
1937 """Patch the dict."""
1938 self._patch_dict()
1939 return self.in_dict
1942 def _patch_dict(self):
1943 values = self.values
1944 if isinstance(self.in_dict, str):
1945 self.in_dict = _importer(self.in_dict)
1946 in_dict = self.in_dict
1947 clear = self.clear
1949 try:
1950 original = in_dict.copy()
1951 except AttributeError:
1952 # dict like object with no copy method
1953 # must support iteration over keys
1954 original = {}
1955 for key in in_dict:
1956 original[key] = in_dict[key]
1957 self._original = original
1959 if clear:
1960 _clear_dict(in_dict)
1962 try:
1963 in_dict.update(values)
1964 except AttributeError:
1965 # dict like object with no update method
1966 for key in values:
1967 in_dict[key] = values[key]
1970 def _unpatch_dict(self):
1971 in_dict = self.in_dict
1972 original = self._original
1974 _clear_dict(in_dict)
1976 try:
1977 in_dict.update(original)
1978 except AttributeError:
1979 for key in original:
1980 in_dict[key] = original[key]
1983 def __exit__(self, *args):
1984 """Unpatch the dict."""
1985 if self._original is not None:
1986 self._unpatch_dict()
1987 return False
1990 def start(self):
1991 """Activate a patch, returning any created mock."""
1992 result = self.__enter__()
1993 _patch._active_patches.append(self)
1994 return result
1997 def stop(self):
1998 """Stop an active patch."""
1999 try:
2000 _patch._active_patches.remove(self)
2001 except ValueError:
2002 # If the patch hasn't been started this will fail
2003 return None
2005 return self.__exit__(None, None, None)
2008def _clear_dict(in_dict):
2009 try:
2010 in_dict.clear()
2011 except AttributeError:
2012 keys = list(in_dict)
2013 for key in keys:
2014 del in_dict[key]
2017def _patch_stopall():
2018 """Stop all active patches. LIFO to unroll nested patches."""
2019 for patch in reversed(_patch._active_patches):
2020 patch.stop()
2023patch.object = _patch_object
2024patch.dict = _patch_dict
2025patch.multiple = _patch_multiple
2026patch.stopall = _patch_stopall
2027patch.TEST_PREFIX = 'test'
2029magic_methods = (
2030 "lt le gt ge eq ne "
2031 "getitem setitem delitem "
2032 "len contains iter "
2033 "hash str sizeof "
2034 "enter exit "
2035 # we added divmod and rdivmod here instead of numerics
2036 # because there is no idivmod
2037 "divmod rdivmod neg pos abs invert "
2038 "complex int float index "
2039 "round trunc floor ceil "
2040 "bool next "
2041 "fspath "
2042 "aiter "
2043)
2045if IS_PYPY:
2046 # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html
2047 magic_methods = magic_methods.replace('sizeof ', '')
2049numerics = (
2050 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
2051)
2052inplace = ' '.join('i%s' % n for n in numerics.split())
2053right = ' '.join('r%s' % n for n in numerics.split())
2055# not including __prepare__, __instancecheck__, __subclasscheck__
2056# (as they are metaclass methods)
2057# __del__ is not supported at all as it causes problems if it exists
2059_non_defaults = {
2060 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
2061 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
2062 '__getstate__', '__setstate__', '__getformat__',
2063 '__repr__', '__dir__', '__subclasses__', '__format__',
2064 '__getnewargs_ex__',
2065}
2068def _get_method(name, func):
2069 "Turns a callable object (like a mock) into a real function"
2070 def method(self, *args, **kw):
2071 return func(self, *args, **kw)
2072 method.__name__ = name
2073 return method
2076_magics = {
2077 '__%s__' % method for method in
2078 ' '.join([magic_methods, numerics, inplace, right]).split()
2079}
2081# Magic methods used for async `with` statements
2082_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
2083# Magic methods that are only used with async calls but are synchronous functions themselves
2084_sync_async_magics = {"__aiter__"}
2085_async_magics = _async_method_magics | _sync_async_magics
2087_all_sync_magics = _magics | _non_defaults
2088_all_magics = _all_sync_magics | _async_magics
2090_unsupported_magics = {
2091 '__getattr__', '__setattr__',
2092 '__init__', '__new__', '__prepare__',
2093 '__instancecheck__', '__subclasscheck__',
2094 '__del__'
2095}
2097_calculate_return_value = {
2098 '__hash__': lambda self: object.__hash__(self),
2099 '__str__': lambda self: object.__str__(self),
2100 '__sizeof__': lambda self: object.__sizeof__(self),
2101 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
2102}
2104_return_values = {
2105 '__lt__': NotImplemented,
2106 '__gt__': NotImplemented,
2107 '__le__': NotImplemented,
2108 '__ge__': NotImplemented,
2109 '__int__': 1,
2110 '__contains__': False,
2111 '__len__': 0,
2112 '__exit__': False,
2113 '__complex__': 1j,
2114 '__float__': 1.0,
2115 '__bool__': True,
2116 '__index__': 1,
2117 '__aexit__': False,
2118}
2121def _get_eq(self):
2122 def __eq__(other):
2123 ret_val = self.__eq__._mock_return_value
2124 if ret_val is not DEFAULT:
2125 return ret_val
2126 if self is other:
2127 return True
2128 return NotImplemented
2129 return __eq__
2131def _get_ne(self):
2132 def __ne__(other):
2133 if self.__ne__._mock_return_value is not DEFAULT:
2134 return DEFAULT
2135 if self is other:
2136 return False
2137 return NotImplemented
2138 return __ne__
2140def _get_iter(self):
2141 def __iter__():
2142 ret_val = self.__iter__._mock_return_value
2143 if ret_val is DEFAULT:
2144 return iter([])
2145 # if ret_val was already an iterator, then calling iter on it should
2146 # return the iterator unchanged
2147 return iter(ret_val)
2148 return __iter__
2150def _get_async_iter(self):
2151 def __aiter__():
2152 ret_val = self.__aiter__._mock_return_value
2153 if ret_val is DEFAULT:
2154 return _AsyncIterator(iter([]))
2155 return _AsyncIterator(iter(ret_val))
2156 return __aiter__
2158_side_effect_methods = {
2159 '__eq__': _get_eq,
2160 '__ne__': _get_ne,
2161 '__iter__': _get_iter,
2162 '__aiter__': _get_async_iter
2163}
2167def _set_return_value(mock, method, name):
2168 fixed = _return_values.get(name, DEFAULT)
2169 if fixed is not DEFAULT:
2170 method.return_value = fixed
2171 return
2173 return_calculator = _calculate_return_value.get(name)
2174 if return_calculator is not None:
2175 return_value = return_calculator(mock)
2176 method.return_value = return_value
2177 return
2179 side_effector = _side_effect_methods.get(name)
2180 if side_effector is not None:
2181 method.side_effect = side_effector(mock)
2185class MagicMixin(Base):
2186 def __init__(self, *args, **kw):
2187 self._mock_set_magics() # make magic work for kwargs in init
2188 _safe_super(MagicMixin, self).__init__(*args, **kw)
2189 self._mock_set_magics() # fix magic broken by upper level init
2192 def _mock_set_magics(self):
2193 orig_magics = _magics | _async_method_magics
2194 these_magics = orig_magics
2196 if getattr(self, "_mock_methods", None) is not None:
2197 these_magics = orig_magics.intersection(self._mock_methods)
2199 remove_magics = set()
2200 remove_magics = orig_magics - these_magics
2202 for entry in remove_magics:
2203 if entry in type(self).__dict__:
2204 # remove unneeded magic methods
2205 delattr(self, entry)
2207 # don't overwrite existing attributes if called a second time
2208 these_magics = these_magics - set(type(self).__dict__)
2210 _type = type(self)
2211 for entry in these_magics:
2212 setattr(_type, entry, MagicProxy(entry, self))
2216class NonCallableMagicMock(MagicMixin, NonCallableMock):
2217 """A version of `MagicMock` that isn't callable."""
2218 def mock_add_spec(self, spec, spec_set=False):
2219 """Add a spec to a mock. `spec` can either be an object or a
2220 list of strings. Only attributes on the `spec` can be fetched as
2221 attributes from the mock.
2223 If `spec_set` is True then only attributes on the spec can be set."""
2224 self._mock_add_spec(spec, spec_set)
2225 self._mock_set_magics()
2228class AsyncMagicMixin(MagicMixin):
2229 pass
2232class MagicMock(MagicMixin, Mock):
2233 """
2234 MagicMock is a subclass of Mock with default implementations
2235 of most of the magic methods. You can use MagicMock without having to
2236 configure the magic methods yourself.
2238 If you use the `spec` or `spec_set` arguments then *only* magic
2239 methods that exist in the spec will be created.
2241 Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
2242 """
2243 def mock_add_spec(self, spec, spec_set=False):
2244 """Add a spec to a mock. `spec` can either be an object or a
2245 list of strings. Only attributes on the `spec` can be fetched as
2246 attributes from the mock.
2248 If `spec_set` is True then only attributes on the spec can be set."""
2249 self._mock_add_spec(spec, spec_set)
2250 self._mock_set_magics()
2254class MagicProxy(Base):
2255 def __init__(self, name, parent):
2256 self.name = name
2257 self.parent = parent
2259 def create_mock(self):
2260 entry = self.name
2261 parent = self.parent
2262 m = parent._get_child_mock(name=entry, _new_name=entry,
2263 _new_parent=parent)
2264 setattr(parent, entry, m)
2265 _set_return_value(parent, m, entry)
2266 return m
2268 def __get__(self, obj, _type=None):
2269 return self.create_mock()
2272_CODE_ATTRS = dir(CodeType)
2273_CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
2276class AsyncMockMixin(Base):
2277 await_count = _delegating_property('await_count')
2278 await_args = _delegating_property('await_args')
2279 await_args_list = _delegating_property('await_args_list')
2281 def __init__(self, *args, **kwargs):
2282 super().__init__(*args, **kwargs)
2283 # iscoroutinefunction() checks _is_coroutine property to say if an
2284 # object is a coroutine. Without this check it looks to see if it is a
2285 # function/method, which in this case it is not (since it is an
2286 # AsyncMock).
2287 # It is set through __dict__ because when spec_set is True, this
2288 # attribute is likely undefined.
2289 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
2290 self.__dict__['_mock_await_count'] = 0
2291 self.__dict__['_mock_await_args'] = None
2292 self.__dict__['_mock_await_args_list'] = _CallList()
2293 code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
2294 code_mock.__dict__["_spec_class"] = CodeType
2295 code_mock.__dict__["_spec_signature"] = _CODE_SIG
2296 code_mock.co_flags = (
2297 inspect.CO_COROUTINE
2298 + inspect.CO_VARARGS
2299 + inspect.CO_VARKEYWORDS
2300 )
2301 code_mock.co_argcount = 0
2302 code_mock.co_varnames = ('args', 'kwargs')
2303 try:
2304 code_mock.co_posonlyargcount = 0
2305 except AttributeError:
2306 # Python 3.7 and earlier.
2307 pass
2308 code_mock.co_kwonlyargcount = 0
2309 self.__dict__['__code__'] = code_mock
2310 self.__dict__['__name__'] = 'AsyncMock'
2311 self.__dict__['__defaults__'] = tuple()
2312 self.__dict__['__kwdefaults__'] = {}
2313 self.__dict__['__annotations__'] = None
2315 async def _execute_mock_call(_mock_self, *args, **kwargs):
2316 self = _mock_self
2317 # This is nearly just like super(), except for special handling
2318 # of coroutines
2320 _call = _Call((args, kwargs), two=True)
2321 self.await_count += 1
2322 self.await_args = _call
2323 self.await_args_list.append(_call)
2325 effect = self.side_effect
2326 if effect is not None:
2327 if _is_exception(effect):
2328 raise effect
2329 elif not _callable(effect):
2330 try:
2331 result = next(effect)
2332 except StopIteration:
2333 # It is impossible to propagate a StopIteration
2334 # through coroutines because of PEP 479
2335 raise StopAsyncIteration
2336 if _is_exception(result):
2337 raise result
2338 elif iscoroutinefunction(effect):
2339 result = await effect(*args, **kwargs)
2340 else:
2341 result = effect(*args, **kwargs)
2343 if result is not DEFAULT:
2344 return result
2346 if self._mock_return_value is not DEFAULT:
2347 return self.return_value
2349 if self._mock_wraps is not None:
2350 if iscoroutinefunction(self._mock_wraps):
2351 return await self._mock_wraps(*args, **kwargs)
2352 return self._mock_wraps(*args, **kwargs)
2354 return self.return_value
2356 def assert_awaited(_mock_self):
2357 """
2358 Assert that the mock was awaited at least once.
2359 """
2360 self = _mock_self
2361 if self.await_count == 0:
2362 msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
2363 raise AssertionError(msg)
2365 def assert_awaited_once(_mock_self):
2366 """
2367 Assert that the mock was awaited exactly once.
2368 """
2369 self = _mock_self
2370 if not self.await_count == 1:
2371 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2372 f" Awaited {self.await_count} times.")
2373 raise AssertionError(msg)
2375 def assert_awaited_with(_mock_self, *args, **kwargs):
2376 """
2377 Assert that the last await was with the specified arguments.
2378 """
2379 self = _mock_self
2380 if self.await_args is None:
2381 expected = self._format_mock_call_signature(args, kwargs)
2382 raise AssertionError(f'Expected await: {expected}\nNot awaited')
2384 def _error_message():
2385 msg = self._format_mock_failure_message(args, kwargs, action='await')
2386 return msg
2388 expected = self._call_matcher(_Call((args, kwargs), two=True))
2389 actual = self._call_matcher(self.await_args)
2390 if actual != expected:
2391 cause = expected if isinstance(expected, Exception) else None
2392 raise AssertionError(_error_message()) from cause
2394 def assert_awaited_once_with(_mock_self, *args, **kwargs):
2395 """
2396 Assert that the mock was awaited exactly once and with the specified
2397 arguments.
2398 """
2399 self = _mock_self
2400 if not self.await_count == 1:
2401 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2402 f" Awaited {self.await_count} times.")
2403 raise AssertionError(msg)
2404 return self.assert_awaited_with(*args, **kwargs)
2406 def assert_any_await(_mock_self, *args, **kwargs):
2407 """
2408 Assert the mock has ever been awaited with the specified arguments.
2409 """
2410 self = _mock_self
2411 expected = self._call_matcher(_Call((args, kwargs), two=True))
2412 cause = expected if isinstance(expected, Exception) else None
2413 actual = [self._call_matcher(c) for c in self.await_args_list]
2414 if cause or expected not in _AnyComparer(actual):
2415 expected_string = self._format_mock_call_signature(args, kwargs)
2416 raise AssertionError(
2417 '%s await not found' % expected_string
2418 ) from cause
2420 def assert_has_awaits(_mock_self, calls, any_order=False):
2421 """
2422 Assert the mock has been awaited with the specified calls.
2423 The :attr:`await_args_list` list is checked for the awaits.
2425 If `any_order` is False (the default) then the awaits must be
2426 sequential. There can be extra calls before or after the
2427 specified awaits.
2429 If `any_order` is True then the awaits can be in any order, but
2430 they must all appear in :attr:`await_args_list`.
2431 """
2432 self = _mock_self
2433 expected = [self._call_matcher(c) for c in calls]
2434 cause = cause = next((e for e in expected if isinstance(e, Exception)), None)
2435 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
2436 if not any_order:
2437 if expected not in all_awaits:
2438 if cause is None:
2439 problem = 'Awaits not found.'
2440 else:
2441 problem = ('Error processing expected awaits.\n'
2442 'Errors: {}').format(
2443 [e if isinstance(e, Exception) else None
2444 for e in expected])
2445 raise AssertionError(
2446 f'{problem}\n'
2447 f'Expected: {_CallList(calls)}\n'
2448 f'Actual: {self.await_args_list}'
2449 ) from cause
2450 return
2452 all_awaits = list(all_awaits)
2454 not_found = []
2455 for kall in expected:
2456 try:
2457 all_awaits.remove(kall)
2458 except ValueError:
2459 not_found.append(kall)
2460 if not_found:
2461 raise AssertionError(
2462 '%r not all found in await list' % (tuple(not_found),)
2463 ) from cause
2465 def assert_not_awaited(_mock_self):
2466 """
2467 Assert that the mock was never awaited.
2468 """
2469 self = _mock_self
2470 if self.await_count != 0:
2471 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
2472 f" Awaited {self.await_count} times.")
2473 raise AssertionError(msg)
2475 def reset_mock(self, *args, **kwargs):
2476 """
2477 See :func:`.Mock.reset_mock()`
2478 """
2479 super().reset_mock(*args, **kwargs)
2480 self.await_count = 0
2481 self.await_args = None
2482 self.await_args_list = _CallList()
2485class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
2486 """
2487 Enhance :class:`Mock` with features allowing to mock
2488 an async function.
2490 The :class:`AsyncMock` object will behave so the object is
2491 recognized as an async function, and the result of a call is an awaitable:
2493 >>> mock = AsyncMock()
2494 >>> iscoroutinefunction(mock)
2495 True
2496 >>> inspect.isawaitable(mock())
2497 True
2500 The result of ``mock()`` is an async function which will have the outcome
2501 of ``side_effect`` or ``return_value``:
2503 - if ``side_effect`` is a function, the async function will return the
2504 result of that function,
2505 - if ``side_effect`` is an exception, the async function will raise the
2506 exception,
2507 - if ``side_effect`` is an iterable, the async function will return the
2508 next value of the iterable, however, if the sequence of result is
2509 exhausted, ``StopIteration`` is raised immediately,
2510 - if ``side_effect`` is not defined, the async function will return the
2511 value defined by ``return_value``, hence, by default, the async function
2512 returns a new :class:`AsyncMock` object.
2514 If the outcome of ``side_effect`` or ``return_value`` is an async function,
2515 the mock async function obtained when the mock object is called will be this
2516 async function itself (and not an async function returning an async
2517 function).
2519 The test author can also specify a wrapped object with ``wraps``. In this
2520 case, the :class:`Mock` object behavior is the same as with an
2521 :class:`.Mock` object: the wrapped object may have methods
2522 defined as async function functions.
2524 Based on Martin Richard's asynctest project.
2525 """
2528class _ANY(object):
2529 "A helper object that compares equal to everything."
2531 def __eq__(self, other):
2532 return True
2534 def __ne__(self, other):
2535 return False
2537 def __repr__(self):
2538 return '<ANY>'
2540ANY = _ANY()
2544def _format_call_signature(name, args, kwargs):
2545 message = '%s(%%s)' % name
2546 formatted_args = ''
2547 args_string = ', '.join([repr(arg) for arg in args])
2548 kwargs_string = ', '.join([
2549 '%s=%r' % (key, value) for key, value in kwargs.items()
2550 ])
2551 if args_string:
2552 formatted_args = args_string
2553 if kwargs_string:
2554 if formatted_args:
2555 formatted_args += ', '
2556 formatted_args += kwargs_string
2558 return message % formatted_args
2562class _Call(tuple):
2563 """
2564 A tuple for holding the results of a call to a mock, either in the form
2565 `(args, kwargs)` or `(name, args, kwargs)`.
2567 If args or kwargs are empty then a call tuple will compare equal to
2568 a tuple without those values. This makes comparisons less verbose::
2570 _Call(('name', (), {})) == ('name',)
2571 _Call(('name', (1,), {})) == ('name', (1,))
2572 _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2574 The `_Call` object provides a useful shortcut for comparing with call::
2576 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2577 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2579 If the _Call has no name then it will match any name.
2580 """
2581 def __new__(cls, value=(), name='', parent=None, two=False,
2582 from_kall=True):
2583 args = ()
2584 kwargs = {}
2585 _len = len(value)
2586 if _len == 3:
2587 name, args, kwargs = value
2588 elif _len == 2:
2589 first, second = value
2590 if isinstance(first, str):
2591 name = first
2592 if isinstance(second, tuple):
2593 args = second
2594 else:
2595 kwargs = second
2596 else:
2597 args, kwargs = first, second
2598 elif _len == 1:
2599 value, = value
2600 if isinstance(value, str):
2601 name = value
2602 elif isinstance(value, tuple):
2603 args = value
2604 else:
2605 kwargs = value
2607 if two:
2608 return tuple.__new__(cls, (args, kwargs))
2610 return tuple.__new__(cls, (name, args, kwargs))
2613 def __init__(self, value=(), name=None, parent=None, two=False,
2614 from_kall=True):
2615 self._mock_name = name
2616 self._mock_parent = parent
2617 self._mock_from_kall = from_kall
2620 def __eq__(self, other):
2621 try:
2622 len_other = len(other)
2623 except TypeError:
2624 return NotImplemented
2626 self_name = ''
2627 if len(self) == 2:
2628 self_args, self_kwargs = self
2629 else:
2630 self_name, self_args, self_kwargs = self
2632 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2633 and self._mock_parent != other._mock_parent):
2634 return False
2636 other_name = ''
2637 if len_other == 0:
2638 other_args, other_kwargs = (), {}
2639 elif len_other == 3:
2640 other_name, other_args, other_kwargs = other
2641 elif len_other == 1:
2642 value, = other
2643 if isinstance(value, tuple):
2644 other_args = value
2645 other_kwargs = {}
2646 elif isinstance(value, str):
2647 other_name = value
2648 other_args, other_kwargs = (), {}
2649 else:
2650 other_args = ()
2651 other_kwargs = value
2652 elif len_other == 2:
2653 # could be (name, args) or (name, kwargs) or (args, kwargs)
2654 first, second = other
2655 if isinstance(first, str):
2656 other_name = first
2657 if isinstance(second, tuple):
2658 other_args, other_kwargs = second, {}
2659 else:
2660 other_args, other_kwargs = (), second
2661 else:
2662 other_args, other_kwargs = first, second
2663 else:
2664 return False
2666 if self_name and other_name != self_name:
2667 return False
2669 # this order is important for ANY to work!
2670 return (other_args, other_kwargs) == (self_args, self_kwargs)
2673 __ne__ = object.__ne__
2676 def __call__(self, *args, **kwargs):
2677 if self._mock_name is None:
2678 return _Call(('', args, kwargs), name='()')
2680 name = self._mock_name + '()'
2681 return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2684 def __getattr__(self, attr):
2685 if self._mock_name is None:
2686 return _Call(name=attr, from_kall=False)
2687 name = '%s.%s' % (self._mock_name, attr)
2688 return _Call(name=name, parent=self, from_kall=False)
2691 def __getattribute__(self, attr):
2692 if attr in tuple.__dict__:
2693 raise AttributeError
2694 return tuple.__getattribute__(self, attr)
2697 def _get_call_arguments(self):
2698 if len(self) == 2:
2699 args, kwargs = self
2700 else:
2701 name, args, kwargs = self
2703 return args, kwargs
2705 @property
2706 def args(self):
2707 return self._get_call_arguments()[0]
2709 @property
2710 def kwargs(self):
2711 return self._get_call_arguments()[1]
2713 def __repr__(self):
2714 if not self._mock_from_kall:
2715 name = self._mock_name or 'call'
2716 if name.startswith('()'):
2717 name = 'call%s' % name
2718 return name
2720 if len(self) == 2:
2721 name = 'call'
2722 args, kwargs = self
2723 else:
2724 name, args, kwargs = self
2725 if not name:
2726 name = 'call'
2727 elif not name.startswith('()'):
2728 name = 'call.%s' % name
2729 else:
2730 name = 'call%s' % name
2731 return _format_call_signature(name, args, kwargs)
2734 def call_list(self):
2735 """For a call object that represents multiple calls, `call_list`
2736 returns a list of all the intermediate calls as well as the
2737 final call."""
2738 vals = []
2739 thing = self
2740 while thing is not None:
2741 if thing._mock_from_kall:
2742 vals.append(thing)
2743 thing = thing._mock_parent
2744 return _CallList(reversed(vals))
2747call = _Call(from_kall=False)
2750def create_autospec(spec, spec_set=False, instance=False, _parent=None,
2751 _name=None, *, unsafe=False, **kwargs):
2752 """Create a mock object using another object as a spec. Attributes on the
2753 mock will use the corresponding attribute on the `spec` object as their
2754 spec.
2756 Functions or methods being mocked will have their arguments checked
2757 to check that they are called with the correct signature.
2759 If `spec_set` is True then attempting to set attributes that don't exist
2760 on the spec object will raise an `AttributeError`.
2762 If a class is used as a spec then the return value of the mock (the
2763 instance of the class) will have the same spec. You can use a class as the
2764 spec for an instance object by passing `instance=True`. The returned mock
2765 will only be callable if instances of the mock are callable.
2767 `create_autospec` will raise a `RuntimeError` if passed some common
2768 misspellings of the arguments autospec and spec_set. Pass the argument
2769 `unsafe` with the value True to disable that check.
2771 `create_autospec` also takes arbitrary keyword arguments that are passed to
2772 the constructor of the created mock."""
2773 if _is_list(spec):
2774 # can't pass a list instance to the mock constructor as it will be
2775 # interpreted as a list of strings
2776 spec = type(spec)
2778 is_type = isinstance(spec, type)
2779 if _is_instance_mock(spec):
2780 raise InvalidSpecError(f'Cannot autospec a Mock object. '
2781 f'[object={spec!r}]')
2782 is_async_func = _is_async_func(spec)
2783 _kwargs = {'spec': spec}
2784 if spec_set:
2785 _kwargs = {'spec_set': spec}
2786 elif spec is None:
2787 # None we mock with a normal mock without a spec
2788 _kwargs = {}
2789 if _kwargs and instance:
2790 _kwargs['_spec_as_instance'] = True
2791 if not unsafe:
2792 _check_spec_arg_typos(kwargs)
2794 _kwargs.update(kwargs)
2796 Klass = MagicMock
2797 if inspect.isdatadescriptor(spec):
2798 # descriptors don't have a spec
2799 # because we don't know what type they return
2800 _kwargs = {}
2801 elif is_async_func:
2802 if instance:
2803 raise RuntimeError("Instance can not be True when create_autospec "
2804 "is mocking an async function")
2805 Klass = AsyncMock
2806 elif not _callable(spec):
2807 Klass = NonCallableMagicMock
2808 elif is_type and instance and not _instance_callable(spec):
2809 Klass = NonCallableMagicMock
2811 _name = _kwargs.pop('name', _name)
2813 _new_name = _name
2814 if _parent is None:
2815 # for a top level object no _new_name should be set
2816 _new_name = ''
2818 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
2819 name=_name, **_kwargs)
2821 if isinstance(spec, FunctionTypes):
2822 # should only happen at the top level because we don't
2823 # recurse for functions
2824 if is_async_func:
2825 mock = _set_async_signature(mock, spec)
2826 else:
2827 mock = _set_signature(mock, spec)
2828 else:
2829 _check_signature(spec, mock, is_type, instance)
2831 if _parent is not None and not instance:
2832 _parent._mock_children[_name] = mock
2834 if is_type and not instance and 'return_value' not in kwargs:
2835 mock.return_value = create_autospec(spec, spec_set, instance=True,
2836 _name='()', _parent=mock)
2838 for entry in dir(spec):
2839 if _is_magic(entry):
2840 # MagicMock already does the useful magic methods for us
2841 continue
2843 # XXXX do we need a better way of getting attributes without
2844 # triggering code execution (?) Probably not - we need the actual
2845 # object to mock it so we would rather trigger a property than mock
2846 # the property descriptor. Likewise we want to mock out dynamically
2847 # provided attributes.
2848 # XXXX what about attributes that raise exceptions other than
2849 # AttributeError on being fetched?
2850 # we could be resilient against it, or catch and propagate the
2851 # exception when the attribute is fetched from the mock
2852 try:
2853 original = getattr(spec, entry)
2854 except AttributeError:
2855 continue
2857 kwargs = {'spec': original}
2858 if spec_set:
2859 kwargs = {'spec_set': original}
2861 if not isinstance(original, FunctionTypes):
2862 new = _SpecState(original, spec_set, mock, entry, instance)
2863 mock._mock_children[entry] = new
2864 else:
2865 parent = mock
2866 if isinstance(spec, FunctionTypes):
2867 parent = mock.mock
2869 skipfirst = _must_skip(spec, entry, is_type)
2870 kwargs['_eat_self'] = skipfirst
2871 if iscoroutinefunction(original):
2872 child_klass = AsyncMock
2873 else:
2874 child_klass = MagicMock
2875 new = child_klass(parent=parent, name=entry, _new_name=entry,
2876 _new_parent=parent,
2877 **kwargs)
2878 mock._mock_children[entry] = new
2879 new.return_value = child_klass()
2880 _check_signature(original, new, skipfirst=skipfirst)
2882 # so functions created with _set_signature become instance attributes,
2883 # *plus* their underlying mock exists in _mock_children of the parent
2884 # mock. Adding to _mock_children may be unnecessary where we are also
2885 # setting as an instance attribute?
2886 if isinstance(new, FunctionTypes):
2887 setattr(mock, entry, new)
2889 return mock
2892def _must_skip(spec, entry, is_type):
2893 """
2894 Return whether we should skip the first argument on spec's `entry`
2895 attribute.
2896 """
2897 if not isinstance(spec, type):
2898 if entry in getattr(spec, '__dict__', {}):
2899 # instance attribute - shouldn't skip
2900 return False
2901 spec = spec.__class__
2903 for klass in spec.__mro__:
2904 result = klass.__dict__.get(entry, DEFAULT)
2905 if result is DEFAULT:
2906 continue
2907 if isinstance(result, (staticmethod, classmethod)):
2908 return False
2909 elif isinstance(result, FunctionTypes):
2910 # Normal method => skip if looked up on type
2911 # (if looked up on instance, self is already skipped)
2912 return is_type
2913 else:
2914 return False
2916 # function is a dynamically provided attribute
2917 return is_type
2920class _SpecState(object):
2922 def __init__(self, spec, spec_set=False, parent=None,
2923 name=None, ids=None, instance=False):
2924 self.spec = spec
2925 self.ids = ids
2926 self.spec_set = spec_set
2927 self.parent = parent
2928 self.instance = instance
2929 self.name = name
2932FunctionTypes = (
2933 # python function
2934 type(create_autospec),
2935 # instance method
2936 type(ANY.__eq__),
2937)
2940file_spec = None
2941open_spec = None
2944def _to_stream(read_data):
2945 if isinstance(read_data, bytes):
2946 return io.BytesIO(read_data)
2947 else:
2948 return io.StringIO(read_data)
2951def mock_open(mock=None, read_data=''):
2952 """
2953 A helper function to create a mock to replace the use of `open`. It works
2954 for `open` called directly or used as a context manager.
2956 The `mock` argument is the mock object to configure. If `None` (the
2957 default) then a `MagicMock` will be created for you, with the API limited
2958 to methods or attributes available on standard file handles.
2960 `read_data` is a string for the `read`, `readline` and `readlines` of the
2961 file handle to return. This is an empty string by default.
2962 """
2963 _read_data = _to_stream(read_data)
2964 _state = [_read_data, None]
2966 def _readlines_side_effect(*args, **kwargs):
2967 if handle.readlines.return_value is not None:
2968 return handle.readlines.return_value
2969 return _state[0].readlines(*args, **kwargs)
2971 def _read_side_effect(*args, **kwargs):
2972 if handle.read.return_value is not None:
2973 return handle.read.return_value
2974 return _state[0].read(*args, **kwargs)
2976 def _readline_side_effect(*args, **kwargs):
2977 yield from _iter_side_effect()
2978 while True:
2979 yield _state[0].readline(*args, **kwargs)
2981 def _iter_side_effect():
2982 if handle.readline.return_value is not None:
2983 while True:
2984 yield handle.readline.return_value
2985 for line in _state[0]:
2986 yield line
2988 def _next_side_effect():
2989 if handle.readline.return_value is not None:
2990 return handle.readline.return_value
2991 return next(_state[0])
2993 def _exit_side_effect(exctype, excinst, exctb):
2994 handle.close()
2996 global file_spec
2997 if file_spec is None:
2998 import _io
2999 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
3001 global open_spec
3002 if open_spec is None:
3003 import _io
3004 open_spec = list(set(dir(_io.open)))
3005 if mock is None:
3006 mock = MagicMock(name='open', spec=open_spec)
3008 handle = MagicMock(spec=file_spec)
3009 handle.__enter__.return_value = handle
3011 handle.write.return_value = None
3012 handle.read.return_value = None
3013 handle.readline.return_value = None
3014 handle.readlines.return_value = None
3016 handle.read.side_effect = _read_side_effect
3017 _state[1] = _readline_side_effect()
3018 handle.readline.side_effect = _state[1]
3019 handle.readlines.side_effect = _readlines_side_effect
3020 handle.__iter__.side_effect = _iter_side_effect
3021 handle.__next__.side_effect = _next_side_effect
3022 handle.__exit__.side_effect = _exit_side_effect
3024 def reset_data(*args, **kwargs):
3025 _state[0] = _to_stream(read_data)
3026 if handle.readline.side_effect == _state[1]:
3027 # Only reset the side effect if the user hasn't overridden it.
3028 _state[1] = _readline_side_effect()
3029 handle.readline.side_effect = _state[1]
3030 return DEFAULT
3032 mock.side_effect = reset_data
3033 mock.return_value = handle
3034 return mock
3037class PropertyMock(Mock):
3038 """
3039 A mock intended to be used as a property, or other descriptor, on a class.
3040 `PropertyMock` provides `__get__` and `__set__` methods so you can specify
3041 a return value when it is fetched.
3043 Fetching a `PropertyMock` instance from an object calls the mock, with
3044 no args. Setting it calls the mock with the value being set.
3045 """
3046 def _get_child_mock(self, **kwargs):
3047 return MagicMock(**kwargs)
3049 def __get__(self, obj, obj_type=None):
3050 return self()
3051 def __set__(self, obj, val):
3052 self(val)
3055_timeout_unset = sentinel.TIMEOUT_UNSET
3057class ThreadingMixin(Base):
3059 DEFAULT_TIMEOUT = None
3061 def _get_child_mock(self, **kw):
3062 if isinstance(kw.get("parent"), ThreadingMixin):
3063 kw["timeout"] = kw["parent"]._mock_wait_timeout
3064 elif isinstance(kw.get("_new_parent"), ThreadingMixin):
3065 kw["timeout"] = kw["_new_parent"]._mock_wait_timeout
3066 return super()._get_child_mock(**kw)
3068 def __init__(self, *args, timeout=_timeout_unset, **kwargs):
3069 super().__init__(*args, **kwargs)
3070 if timeout is _timeout_unset:
3071 timeout = self.DEFAULT_TIMEOUT
3072 self.__dict__["_mock_event"] = threading.Event() # Event for any call
3073 self.__dict__["_mock_calls_events"] = [] # Events for each of the calls
3074 self.__dict__["_mock_calls_events_lock"] = threading.Lock()
3075 self.__dict__["_mock_wait_timeout"] = timeout
3077 def reset_mock(self, *args, **kwargs):
3078 """
3079 See :func:`.Mock.reset_mock()`
3080 """
3081 super().reset_mock(*args, **kwargs)
3082 self.__dict__["_mock_event"] = threading.Event()
3083 self.__dict__["_mock_calls_events"] = []
3085 def __get_event(self, expected_args, expected_kwargs):
3086 with self._mock_calls_events_lock:
3087 for args, kwargs, event in self._mock_calls_events:
3088 if (args, kwargs) == (expected_args, expected_kwargs):
3089 return event
3090 new_event = threading.Event()
3091 self._mock_calls_events.append((expected_args, expected_kwargs, new_event))
3092 return new_event
3094 def _mock_call(self, *args, **kwargs):
3095 ret_value = super()._mock_call(*args, **kwargs)
3097 call_event = self.__get_event(args, kwargs)
3098 call_event.set()
3100 self._mock_event.set()
3102 return ret_value
3104 def wait_until_called(self, *, timeout=_timeout_unset):
3105 """Wait until the mock object is called.
3107 `timeout` - time to wait for in seconds, waits forever otherwise.
3108 Defaults to the constructor provided timeout.
3109 Use None to block undefinetively.
3110 """
3111 if timeout is _timeout_unset:
3112 timeout = self._mock_wait_timeout
3113 if not self._mock_event.wait(timeout=timeout):
3114 msg = (f"{self._mock_name or 'mock'} was not called before"
3115 f" timeout({timeout}).")
3116 raise AssertionError(msg)
3118 def wait_until_any_call_with(self, *args, **kwargs):
3119 """Wait until the mock object is called with given args.
3121 Waits for the timeout in seconds provided in the constructor.
3122 """
3123 event = self.__get_event(args, kwargs)
3124 if not event.wait(timeout=self._mock_wait_timeout):
3125 expected_string = self._format_mock_call_signature(args, kwargs)
3126 raise AssertionError(f'{expected_string} call not found')
3129class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
3130 """
3131 A mock that can be used to wait until on calls happening
3132 in a different thread.
3134 The constructor can take a `timeout` argument which
3135 controls the timeout in seconds for all `wait` calls of the mock.
3137 You can change the default timeout of all instances via the
3138 `ThreadingMock.DEFAULT_TIMEOUT` attribute.
3140 If no timeout is set, it will block undefinetively.
3141 """
3142 pass
3145def seal(mock):
3146 """Disable the automatic generation of child mocks.
3148 Given an input Mock, seals it to ensure no further mocks will be generated
3149 when accessing an attribute that was not already defined.
3151 The operation recursively seals the mock passed in, meaning that
3152 the mock itself, any mocks generated by accessing one of its attributes,
3153 and all assigned mocks without a name or spec will be sealed.
3154 """
3155 mock._mock_sealed = True
3156 for attr in dir(mock):
3157 try:
3158 m = getattr(mock, attr)
3159 except AttributeError:
3160 continue
3161 if not isinstance(m, NonCallableMock):
3162 continue
3163 if isinstance(m._mock_children.get(attr), _SpecState):
3164 continue
3165 if m._mock_new_parent is mock:
3166 seal(m)
3169class _AsyncIterator:
3170 """
3171 Wraps an iterator in an asynchronous iterator.
3172 """
3173 def __init__(self, iterator):
3174 self.iterator = iterator
3175 code_mock = NonCallableMock(spec_set=CodeType)
3176 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
3177 self.__dict__['__code__'] = code_mock
3179 async def __anext__(self):
3180 try:
3181 return next(self.iterator)
3182 except StopIteration:
3183 pass
3184 raise StopAsyncIteration