Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 29%
1626 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
# Names exported by ``from mock import *``.
__all__ = (
    'Mock', 'MagicMock', 'patch', 'sentinel', 'DEFAULT', 'ANY', 'call',
    'create_autospec', 'AsyncMock', 'FILTER_DIR', 'NonCallableMock',
    'NonCallableMagicMock', 'mock_open', 'PropertyMock', 'seal',
)
26import asyncio
27import contextlib
28import io
29import inspect
30import pprint
31import sys
32import builtins
33from asyncio import iscoroutinefunction
34from types import CodeType, ModuleType, MethodType
35from unittest.util import safe_repr
36from functools import wraps, partial
37from threading import RLock
40from mock import IS_PYPY
41from .backports import iscoroutinefunction
class InvalidSpecError(Exception):
    """Raised when an unusable value is supplied as the spec of a mock."""
48_builtins = {name for name in dir(builtins) if not name.startswith('_')}
50FILTER_DIR = True
52# Workaround for issue #12370
53# Without this, the __class__ properties wouldn't be set correctly
54_safe_super = super
def _is_async_obj(obj):
    """Return whether *obj* is a coroutine function or an awaitable.

    A mock instance only qualifies when it is an ``AsyncMock``.  When *obj*
    carries a ``__func__`` attribute, that underlying function is inspected
    instead of *obj* itself.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    target = obj.__func__ if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)
64def _is_async_func(func):
65 if getattr(func, '__code__', None):
66 return iscoroutinefunction(func)
67 else:
68 return False
def _is_instance_mock(obj):
    """Return whether the real type of *obj* derives from ``NonCallableMock``."""
    # Mock objects override __class__, so isinstance() cannot be used here;
    # NonCallableMock is the base class for all mocks.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)
77def _is_exception(obj):
78 return (
79 isinstance(obj, BaseException) or
80 isinstance(obj, type) and issubclass(obj, BaseException)
81 )
def _extract_mock(obj):
    """Return the mock object that stands behind *obj*.

    Autospecced functions are plain function objects carrying the real mock
    in their ``mock`` attribute; for those the attribute is returned.  Any
    other object is returned unchanged.
    """
    is_autospecced_function = (
        isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    )
    return obj.mock if is_autospecced_function else obj
def _get_signature_object(func, as_instance, eat_self):
    """Build a signature object for an arbitrary, possibly callable object.

    *as_instance* says whether a type should be modelled as an instance of
    itself rather than as a type.  *eat_self* says whether the first
    positional parameter should be dropped from the signature.

    Return a ``(reduced func, signature)`` tuple, or ``None`` when no
    signature can be determined.
    """
    if isinstance(func, type) and not as_instance:
        # A type modelled as a type is called through __init__, whose
        # `self` argument must not appear in the signature.
        func = func.__init__
        eat_self = True
    elif isinstance(func, (classmethod, staticmethod)):
        # A class method additionally receives `cls`, which is skipped.
        if isinstance(func, classmethod):
            eat_self = True
        # The signature is taken from the original decorated function.
        func = func.__func__
    elif not isinstance(func, FunctionTypes):
        # To model an instance of the passed type, __call__ has to be
        # looked up, not __init__.
        try:
            func = func.__call__
        except AttributeError:
            return None

    # Binding a dummy first argument removes it from the signature.
    sig_func = partial(func, None) if eat_self else func
    try:
        signature = inspect.signature(sig_func)
    except ValueError:
        # Certain callable types are not supported by inspect.signature()
        return None
    return func, signature
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature checker for *func* on the class of *mock*.

    Nothing happens when no signature can be derived from *func*.
    Otherwise the type of *mock* receives a ``_mock_check_sig`` function
    that binds its arguments against the signature, and a matching
    ``__signature__``.
    """
    resolved = _get_signature_object(func, instance, skipfirst)
    if resolved is None:
        return
    func, sig = resolved

    def checksig(_mock_self, *args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig
140def _copy_func_details(func, funcopy):
141 # we explicitly don't copy func.__dict__ into this copy as it would
142 # expose original attributes that should be mocked
143 for attribute in (
144 '__name__', '__doc__', '__text_signature__',
145 '__module__', '__defaults__', '__kwdefaults__',
146 ):
147 try:
148 setattr(funcopy, attribute, getattr(func, attribute))
149 except AttributeError:
150 pass
153def _callable(obj):
154 if isinstance(obj, type):
155 return True
156 if isinstance(obj, (staticmethod, classmethod, MethodType)):
157 return _callable(obj.__func__)
158 if getattr(obj, '__call__', None) is not None:
159 return True
160 return False
163def _is_list(obj):
164 # checks for list or tuples
165 # XXXX badly named!
166 return type(obj) in (list, tuple)
169def _instance_callable(obj):
170 """Given an object, return True if the object is callable.
171 For classes, return True if instances would be callable."""
172 if not isinstance(obj, type):
173 # already an instance
174 return getattr(obj, '__call__', None) is not None
176 # *could* be broken by a class overriding __mro__ or __dict__ via
177 # a metaclass
178 for base in (obj,) + obj.__mro__:
179 if base.__dict__.get('__call__') is not None:
180 return True
181 return False
def _set_signature(mock, original, instance=False):
    """Wrap *mock* in a real function that enforces the signature of *original*.

    The returned function accepts ``(*args, **kwargs)``, first binds them
    against the signature derived from *original* (so a mismatching call
    raises ``TypeError`` from ``Signature.bind``) and then delegates to
    *mock*.  When no signature can be derived, *mock* itself is returned
    unchanged.

    *instance* is forwarded to ``_get_signature_object`` to model *original*
    as an instance rather than as a type.
    """
    import keyword

    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)
    _copy_func_details(func, checksig)

    # Namespace in which the wrapper source below is executed.
    context = {'_checksig_': checksig, 'mock': mock}

    name = original.__name__
    # The wrapper is defined under the original's name, so that name must be
    # usable in a ``def`` statement:
    #  * it has to be an identifier and must not be a keyword, otherwise the
    #    generated source does not compile;
    #  * it must not collide with a helper name in ``context``, otherwise the
    #    ``def`` would overwrite that helper and the wrapper would end up
    #    calling itself instead of the mock.
    if (not name.isidentifier() or keyword.iskeyword(name)
            or name in context):
        name = 'funcopy'
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
def _setup_func(funcopy, mock, sig):
    """Give the function *funcopy* the public surface of *mock*.

    *funcopy* receives fresh call-tracking attributes, the current return
    value, side effect and children of *mock*, forwarding versions of the
    assertion helpers, a ``reset_mock`` function and *sig* as its
    ``__signature__``.  Finally *funcopy* is registered as the
    ``_mock_delegate`` of *mock*.
    """
    funcopy.mock = mock

    # Thin forwarders: each one simply calls the helper of the same name on
    # the underlying mock.
    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        returned = funcopy.return_value
        # A mock return value is reset too, unless it is the mock itself.
        if _is_instance_mock(returned) and returned is not mock:
            returned.reset_mock()

    # Call-tracking state starts out empty.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Configured behaviour is taken over from the mock.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    # Each helper is attached under its own name.
    helpers = (
        assert_called_with, assert_called_once_with, assert_has_calls,
        assert_any_call, reset_mock, assert_called, assert_not_called,
        assert_called_once,
    )
    for helper in helpers:
        setattr(funcopy, helper.__name__, helper)
    funcopy.__signature__ = sig

    mock._mock_delegate = funcopy
def _setup_async_mock(mock):
    """Attach await-tracking state and await assertion helpers to *mock*."""
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    await_helpers = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for helper_name in await_helpers:
        # Setting the bare wrapper would bind late, leaving every helper
        # with the last name of the loop; partial() fixes the name for each
        # helper at creation time.
        setattr(mock, helper_name, partial(wrapper, helper_name))
287def _is_magic(name):
288 return '__%s__' % name[2:-2] == name
291class _SentinelObject(object):
292 "A unique, named, sentinel object."
293 def __init__(self, name):
294 self.name = name
296 def __repr__(self):
297 return 'sentinel.%s' % self.name
299 def __reduce__(self):
300 return 'sentinel.%s' % self.name
303class _Sentinel(object):
304 """Access attributes to return a named object, usable as a sentinel."""
305 def __init__(self):
306 self._sentinels = {}
308 def __getattr__(self, name):
309 if name == '__bases__':
310 # Without this help(unittest.mock) raises an exception
311 raise AttributeError
312 return self._sentinels.setdefault(name, _SentinelObject(name))
314 def __reduce__(self):
315 return 'sentinel'
# Module-wide sentinel factory: ``sentinel.NAME`` is the object for NAME.
sentinel = _Sentinel()

# Named sentinels used by this module.
DEFAULT = sentinel.DEFAULT      # "no return value has been configured"
_missing = sentinel.MISSING     # lookup default meaning "no such entry"
_deleted = sentinel.DELETED     # marks a child attribute that was deleted
325_allowed_names = {
326 'return_value', '_mock_return_value', 'side_effect',
327 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
328 '_mock_name', '_mock_new_name'
329}
def _delegating_property(name):
    """Return a property for *name* that honours the mock's delegate.

    While the instance has no ``_mock_delegate`` the value lives on the
    instance under ``'_mock_' + name``; otherwise reads and writes go to the
    attribute *name* of the delegate.  *name* is also registered in
    ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            # Written through __dict__ directly, bypassing __setattr__.
            self.__dict__[_the_name] = value

    return property(_get, _set)
351class _CallList(list):
353 def __contains__(self, value):
354 if not isinstance(value, list):
355 return list.__contains__(self, value)
356 len_value = len(value)
357 len_self = len(self)
358 if len_value > len_self:
359 return False
361 for i in range(0, len_self - len_value + 1):
362 sub_list = self[i:i+len_value]
363 if sub_list == value:
364 return True
365 return False
367 def __repr__(self):
368 return pprint.pformat(list(self))
def _check_and_set_parent(parent, value, name, new_name):
    """Adopt the mock *value* as a child of *parent*, if that is allowed.

    Return ``False`` without changing anything when *value* is not a mock,
    already has a name or a parent, or is *parent* itself or one of its
    ancestors.  Otherwise record *parent* on *value* (under *new_name*
    and/or *name*, whichever is truthy) and return ``True``.
    """
    value = _extract_mock(value)
    if not _is_instance_mock(value):
        return False

    already_owned = (
        (value._mock_name or value._mock_new_name)
        or value._mock_parent is not None
        or value._mock_new_parent is not None
    )
    if already_owned:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is value:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
397# Internal class to identify if we wrapped an iterator object or not.
398class _MockIter(object):
399 def __init__(self, obj):
400 self.obj = iter(obj)
401 def __next__(self):
402 return next(self.obj)
class Base:
    """Common root of the mock classes.

    Supplies the class-level defaults for the return value and the side
    effect, and an ``__init__`` that accepts and ignores any arguments.
    """

    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, *args, **kwargs):
        pass
class NonCallableMock(Base):
    """A non-callable version of `Mock`"""

    # Store a mutex as a class attribute in order to protect concurrent access
    # to mock attributes. Using a class attribute allows all NonCallableMock
    # instances to share the mutex for simplicity.
    #
    # See https://github.com/python/cpython/issues/98624 for why this is
    # necessary.
    _lock = RLock()

    def __new__(
            cls, spec=None, wraps=None, name=None, spec_set=None,
            parent=None, _spec_state=None, _new_name='', _new_parent=None,
            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
        ):
        # every instance has its own class
        # so we can create magic methods on the
        # class without stomping on other mocks
        bases = (cls,)
        if not issubclass(cls, AsyncMockMixin):
            # Check if spec is an async object or function
            spec_arg = spec_set or spec
            if spec_arg is not None and _is_async_obj(spec_arg):
                bases = (AsyncMockMixin, cls)
        new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
        instance = _safe_super(NonCallableMock, cls).__new__(new)
        return instance

    def __init__(
            self, spec=None, wraps=None, name=None, spec_set=None,
            parent=None, _spec_state=None, _new_name='', _new_parent=None,
            _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
        ):
        if _new_parent is None:
            _new_parent = parent

        # State is written straight into the instance dict so that the
        # custom __setattr__ below is not involved.
        __dict__ = self.__dict__
        __dict__['_mock_parent'] = parent
        __dict__['_mock_name'] = name
        __dict__['_mock_new_name'] = _new_name
        __dict__['_mock_new_parent'] = _new_parent
        __dict__['_mock_sealed'] = False

        if spec_set is not None:
            # spec_set doubles as the spec; from here on it is only a flag.
            spec = spec_set
            spec_set = True
        if _eat_self is None:
            _eat_self = parent is not None

        self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

        __dict__['_mock_children'] = {}
        __dict__['_mock_wraps'] = wraps
        __dict__['_mock_delegate'] = None

        __dict__['_mock_called'] = False
        __dict__['_mock_call_args'] = None
        __dict__['_mock_call_count'] = 0
        __dict__['_mock_call_args_list'] = _CallList()
        __dict__['_mock_mock_calls'] = _CallList()

        __dict__['method_calls'] = _CallList()
        __dict__['_mock_unsafe'] = unsafe

        if kwargs:
            self.configure_mock(**kwargs)

        _safe_super(NonCallableMock, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state
        )

    def attach_mock(self, mock, attribute):
        """
        Attach a mock as an attribute of this one, replacing its name and
        parent. Calls to the attached mock will be recorded in the
        `method_calls` and `mock_calls` attributes of this one."""
        inner_mock = _extract_mock(mock)

        # Clear name and parent so that the setattr below can adopt the mock.
        inner_mock._mock_parent = None
        inner_mock._mock_new_parent = None
        inner_mock._mock_name = ''
        inner_mock._mock_new_name = None

        setattr(self, attribute, mock)

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set."""
        self._mock_add_spec(spec, spec_set)

    def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
                       _eat_self=False):
        """Record *spec* on the mock.

        Raises InvalidSpecError when *spec* is itself a mock.  A list or
        tuple spec is stored unchanged as the collection of allowed names;
        any other non-None spec is replaced by ``dir(spec)``, and its class,
        signature and coroutine-function attribute names are recorded too.
        """
        if _is_instance_mock(spec):
            raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')

        _spec_class = None
        _spec_signature = None
        _spec_asyncs = []

        if spec is not None and not _is_list(spec):
            if isinstance(spec, type):
                _spec_class = spec
            else:
                _spec_class = type(spec)
            res = _get_signature_object(spec,
                                        _spec_as_instance, _eat_self)
            _spec_signature = res and res[1]

            spec_list = dir(spec)

            for attr in spec_list:
                if iscoroutinefunction(getattr(spec, attr, None)):
                    _spec_asyncs.append(attr)

            spec = spec_list

        __dict__ = self.__dict__
        __dict__['_spec_class'] = _spec_class
        __dict__['_spec_set'] = spec_set
        __dict__['_spec_signature'] = _spec_signature
        __dict__['_mock_methods'] = spec
        __dict__['_spec_asyncs'] = _spec_asyncs

    def __get_return_value(self):
        ret = self._mock_return_value
        if self._mock_delegate is not None:
            ret = self._mock_delegate.return_value

        if ret is DEFAULT:
            # Nothing configured yet: create the child mock on first access
            # and remember it.
            ret = self._get_child_mock(
                _new_parent=self, _new_name='()'
            )
            self.return_value = ret
        return ret

    def __set_return_value(self, value):
        if self._mock_delegate is not None:
            self._mock_delegate.return_value = value
        else:
            self._mock_return_value = value
            _check_and_set_parent(self, value, None, '()')

    __return_value_doc = "The value to be returned when the mock is called."
    return_value = property(__get_return_value, __set_return_value,
                            __return_value_doc)

    @property
    def __class__(self):
        # With a spec, the mock reports the spec's class as its own.
        if self._spec_class is None:
            return type(self)
        return self._spec_class

    called = _delegating_property('called')
    call_count = _delegating_property('call_count')
    call_args = _delegating_property('call_args')
    call_args_list = _delegating_property('call_args_list')
    mock_calls = _delegating_property('mock_calls')

    def __get_side_effect(self):
        delegated = self._mock_delegate
        if delegated is None:
            return self._mock_side_effect
        sf = delegated.side_effect
        # A plain iterable set on the delegate is wrapped once in _MockIter
        # and stored back.
        if (sf is not None and not callable(sf)
                and not isinstance(sf, _MockIter) and not _is_exception(sf)):
            sf = _MockIter(sf)
            delegated.side_effect = sf
        return sf

    def __set_side_effect(self, value):
        value = _try_iter(value)
        delegated = self._mock_delegate
        if delegated is None:
            self._mock_side_effect = value
        else:
            delegated.side_effect = value

    side_effect = property(__get_side_effect, __set_side_effect)

    def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
        "Restore the mock object to its initial state."
        # `visited` collects the ids of mocks already reset so that cycles
        # between mocks terminate.
        if visited is None:
            visited = []
        if id(self) in visited:
            return
        visited.append(id(self))

        self.called = False
        self.call_args = None
        self.call_count = 0
        self.mock_calls = _CallList()
        self.call_args_list = _CallList()
        self.method_calls = _CallList()

        if return_value:
            self._mock_return_value = DEFAULT
        if side_effect:
            self._mock_side_effect = None

        for child in self._mock_children.values():
            if isinstance(child, _SpecState) or child is _deleted:
                continue
            child.reset_mock(visited, return_value=return_value, side_effect=side_effect)

        ret = self._mock_return_value
        if _is_instance_mock(ret) and ret is not self:
            ret.reset_mock(visited)

    def configure_mock(self, **kwargs):
        """Set attributes on the mock through keyword arguments.

        Attributes plus return values and side effects can be set on child
        mocks using standard dot notation and unpacking a dictionary in the
        method call:

        >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
        >>> mock.configure_mock(**attrs)"""
        for arg, val in sorted(kwargs.items(),
                               # we sort on the number of dots so that
                               # attributes are set before we set attributes on
                               # attributes
                               key=lambda entry: entry[0].count('.')):
            args = arg.split('.')
            final = args.pop()
            obj = self
            for entry in args:
                obj = getattr(obj, entry)
            setattr(obj, final, val)

    def __getattr__(self, name):
        if name in {'_mock_methods', '_mock_unsafe'}:
            raise AttributeError(name)
        elif self._mock_methods is not None:
            if name not in self._mock_methods or name in _all_magics:
                raise AttributeError("Mock object has no attribute %r" % name)
        elif _is_magic(name):
            raise AttributeError(name)
        if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
            # Reject names that look like (misspelled) assertions unless the
            # spec provides them.
            if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
                raise AttributeError(
                    f"{name!r} is not a valid assertion. Use a spec "
                    f"for the mock if {name!r} is meant to be an attribute.")

        with NonCallableMock._lock:
            result = self._mock_children.get(name)
            if result is _deleted:
                raise AttributeError(name)
            elif result is None:
                wraps = None
                if self._mock_wraps is not None:
                    # XXXX should we get the attribute without triggering code
                    # execution?
                    wraps = getattr(self._mock_wraps, name)

                result = self._get_child_mock(
                    parent=self, name=name, wraps=wraps, _new_name=name,
                    _new_parent=self
                )
                self._mock_children[name]  = result

            elif isinstance(result, _SpecState):
                # The child is still a deferred spec; build the real
                # autospecced mock now.
                try:
                    result = create_autospec(
                        result.spec, result.spec_set, result.instance,
                        result.parent, result.name
                    )
                except InvalidSpecError:
                    target_name = self.__dict__['_mock_name'] or self
                    raise InvalidSpecError(
                        f'Cannot autospec attr {name!r} from target '
                        f'{target_name!r} as it has already been mocked out. '
                        f'[target={self!r}, attr={result.spec!r}]')
                self._mock_children[name]  = result

        return result

    def _extract_mock_name(self):
        """Return the dotted name of this mock, built by walking its parents."""
        _name_list = [self._mock_new_name]
        _parent = self._mock_new_parent
        last = self

        dot = '.'
        if _name_list == ['()']:
            dot = ''

        while _parent is not None:
            last = _parent

            _name_list.append(_parent._mock_new_name + dot)
            dot = '.'
            if _parent._mock_new_name == '()':
                dot = ''

            _parent = _parent._mock_new_parent

        _name_list = list(reversed(_name_list))
        _first = last._mock_name or 'mock'
        if len(_name_list) > 1:
            if _name_list[1] not in ('()', '().'):
                _first += '.'
        _name_list[0] = _first
        return ''.join(_name_list)

    def __repr__(self):
        name = self._extract_mock_name()

        name_string = ''
        if name not in ('mock', 'mock.'):
            name_string = ' name=%r' % name

        spec_string = ''
        if self._spec_class is not None:
            spec_string = ' spec=%r'
            if self._spec_set:
                spec_string = ' spec_set=%r'
            spec_string = spec_string % self._spec_class.__name__
        return "<%s%s%s id='%s'>" % (
            type(self).__name__,
            name_string,
            spec_string,
            id(self)
        )

    def __dir__(self):
        """Filter the output of `dir(mock)` to only useful members."""
        if not FILTER_DIR:
            return object.__dir__(self)

        # A tuple spec is stored unchanged in _mock_methods, so normalise to
        # a list before concatenating with the lists below; tuple + list
        # would raise TypeError.
        extras = list(self._mock_methods or [])
        from_type = dir(type(self))
        from_dict = list(self.__dict__)
        from_child_mocks = [
            m_name for m_name, m_value in self._mock_children.items()
            if m_value is not _deleted]

        from_type = [e for e in from_type if not e.startswith('_')]
        from_dict = [e for e in from_dict if not e.startswith('_') or
                     _is_magic(e)]
        return sorted(set(extras + from_type + from_dict + from_child_mocks))

    def __setattr__(self, name, value):
        if name in _allowed_names:
            # property setters go through here
            return object.__setattr__(self, name, value)
        elif (self._spec_set and self._mock_methods is not None and
              name not in self._mock_methods and
              name not in self.__dict__):
            raise AttributeError("Mock object has no attribute '%s'" % name)
        elif name in _unsupported_magics:
            msg = 'Attempting to set unsupported magic method %r.' % name
            raise AttributeError(msg)
        elif name in _all_magics:
            if self._mock_methods is not None and name not in self._mock_methods:
                raise AttributeError("Mock object has no attribute '%s'" % name)

            if not _is_instance_mock(value):
                # Magic methods are installed on the per-instance class; the
                # instance attribute becomes a wrapper that passes self.
                setattr(type(self), name, _get_method(name, value))
                original = value
                value = lambda *args, **kw: original(self, *args, **kw)
            else:
                # only set _new_name and not name so that mock_calls is tracked
                # but not method calls
                _check_and_set_parent(self, value, None, name)
                setattr(type(self), name, value)
                self._mock_children[name] = value
        elif name == '__class__':
            self._spec_class = value
            return
        else:
            if _check_and_set_parent(self, value, name, name):
                self._mock_children[name] = value

        if self._mock_sealed and not hasattr(self, name):
            mock_name = f'{self._extract_mock_name()}.{name}'
            raise AttributeError(f'Cannot set {mock_name}')

        return object.__setattr__(self, name, value)

    def __delattr__(self, name):
        if name in _all_magics and name in type(self).__dict__:
            delattr(type(self), name)
            if name not in self.__dict__:
                # for magic methods that are still MagicProxy objects and
                # not set on the instance itself
                return

        obj = self._mock_children.get(name, _missing)
        if name in self.__dict__:
            _safe_super(NonCallableMock, self).__delattr__(name)
        elif obj is _deleted:
            raise AttributeError(name)
        if obj is not _missing:
            del self._mock_children[name]
        # Leave a marker so later lookups of this name fail.
        self._mock_children[name] = _deleted

    def _format_mock_call_signature(self, args, kwargs):
        """Format a call with *args*/*kwargs* under this mock's name."""
        name = self._mock_name or 'mock'
        return _format_call_signature(name, args, kwargs)

    def _format_mock_failure_message(self, args, kwargs, action='call'):
        """Build the expected-versus-actual text for a failed assertion.

        The actual call is taken from ``self.call_args``, which is unpacked
        here and therefore must not be None.
        """
        message = 'expected %s not found.\nExpected: %s\nActual: %s'
        expected_string = self._format_mock_call_signature(args, kwargs)
        call_args = self.call_args
        actual_string = self._format_mock_call_signature(*call_args)
        return message % (action, expected_string, actual_string)

    def _get_call_signature_from_name(self, name):
        """
        * If call objects are asserted against a method/function like obj.meth1
        then there could be no name for the call object to lookup. Hence just
        return the spec_signature of the method/function being asserted against.
        * If the name is not empty then remove () and split by '.' to get
        list of names to iterate through the children until a potential
        match is found. A child mock is created only during attribute access
        so if we get a _SpecState then no attributes of the spec were accessed
        and can be safely exited.
        """
        if not name:
            return self._spec_signature

        sig = None
        names = name.replace('()', '').split('.')
        children = self._mock_children

        for name in names:
            child = children.get(name)
            if child is None or isinstance(child, _SpecState):
                break
            else:
                # If an autospecced object is attached using attach_mock the
                # child would be a function with mock object as attribute from
                # which signature has to be derived.
                child = _extract_mock(child)
                children = child._mock_children
                sig = child._spec_signature

        return sig

    def _call_matcher(self, _call):
        """
        Given a call (or simply an (args, kwargs) tuple), return a
        comparison key suitable for matching with other calls.
        This is a best effort method which relies on the spec's signature,
        if available, or falls back on the arguments themselves.
        """

        if isinstance(_call, tuple) and len(_call) > 2:
            sig = self._get_call_signature_from_name(_call[0])
        else:
            sig = self._spec_signature

        if sig is not None:
            if len(_call) == 2:
                name = ''
                args, kwargs = _call
            else:
                name, args, kwargs = _call
            try:
                bound_call = sig.bind(*args, **kwargs)
                return call(name, bound_call.args, bound_call.kwargs)
            except TypeError as e:
                # The binding error itself becomes the comparison key.
                return e.with_traceback(None)
        else:
            return _call

    def assert_not_called(_mock_self):
        """assert that the mock was never called.
        """
        self = _mock_self
        if self.call_count != 0:
            msg = ("Expected '%s' to not have been called. Called %s times.%s"
                   % (self._mock_name or 'mock',
                      self.call_count,
                      self._calls_repr()))
            raise AssertionError(msg)

    def assert_called(_mock_self):
        """assert that the mock was called at least once
        """
        self = _mock_self
        if self.call_count == 0:
            msg = ("Expected '%s' to have been called." %
                   (self._mock_name or 'mock'))
            raise AssertionError(msg)

    def assert_called_once(_mock_self):
        """assert that the mock was called only once.
        """
        self = _mock_self
        if not self.call_count == 1:
            msg = ("Expected '%s' to have been called once. Called %s times.%s"
                   % (self._mock_name or 'mock',
                      self.call_count,
                      self._calls_repr()))
            raise AssertionError(msg)

    def assert_called_with(_mock_self, *args, **kwargs):
        """assert that the last call was made with the specified arguments.

        Raises an AssertionError if the args and keyword args passed in are
        different to the last call to the mock."""
        self = _mock_self
        if self.call_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            actual = 'not called.'
            error_message = ('expected call not found.\nExpected: %s\nActual: %s'
                    % (expected, actual))
            raise AssertionError(error_message)

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs)
            return msg
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.call_args)
        if actual != expected:
            # A signature binding failure of the expected call is chained
            # as the cause.
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(_error_message()) from cause

    def assert_called_once_with(_mock_self, *args, **kwargs):
        """assert that the mock was called exactly once and that that call was
        with the specified arguments."""
        self = _mock_self
        if not self.call_count == 1:
            msg = ("Expected '%s' to be called once. Called %s times.%s"
                   % (self._mock_name or 'mock',
                      self.call_count,
                      self._calls_repr()))
            raise AssertionError(msg)
        return self.assert_called_with(*args, **kwargs)

    def assert_has_calls(self, calls, any_order=False):
        """assert the mock has been called with the specified calls.
        The `mock_calls` list is checked for the calls.

        If `any_order` is False (the default) then the calls must be
        sequential. There can be extra calls before or after the
        specified calls.

        If `any_order` is True then the calls can be in any order, but
        they must all appear in `mock_calls`."""
        expected = [self._call_matcher(c) for c in calls]
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
        if not any_order:
            if expected not in all_calls:
                if cause is None:
                    problem = 'Calls not found.'
                else:
                    problem = ('Error processing expected calls.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}'
                    f'{self._calls_repr(prefix="Actual").rstrip(".")}'
                ) from cause
            return

        all_calls = list(all_calls)

        # Each expected call consumes one matching recorded call.
        not_found = []
        for kall in expected:
            try:
                all_calls.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r does not contain all of %r in its call list, '
                'found %r instead' % (self._mock_name or 'mock',
                                      tuple(not_found), all_calls)
            ) from cause

    def assert_any_call(self, *args, **kwargs):
        """assert the mock has been called with the specified arguments.

        The assert passes if the mock has *ever* been called, unlike
        `assert_called_with` and `assert_called_once_with` that only pass if
        the call is the most recent one."""
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        actual = [self._call_matcher(c) for c in self.call_args_list]
        if cause or expected not in _AnyComparer(actual):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s call not found' % expected_string
            ) from cause

    def _get_child_mock(self, **kw):
        """Create the child mocks for attributes and return value.
        By default child mocks will be the same type as the parent.
        Subclasses of Mock may want to override this to customize the way
        child mocks are made.

        For non-callable mocks the callable variant will be used (rather than
        any custom subclass)."""
        if self._mock_sealed:
            attribute = f".{kw['name']}" if "name" in kw else "()"
            mock_name = self._extract_mock_name() + attribute
            raise AttributeError(mock_name)

        _new_name = kw.get("_new_name")
        if _new_name in self.__dict__['_spec_asyncs']:
            return AsyncMock(**kw)

        _type = type(self)
        if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
            # Any asynchronous magic becomes an AsyncMock
            klass = AsyncMock
        elif issubclass(_type, AsyncMockMixin):
            if (_new_name in _all_sync_magics or
                    self._mock_methods and _new_name in self._mock_methods):
                # Any synchronous method on AsyncMock becomes a MagicMock
                klass = MagicMock
            else:
                klass = AsyncMock
        elif not issubclass(_type, CallableMixin):
            if issubclass(_type, NonCallableMagicMock):
                klass = MagicMock
            elif issubclass(_type, NonCallableMock):
                klass = Mock
        else:
            # type(self) is the per-instance class created in __new__; the
            # next entry of its MRO is used for the child.
            klass = _type.__mro__[1]
        return klass(**kw)

    def _calls_repr(self, prefix="Calls"):
        """Renders self.mock_calls as a string.

        Example: "\nCalls: [call(1), call(2)]."

        If self.mock_calls is empty, an empty string is returned. The
        output will be truncated if very long.
        """
        if not self.mock_calls:
            return ""
        return f"\n{prefix}: {safe_repr(self.mock_calls)}."
try:
    removeprefix = str.removeprefix
except AttributeError:
    # Py 3.8 and earlier:
    def removeprefix(name, prefix):
        """Backport of `str.removeprefix` for interpreters that lack it.

        Return `name` without the leading `prefix`. When `name` does not
        start with `prefix` it is returned unchanged, matching the
        built-in method instead of blindly dropping `len(prefix)`
        characters.
        """
        if name.startswith(prefix):
            return name[len(prefix):]
        return name
# Attribute names that are refused in safe mode: each `assert_*` name
# found on NonCallableMock with its `assert_` prefix stripped
# (`assert_called_once` contributes `called_once`).
_ATTRIB_DENY_LIST = frozenset(
    removeprefix(assert_name, "assert_")
    for assert_name in dir(NonCallableMock)
    if assert_name.startswith("assert_")
)
1094class _AnyComparer(list):
1095 """A list which checks if it contains a call which may have an
1096 argument of ANY, flipping the components of item and self from
1097 their traditional locations so that ANY is guaranteed to be on
1098 the left."""
1099 def __contains__(self, item):
1100 for _call in self:
1101 assert len(item) == len(_call)
1102 if all([
1103 expected == actual
1104 for expected, actual in zip(item, _call)
1105 ]):
1106 return True
1107 return False
1110def _try_iter(obj):
1111 if obj is None:
1112 return obj
1113 if _is_exception(obj):
1114 return obj
1115 if _callable(obj):
1116 return obj
1117 try:
1118 return iter(obj)
1119 except TypeError:
1120 # XXXX backwards compatibility
1121 # but this will blow up on first call - so maybe we should fail early?
1122 return obj
class CallableMixin(Base):
    """Mixin that makes a mock callable.

    Calling the mock checks the signature, records the call on this mock
    and on every ancestor reachable through `_mock_new_parent`, and then
    produces a result from `side_effect`, `return_value` or the wrapped
    object.
    """

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # Stored straight into __dict__ before the base initialiser runs.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect

    def _mock_check_sig(self, *args, **kwargs):
        """Do nothing; a stub that can be replaced with a version that
        enforces a specific signature."""

    def __call__(_mock_self, *args, **kwargs):
        # The first parameter is not called ``self`` so that a mocked
        # function or method may itself take an argument named ``self``.
        _mock_self._mock_check_sig(*args, **kwargs)
        _mock_self._increment_mock_call(*args, **kwargs)
        return _mock_self._mock_call(*args, **kwargs)

    def _mock_call(_mock_self, *args, **kwargs):
        """Delegate to `_execute_mock_call` with the same arguments."""
        return _mock_self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(_mock_self, *args, **kwargs):
        """Record one call on this mock and on all of its ancestors."""
        mock = _mock_self
        mock.called = True
        mock.call_count += 1

        # call_args is set here, before execution, so that assertions on
        # the call arguments already pass in the case of awaited calls.
        recorded = _Call((args, kwargs), two=True)
        mock.call_args = recorded
        mock.call_args_list.append(recorded)

        # Starting state for the method_calls bookkeeping.
        track_methods = mock._mock_parent is not None
        method_name = mock._mock_name

        # Starting state for the mock_calls bookkeeping.
        call_name = mock._mock_new_name
        via_call = call_name == '()'
        mock.mock_calls.append(_Call(('', args, kwargs)))

        # Walk up the chain of mocks, extending the dotted names as we go.
        ancestor = mock._mock_new_parent
        while ancestor is not None:
            if track_methods:
                ancestor.method_calls.append(
                    _Call((method_name, args, kwargs)))
                track_methods = ancestor._mock_parent is not None
                if track_methods:
                    method_name = ancestor._mock_name + '.' + method_name

            ancestor.mock_calls.append(_Call((call_name, args, kwargs)))

            ancestor_name = ancestor._mock_new_name
            if ancestor_name:
                # No dot goes in front of a '()' component.
                separator = '' if via_call else '.'
                via_call = ancestor_name == '()'
                call_name = ancestor_name + separator + call_name

            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(_mock_self, *args, **kwargs):
        """Produce the result of a call.

        Kept apart from `_increment_mock_call` so that awaited functions
        are executed separately from their call; AsyncMock overrides
        this method.
        """
        mock = _mock_self

        effect = mock.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            if _callable(effect):
                result = effect(*args, **kwargs)
            else:
                # Not callable: take the next item from the side effect.
                result = next(effect)
                if _is_exception(result):
                    raise result

            if result is not DEFAULT:
                return result

        # An explicit return value takes precedence over a wrapped object.
        if mock._mock_return_value is DEFAULT and mock._mock_wraps is not None:
            return mock._mock_wraps(*args, **kwargs)

        return mock.return_value
# Mock adds no members of its own: it combines CallableMixin (call
# handling) with NonCallableMock and carries the user-facing docstring.
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
      Additionally, an AttributeError is raised when accessing
      attributes that match the name of an assertion method without the prefix
      `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
      Passing `unsafe=True` will allow access to these attributes.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """
1290def _dot_lookup(thing, comp, import_path):
1291 try:
1292 return getattr(thing, comp)
1293 except AttributeError:
1294 __import__(import_path)
1295 return getattr(thing, comp)
1298def _importer(target):
1299 components = target.split('.')
1300 import_path = components.pop(0)
1301 thing = __import__(import_path)
1303 for comp in components:
1304 import_path += ".%s" % comp
1305 thing = _dot_lookup(thing, comp, import_path)
1306 return thing
1309# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1310# they don't contain common misspellings of arguments related to autospeccing.
1311def _check_spec_arg_typos(kwargs_to_check):
1312 typos = ("autospect", "auto_spec", "set_spec")
1313 for typo in typos:
1314 if typo in kwargs_to_check:
1315 raise RuntimeError(
1316 f"{typo!r} might be a typo; use unsafe=True if this is intended"
1317 )
1320class _patch(object):
1322 attribute_name = None
1323 _active_patches = []
1325 def __init__(
1326 self, getter, attribute, new, spec, create,
1327 spec_set, autospec, new_callable, kwargs, *, unsafe=False
1328 ):
1329 if new_callable is not None:
1330 if new is not DEFAULT:
1331 raise ValueError(
1332 "Cannot use 'new' and 'new_callable' together"
1333 )
1334 if autospec is not None:
1335 raise ValueError(
1336 "Cannot use 'autospec' and 'new_callable' together"
1337 )
1338 if not unsafe:
1339 _check_spec_arg_typos(kwargs)
1340 if _is_instance_mock(spec):
1341 raise InvalidSpecError(
1342 f'Cannot spec attr {attribute!r} as the spec '
1343 f'has already been mocked out. [spec={spec!r}]')
1344 if _is_instance_mock(spec_set):
1345 raise InvalidSpecError(
1346 f'Cannot spec attr {attribute!r} as the spec_set '
1347 f'target has already been mocked out. [spec_set={spec_set!r}]')
1349 self.getter = getter
1350 self.attribute = attribute
1351 self.new = new
1352 self.new_callable = new_callable
1353 self.spec = spec
1354 self.create = create
1355 self.has_local = False
1356 self.spec_set = spec_set
1357 self.autospec = autospec
1358 self.kwargs = kwargs
1359 self.additional_patchers = []
1362 def copy(self):
1363 patcher = _patch(
1364 self.getter, self.attribute, self.new, self.spec,
1365 self.create, self.spec_set,
1366 self.autospec, self.new_callable, self.kwargs
1367 )
1368 patcher.attribute_name = self.attribute_name
1369 patcher.additional_patchers = [
1370 p.copy() for p in self.additional_patchers
1371 ]
1372 return patcher
1375 def __call__(self, func):
1376 if isinstance(func, type):
1377 return self.decorate_class(func)
1378 if inspect.iscoroutinefunction(func):
1379 return self.decorate_async_callable(func)
1380 return self.decorate_callable(func)
1383 def decorate_class(self, klass):
1384 for attr in dir(klass):
1385 if not attr.startswith(patch.TEST_PREFIX):
1386 continue
1388 attr_value = getattr(klass, attr)
1389 if not hasattr(attr_value, "__call__"):
1390 continue
1392 patcher = self.copy()
1393 setattr(klass, attr, patcher(attr_value))
1394 return klass
1397 @contextlib.contextmanager
1398 def decoration_helper(self, patched, args, keywargs):
1399 extra_args = []
1400 with contextlib.ExitStack() as exit_stack:
1401 for patching in patched.patchings:
1402 arg = exit_stack.enter_context(patching)
1403 if patching.attribute_name is not None:
1404 keywargs.update(arg)
1405 elif patching.new is DEFAULT:
1406 extra_args.append(arg)
1408 args += tuple(extra_args)
1409 yield (args, keywargs)
1412 def decorate_callable(self, func):
1413 # NB. Keep the method in sync with decorate_async_callable()
1414 if hasattr(func, 'patchings'):
1415 func.patchings.append(self)
1416 return func
1418 @wraps(func)
1419 def patched(*args, **keywargs):
1420 with self.decoration_helper(patched,
1421 args,
1422 keywargs) as (newargs, newkeywargs):
1423 return func(*newargs, **newkeywargs)
1425 patched.patchings = [self]
1426 return patched
1429 def decorate_async_callable(self, func):
1430 # NB. Keep the method in sync with decorate_callable()
1431 if hasattr(func, 'patchings'):
1432 func.patchings.append(self)
1433 return func
1435 @wraps(func)
1436 async def patched(*args, **keywargs):
1437 with self.decoration_helper(patched,
1438 args,
1439 keywargs) as (newargs, newkeywargs):
1440 return await func(*newargs, **newkeywargs)
1442 patched.patchings = [self]
1443 return patched
1446 def get_original(self):
1447 target = self.getter()
1448 name = self.attribute
1450 original = DEFAULT
1451 local = False
1453 try:
1454 original = target.__dict__[name]
1455 except (AttributeError, KeyError):
1456 original = getattr(target, name, DEFAULT)
1457 else:
1458 local = True
1460 if name in _builtins and isinstance(target, ModuleType):
1461 self.create = True
1463 if not self.create and original is DEFAULT:
1464 raise AttributeError(
1465 "%s does not have the attribute %r" % (target, name)
1466 )
1467 return original, local
1470 def __enter__(self):
1471 """Perform the patch."""
1472 new, spec, spec_set = self.new, self.spec, self.spec_set
1473 autospec, kwargs = self.autospec, self.kwargs
1474 new_callable = self.new_callable
1475 self.target = self.getter()
1477 # normalise False to None
1478 if spec is False:
1479 spec = None
1480 if spec_set is False:
1481 spec_set = None
1482 if autospec is False:
1483 autospec = None
1485 if spec is not None and autospec is not None:
1486 raise TypeError("Can't specify spec and autospec")
1487 if ((spec is not None or autospec is not None) and
1488 spec_set not in (True, None)):
1489 raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1491 original, local = self.get_original()
1493 if new is DEFAULT and autospec is None:
1494 inherit = False
1495 if spec is True:
1496 # set spec to the object we are replacing
1497 spec = original
1498 if spec_set is True:
1499 spec_set = original
1500 spec = None
1501 elif spec is not None:
1502 if spec_set is True:
1503 spec_set = spec
1504 spec = None
1505 elif spec_set is True:
1506 spec_set = original
1508 if spec is not None or spec_set is not None:
1509 if original is DEFAULT:
1510 raise TypeError("Can't use 'spec' with create=True")
1511 if isinstance(original, type):
1512 # If we're patching out a class and there is a spec
1513 inherit = True
1514 if spec is None and _is_async_obj(original):
1515 Klass = AsyncMock
1516 else:
1517 Klass = MagicMock
1518 _kwargs = {}
1519 if new_callable is not None:
1520 Klass = new_callable
1521 elif spec is not None or spec_set is not None:
1522 this_spec = spec
1523 if spec_set is not None:
1524 this_spec = spec_set
1525 if _is_list(this_spec):
1526 not_callable = '__call__' not in this_spec
1527 else:
1528 not_callable = not callable(this_spec)
1529 if _is_async_obj(this_spec):
1530 Klass = AsyncMock
1531 elif not_callable:
1532 Klass = NonCallableMagicMock
1534 if spec is not None:
1535 _kwargs['spec'] = spec
1536 if spec_set is not None:
1537 _kwargs['spec_set'] = spec_set
1539 # add a name to mocks
1540 if (isinstance(Klass, type) and
1541 issubclass(Klass, NonCallableMock) and self.attribute):
1542 _kwargs['name'] = self.attribute
1544 _kwargs.update(kwargs)
1545 new = Klass(**_kwargs)
1547 if inherit and _is_instance_mock(new):
1548 # we can only tell if the instance should be callable if the
1549 # spec is not a list
1550 this_spec = spec
1551 if spec_set is not None:
1552 this_spec = spec_set
1553 if (not _is_list(this_spec) and not
1554 _instance_callable(this_spec)):
1555 Klass = NonCallableMagicMock
1557 _kwargs.pop('name')
1558 new.return_value = Klass(_new_parent=new, _new_name='()',
1559 **_kwargs)
1560 elif autospec is not None:
1561 # spec is ignored, new *must* be default, spec_set is treated
1562 # as a boolean. Should we check spec is not None and that spec_set
1563 # is a bool?
1564 if new is not DEFAULT:
1565 raise TypeError(
1566 "autospec creates the mock for you. Can't specify "
1567 "autospec and new."
1568 )
1569 if original is DEFAULT:
1570 raise TypeError("Can't use 'autospec' with create=True")
1571 spec_set = bool(spec_set)
1572 if autospec is True:
1573 autospec = original
1575 if _is_instance_mock(self.target):
1576 raise InvalidSpecError(
1577 f'Cannot autospec attr {self.attribute!r} as the patch '
1578 f'target has already been mocked out. '
1579 f'[target={self.target!r}, attr={autospec!r}]')
1580 if _is_instance_mock(autospec):
1581 target_name = getattr(self.target, '__name__', self.target)
1582 raise InvalidSpecError(
1583 f'Cannot autospec attr {self.attribute!r} from target '
1584 f'{target_name!r} as it has already been mocked out. '
1585 f'[target={self.target!r}, attr={autospec!r}]')
1587 new = create_autospec(autospec, spec_set=spec_set,
1588 _name=self.attribute, **kwargs)
1589 elif kwargs:
1590 # can't set keyword args when we aren't creating the mock
1591 # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1592 raise TypeError("Can't pass kwargs to a mock we aren't creating")
1594 new_attr = new
1596 self.temp_original = original
1597 self.is_local = local
1598 self._exit_stack = contextlib.ExitStack()
1599 try:
1600 setattr(self.target, self.attribute, new_attr)
1601 if self.attribute_name is not None:
1602 extra_args = {}
1603 if self.new is DEFAULT:
1604 extra_args[self.attribute_name] = new
1605 for patching in self.additional_patchers:
1606 arg = self._exit_stack.enter_context(patching)
1607 if patching.new is DEFAULT:
1608 extra_args.update(arg)
1609 return extra_args
1611 return new
1612 except:
1613 if not self.__exit__(*sys.exc_info()):
1614 raise
1616 def __exit__(self, *exc_info):
1617 """Undo the patch."""
1618 if self.is_local and self.temp_original is not DEFAULT:
1619 setattr(self.target, self.attribute, self.temp_original)
1620 else:
1621 delattr(self.target, self.attribute)
1622 if not self.create and (not hasattr(self.target, self.attribute) or
1623 self.attribute in ('__doc__', '__module__',
1624 '__defaults__', '__annotations__',
1625 '__kwdefaults__')):
1626 # needed for proxy objects like django settings
1627 setattr(self.target, self.attribute, self.temp_original)
1629 del self.temp_original
1630 del self.is_local
1631 del self.target
1632 exit_stack = self._exit_stack
1633 del self._exit_stack
1634 return exit_stack.__exit__(*exc_info)
1637 def start(self):
1638 """Activate a patch, returning any created mock."""
1639 result = self.__enter__()
1640 self._active_patches.append(self)
1641 return result
1644 def stop(self):
1645 """Stop an active patch."""
1646 try:
1647 self._active_patches.remove(self)
1648 except ValueError:
1649 # If the patch hasn't been started this will fail
1650 return None
1652 return self.__exit__(None, None, None)
1656def _get_target(target):
1657 try:
1658 target, attribute = target.rsplit('.', 1)
1659 except (TypeError, ValueError, AttributeError):
1660 raise TypeError(
1661 f"Need a valid target to patch. You supplied: {target!r}")
1662 getter = lambda: _importer(target)
1663 return getter, attribute
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Patch the member named `attribute` on the object `target` with a mock
    object.

    `patch.object` works as a decorator, a class decorator or a context
    manager. The arguments `new`, `spec`, `create`, `spec_set`, `autospec`
    and `new_callable` mean the same as they do for `patch`, and - as with
    `patch` - any further keyword arguments configure the mock object that
    gets created.

    When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
    for choosing which methods to wrap.

    `target` must be the object itself: passing a `str` raises `TypeError`.
    """
    if type(target) is str:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1695def _patch_multiple(target, spec=None, create=False, spec_set=None,
1696 autospec=None, new_callable=None, **kwargs):
1697 """Perform multiple patches in a single call. It takes the object to be
1698 patched (either as an object or a string to fetch the object by importing)
1699 and keyword arguments for the patches::
1701 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1702 ...
1704 Use `DEFAULT` as the value if you want `patch.multiple` to create
1705 mocks for you. In this case the created mocks are passed into a decorated
1706 function by keyword, and a dictionary is returned when `patch.multiple` is
1707 used as a context manager.
1709 `patch.multiple` can be used as a decorator, class decorator or a context
1710 manager. The arguments `spec`, `spec_set`, `create`,
1711 `autospec` and `new_callable` have the same meaning as for `patch`. These
1712 arguments will be applied to *all* patches done by `patch.multiple`.
1714 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1715 for choosing which methods to wrap.
1716 """
1717 if type(target) is str:
1718 getter = lambda: _importer(target)
1719 else:
1720 getter = lambda: target
1722 if not kwargs:
1723 raise ValueError(
1724 'Must supply at least one keyword argument with patch.multiple'
1725 )
1726 # need to wrap in a list for python 3, where items is a view
1727 items = list(kwargs.items())
1728 attribute, new = items[0]
1729 patcher = _patch(
1730 getter, attribute, new, spec, create, spec_set,
1731 autospec, new_callable, {}
1732 )
1733 patcher.attribute_name = attribute
1734 for attribute, new in items[1:]:
1735 this_patcher = _patch(
1736 getter, attribute, new, spec, create, spec_set,
1737 autospec, new_callable, {}
1738 )
1739 this_patcher.attribute_name = attribute
1740 patcher.additional_patchers.append(this_patcher)
1741 return patcher
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    If `new` is omitted, then the target is replaced with an
    `AsyncMock` if the patched object is an async function or a
    `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
    omitted, the created mock is passed in as an extra argument to the
    decorated function. If `patch` is used as a context manager the created
    mock is returned by the context manager.

    `target` should be a string in the form `'package.module.ClassName'`. The
    `target` is imported and the specified object replaced with the `new`
    object, so the `target` must be importable from the environment you are
    calling `patch` from. The target is imported when the decorated function
    is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
    if patch is creating one for you.

    In addition you can pass `spec=True` or `spec_set=True`, which causes
    patch to pass in the object being mocked as the spec/spec_set object.

    `new_callable` allows you to specify a different class, or callable object,
    that will be called to create the `new` object. By default `AsyncMock` is
    used for async functions and `MagicMock` for the rest.

    A more powerful form of `spec` is `autospec`. If you set `autospec=True`
    then the mock will be created with a spec from the object being replaced.
    All attributes of the mock will also have the spec of the corresponding
    attribute of the object being replaced. Methods and functions being
    mocked will have their arguments checked and will raise a `TypeError` if
    they are called with the wrong signature. For mocks replacing a class,
    their return value (the 'instance') will have the same spec as the class.

    Instead of `autospec=True` you can pass `autospec=some_object` to use an
    arbitrary object as the spec instead of the one being replaced.

    By default `patch` will fail to replace attributes that don't exist. If
    you pass in `create=True`, and the attribute doesn't exist, patch will
    create the attribute for you when the patched function is called, and
    delete it again afterwards. This is useful for writing tests against
    attributes that your production code creates at runtime. It is off by
    default because it can be dangerous. With it switched on you can write
    passing tests against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator. It works by
    decorating each test method in the class. This reduces the boilerplate
    code when your test methods share a common patchings set. `patch` finds
    tests by looking for method names that start with `patch.TEST_PREFIX`.
    By default this is `test`, which matches the way `unittest` finds tests.
    You can specify an alternative prefix by setting `patch.TEST_PREFIX`.

    Patch can be used as a context manager, with the with statement. Here the
    patching applies to the indented block after the with statement. If you
    use "as" then the patched object will be bound to the name after the
    "as"; very useful if `patch` is creating a mock object for you.

    Patch will raise a `RuntimeError` if passed some common misspellings of
    the arguments autospec and spec_set. Pass the argument `unsafe` with the
    value True to disable that check.

    `patch` takes arbitrary keyword arguments. These will be passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    # Split the dotted name into a lazy importer for the owner and the
    # attribute name; a malformed target raises TypeError here.
    getter, attribute = _get_target(target)
    return _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1826class _patch_dict(object):
1827 """
1828 Patch a dictionary, or dictionary like object, and restore the dictionary
1829 to its original state after the test.
1831 `in_dict` can be a dictionary or a mapping like container. If it is a
1832 mapping then it must at least support getting, setting and deleting items
1833 plus iterating over keys.
1835 `in_dict` can also be a string specifying the name of the dictionary, which
1836 will then be fetched by importing it.
1838 `values` can be a dictionary of values to set in the dictionary. `values`
1839 can also be an iterable of `(key, value)` pairs.
1841 If `clear` is True then the dictionary will be cleared before the new
1842 values are set.
1844 `patch.dict` can also be called with arbitrary keyword arguments to set
1845 values in the dictionary::
1847 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1848 ...
1850 `patch.dict` can be used as a context manager, decorator or class
1851 decorator. When used as a class decorator `patch.dict` honours
1852 `patch.TEST_PREFIX` for choosing which methods to wrap.
1853 """
1855 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1856 self.in_dict = in_dict
1857 # support any argument supported by dict(...) constructor
1858 self.values = dict(values)
1859 self.values.update(kwargs)
1860 self.clear = clear
1861 self._original = None
1864 def __call__(self, f):
1865 if isinstance(f, type):
1866 return self.decorate_class(f)
1867 if inspect.iscoroutinefunction(f):
1868 return self.decorate_async_callable(f)
1869 return self.decorate_callable(f)
1872 def decorate_callable(self, f):
1873 @wraps(f)
1874 def _inner(*args, **kw):
1875 self._patch_dict()
1876 try:
1877 return f(*args, **kw)
1878 finally:
1879 self._unpatch_dict()
1881 return _inner
1884 def decorate_async_callable(self, f):
1885 @wraps(f)
1886 async def _inner(*args, **kw):
1887 self._patch_dict()
1888 try:
1889 return await f(*args, **kw)
1890 finally:
1891 self._unpatch_dict()
1893 return _inner
1896 def decorate_class(self, klass):
1897 for attr in dir(klass):
1898 attr_value = getattr(klass, attr)
1899 if (attr.startswith(patch.TEST_PREFIX) and
1900 hasattr(attr_value, "__call__")):
1901 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1902 decorated = decorator(attr_value)
1903 setattr(klass, attr, decorated)
1904 return klass
1907 def __enter__(self):
1908 """Patch the dict."""
1909 self._patch_dict()
1910 return self.in_dict
1913 def _patch_dict(self):
1914 values = self.values
1915 if isinstance(self.in_dict, str):
1916 self.in_dict = _importer(self.in_dict)
1917 in_dict = self.in_dict
1918 clear = self.clear
1920 try:
1921 original = in_dict.copy()
1922 except AttributeError:
1923 # dict like object with no copy method
1924 # must support iteration over keys
1925 original = {}
1926 for key in in_dict:
1927 original[key] = in_dict[key]
1928 self._original = original
1930 if clear:
1931 _clear_dict(in_dict)
1933 try:
1934 in_dict.update(values)
1935 except AttributeError:
1936 # dict like object with no update method
1937 for key in values:
1938 in_dict[key] = values[key]
1941 def _unpatch_dict(self):
1942 in_dict = self.in_dict
1943 original = self._original
1945 _clear_dict(in_dict)
1947 try:
1948 in_dict.update(original)
1949 except AttributeError:
1950 for key in original:
1951 in_dict[key] = original[key]
1954 def __exit__(self, *args):
1955 """Unpatch the dict."""
1956 if self._original is not None:
1957 self._unpatch_dict()
1958 return False
1961 def start(self):
1962 """Activate a patch, returning any created mock."""
1963 result = self.__enter__()
1964 _patch._active_patches.append(self)
1965 return result
1968 def stop(self):
1969 """Stop an active patch."""
1970 try:
1971 _patch._active_patches.remove(self)
1972 except ValueError:
1973 # If the patch hasn't been started this will fail
1974 return None
1976 return self.__exit__(None, None, None)
1979def _clear_dict(in_dict):
1980 try:
1981 in_dict.clear()
1982 except AttributeError:
1983 keys = list(in_dict)
1984 for key in keys:
1985 del in_dict[key]
def _patch_stopall():
    """Stop all active patches. LIFO to unroll nested patches."""
    # `active` rather than `patch`, so the module-level `patch` function
    # is not shadowed inside this helper.
    for active in reversed(_patch._active_patches):
        active.stop()
# Expose the alternate patchers and the configuration knob as attributes
# of `patch`, giving the `patch.object(...)`, `patch.dict(...)`,
# `patch.multiple(...)` and `patch.stopall()` spellings.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix that class decoration uses to pick the methods to wrap.
patch.TEST_PREFIX = 'test'
# Space-separated magic method names, written without the surrounding
# double underscores. Every fragment ends with a space so that the
# implicitly concatenated string can be split on whitespace (and so that
# 'sizeof ' can be removed below).
magic_methods = (
    "lt le gt ge eq ne "
    "getitem setitem delitem "
    "len contains iter "
    "hash str sizeof "
    "enter exit "
    # we added divmod and rdivmod here instead of numerics
    # because there is no idivmod
    "divmod rdivmod neg pos abs invert "
    "complex int float index "
    "round trunc floor ceil "
    "bool next "
    "fspath "
    "aiter "
)

if IS_PYPY:
    # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html
    magic_methods = magic_methods.replace('sizeof ', '')
# Binary numeric operators, named without the double underscores.
numerics = "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
# In-place variants (iadd, isub, ...) and reflected variants (radd, rsub, ...).
inplace = ' '.join(f'i{op}' for op in numerics.split())
right = ' '.join(f'r{op}' for op in numerics.split())
2026# not including __prepare__, __instancecheck__, __subclasscheck__
2027# (as they are metaclass methods)
2028# __del__ is not supported at all as it causes problems if it exists
2030_non_defaults = {
2031 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
2032 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
2033 '__getstate__', '__setstate__', '__getformat__',
2034 '__repr__', '__dir__', '__subclasses__', '__format__',
2035 '__getnewargs_ex__',
2036}
def _get_method(name, func):
    """Wrap `func` in a plain function whose ``__name__`` is `name`.

    The wrapper passes `self` and every other argument through to `func`
    unchanged and returns its result.
    """
    def method(self, *args, **kw):
        return func(self, *args, **kw)

    method.__name__ = name
    return method
# Every default-supported magic method, in full "__name__" form.
_magics = {
    f'__{short_name}__'
    for short_name in ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Magic methods used for async `with` statements
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__',
}

# Magic methods whose default return value is computed from the mock itself.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
def _get_eq(self):
    """Build the default ``__eq__`` side effect for the mock `self`."""
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            # Nothing configured: equal to itself only, otherwise undecided.
            return True if self is other else NotImplemented
        return configured
    return __eq__
def _get_ne(self):
    """Build the default ``__ne__`` side effect for the mock `self`."""
    def __ne__(other):
        if self.__ne__._mock_return_value is DEFAULT:
            # Nothing configured: not unequal to itself, otherwise undecided.
            return False if self is other else NotImplemented
        # A return value was configured; hand back DEFAULT (presumably so the
        # caller falls through to that configured value).
        return DEFAULT
    return __ne__
def _get_iter(self):
    """Build the default ``__iter__`` side effect for the mock `self`."""
    def __iter__():
        configured = self.__iter__._mock_return_value
        # With nothing configured, iterate over nothing. If the configured
        # value is already an iterator, iter() returns it unchanged.
        source = [] if configured is DEFAULT else configured
        return iter(source)
    return __iter__
def _get_async_iter(self):
    """Build the default ``__aiter__`` side effect for the mock `self`."""
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        # With nothing configured, the async iterator yields nothing.
        source = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(source))
    return __aiter__
# Magic methods whose default behaviour is supplied as a side effect. Each
# value is a factory that takes the mock and returns the side-effect callable.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
def _set_return_value(mock, method, name):
    """Configure the default behaviour of magic method `name` on `method`.

    Tried in order: a fixed value from `_return_values`, a value computed
    from `mock` via `_calculate_return_value`, and finally a side effect
    built from `mock` via `_side_effect_methods`. If none applies, `method`
    is left untouched.
    """
    constant = _return_values.get(name, DEFAULT)
    if constant is not DEFAULT:
        method.return_value = constant
        return

    compute = _calculate_return_value.get(name)
    if compute is not None:
        method.return_value = compute(mock)
        return

    make_side_effect = _side_effect_methods.get(name)
    if make_side_effect is not None:
        method.side_effect = make_side_effect(mock)
class MagicMixin(Base):
    """Mixin that installs `MagicProxy` descriptors for magic methods."""

    def __init__(self, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init

    def _mock_set_magics(self):
        """Synchronise the magic methods on this mock's type with its spec.

        When `_mock_methods` is set, only the magic methods it lists are
        kept; the others are deleted if the type currently defines them.
        Every remaining magic method the type does not already define gets
        a `MagicProxy` installed on the type.
        """
        orig_magics = _magics | _async_method_magics
        these_magics = orig_magics

        if getattr(self, "_mock_methods", None) is not None:
            these_magics = orig_magics.intersection(self._mock_methods)

            for entry in orig_magics - these_magics:
                if entry in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        these_magics = these_magics - set(type(self).__dict__)

        _type = type(self)
        for entry in these_magics:
            setattr(_type, entry, MagicProxy(entry, self))
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` is either an object or a list of strings; afterwards only
        attributes found on the `spec` can be fetched from the mock. With
        `spec_set` true, only attributes on the spec can be set as well.
        """
        self._mock_add_spec(spec, spec_set)
        # Re-sync the magic methods with the new spec.
        self._mock_set_magics()
class AsyncMagicMixin(MagicMixin):
    """`MagicMixin` under a separate name; adds no behaviour of its own."""
class MagicMock(MagicMixin, Mock):
    """
    A `Mock` subclass that comes with default implementations of most of
    the magic methods, so they do not have to be configured by hand.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """

    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock.

        `spec` is either an object or a list of strings; afterwards only
        attributes found on the `spec` can be fetched from the mock. With
        `spec_set` true, only attributes on the spec can be set as well.
        """
        self._mock_add_spec(spec, spec_set)
        # Re-sync the magic methods with the new spec.
        self._mock_set_magics()
class MagicProxy(Base):
    """Descriptor that creates the mock for a magic method on access."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock, attach it to the parent and return it."""
        owner = self.parent
        magic_name = self.name
        child = owner._get_child_mock(
            name=magic_name, _new_name=magic_name, _new_parent=owner
        )
        setattr(owner, magic_name, child)
        # Apply the default return value / side effect for this magic method.
        _set_return_value(owner, child, magic_name)
        return child

    def __get__(self, obj, _type=None):
        # The accessed object is ignored; the mock is always created for the
        # parent this proxy was constructed with.
        return self.create_mock()
# Attribute names of code objects, and the signature of `CodeType.__init__`
# with its first argument bound to None. Both are used when building the fake
# `__code__` object in `AsyncMockMixin.__init__`.
_CODE_ATTRS = dir(CodeType)
_CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
class AsyncMockMixin(Base):
    """Mixin adding coroutine-style call execution and await bookkeeping.

    Each run of `_execute_mock_call` is recorded in `await_count`,
    `await_args` and `await_args_list`. The ``assert_*await*`` methods check
    those records and raise `AssertionError` when the expectation fails.
    """
    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
        self.__dict__['_mock_await_count'] = 0
        self.__dict__['_mock_await_args'] = None
        self.__dict__['_mock_await_args_list'] = _CallList()

        # Fake code object flagged as a coroutine taking *args and **kwargs.
        code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
        code_mock.__dict__["_spec_class"] = CodeType
        code_mock.__dict__["_spec_signature"] = _CODE_SIG
        code_mock.co_flags = (
            inspect.CO_COROUTINE
            + inspect.CO_VARARGS
            + inspect.CO_VARKEYWORDS
        )
        code_mock.co_argcount = 0
        code_mock.co_varnames = ('args', 'kwargs')
        try:
            code_mock.co_posonlyargcount = 0
        except AttributeError:
            # Python 3.7 and earlier.
            pass
        code_mock.co_kwonlyargcount = 0

        # Function-like attributes, set through __dict__ like the ones above.
        self.__dict__['__code__'] = code_mock
        self.__dict__['__name__'] = 'AsyncMock'
        self.__dict__['__defaults__'] = tuple()
        self.__dict__['__kwdefaults__'] = {}
        self.__dict__['__annotations__'] = None

    async def _execute_mock_call(_mock_self, *args, **kwargs):
        """Record the await and produce the call's result.

        The result comes from, in order: `side_effect` (unless it yields
        `DEFAULT`), a configured return value, the wrapped object, and
        finally `return_value`.

        Raises the side effect itself, or the next item of an iterator side
        effect, when that is an exception. Raises `StopAsyncIteration` when
        an iterator side effect is exhausted.
        """
        self = _mock_self
        # This is nearly just like super(), except for special handling
        # of coroutines

        _call = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = _call
        self.await_args_list.append(_call)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result
            elif iscoroutinefunction(effect):
                result = await effect(*args, **kwargs)
            else:
                result = effect(*args, **kwargs)

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            if iscoroutinefunction(self._mock_wraps):
                return await self._mock_wraps(*args, **kwargs)
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

    def assert_awaited(_mock_self):
        """
        Assert that the mock was awaited at least once.
        """
        self = _mock_self
        if self.await_count == 0:
            msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
            raise AssertionError(msg)

    def assert_awaited_once(_mock_self):
        """
        Assert that the mock was awaited exactly once.
        """
        self = _mock_self
        if self.await_count != 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def assert_awaited_with(_mock_self, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        self = _mock_self
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # When matching the expected call produced an exception, chain it
            # as the cause of the assertion failure.
            cause = expected if isinstance(expected, Exception) else None
            msg = self._format_mock_failure_message(args, kwargs, action='await')
            raise AssertionError(msg) from cause

    def assert_awaited_once_with(_mock_self, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        self = _mock_self
        if self.await_count != 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(_mock_self, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        self = _mock_self
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        actual = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(actual):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(_mock_self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        self = _mock_self
        expected = [self._call_matcher(c) for c in calls]
        # First expected entry that is an exception, if any; used as the
        # cause of a failure.
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
        if not any_order:
            if expected not in all_awaits:
                if cause is None:
                    problem = 'Awaits not found.'
                else:
                    problem = ('Error processing expected awaits.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}\n'
                    f'Actual: {self.await_args_list}'
                ) from cause
            return

        all_awaits = list(all_awaits)

        # Each expected await consumes one recorded await, so duplicates in
        # `calls` need as many matching records.
        not_found = []
        for kall in expected:
            try:
                all_awaits.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(_mock_self):
        """
        Assert that the mock was never awaited.
        """
        self = _mock_self
        if self.await_count != 0:
            msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def reset_mock(self, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`

        Also clears the await count, the last await arguments and the await
        history.
        """
        super().reset_mock(*args, **kwargs)
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features that allow mocking
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """
class _ANY:
    """A helper object that compares equal to everything."""

    def __eq__(self, other):
        # Equal to any object.
        return True

    def __ne__(self, other):
        # Never unequal to any object.
        return False

    def __repr__(self):
        return '<ANY>'


ANY = _ANY()
def _format_call_signature(name, args, kwargs):
    """Return a call rendered as text, e.g. ``name(1, 'a', key=2)``.

    Positional arguments are shown with ``repr``, keyword arguments as
    ``key=repr(value)``, all separated by ``", "``.

    `name` is inserted verbatim. It is never itself used as a format
    string, so names containing ``%`` are rendered correctly.
    """
    args_string = ', '.join([repr(arg) for arg in args])
    kwargs_string = ', '.join([
        '%s=%r' % (key, value) for key, value in kwargs.items()
    ])
    # Join only the non-empty halves so no stray separator appears.
    formatted_args = ', '.join(
        part for part in (args_string, kwargs_string) if part
    )
    return '%s(%s)' % (name, formatted_args)
class _Call(tuple):
    """
    Tuple recording one call to a mock, stored either as `(args, kwargs)` or
    as `(name, args, kwargs)`.

    Comparison is lenient: empty args or kwargs may be left out of the tuple
    being compared against::

        _Call(('name', (), {})) == ('name',)
        _Call(('name', (1,), {})) == ('name', (1,))
        _Call(((), {'a': 'b'})) == ({'a': 'b'},)

    A `_Call` also compares equal to the matching `call` expression::

        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)

    A `_Call` that has no name matches any name.
    """
    def __new__(cls, value=(), name='', parent=None, two=False,
                from_kall=True):
        args, kwargs = (), {}
        size = len(value)
        if size == 3:
            name, args, kwargs = value
        elif size == 2:
            head, tail = value
            if not isinstance(head, str):
                args, kwargs = head, tail
            else:
                # (name, args) or (name, kwargs)
                name = head
                if isinstance(tail, tuple):
                    args = tail
                else:
                    kwargs = tail
        elif size == 1:
            (only,) = value
            if isinstance(only, str):
                name = only
            elif isinstance(only, tuple):
                args = only
            else:
                kwargs = only

        contents = (args, kwargs) if two else (name, args, kwargs)
        return tuple.__new__(cls, contents)

    def __init__(self, value=(), name=None, parent=None, two=False,
                 from_kall=True):
        self._mock_name = name
        self._mock_parent = parent
        self._mock_from_kall = from_kall

    def __eq__(self, other):
        try:
            other_size = len(other)
        except TypeError:
            return NotImplemented

        if len(self) == 2:
            own_name = ''
            own_args, own_kwargs = self
        else:
            own_name, own_args, own_kwargs = self

        # Two calls that both have a parent must have equal parents.
        if (getattr(self, '_mock_parent', None)
                and getattr(other, '_mock_parent', None)
                and self._mock_parent != other._mock_parent):
            return False

        their_name = ''
        if other_size == 0:
            their_args, their_kwargs = (), {}
        elif other_size == 3:
            their_name, their_args, their_kwargs = other
        elif other_size == 1:
            (item,) = other
            if isinstance(item, tuple):
                their_args, their_kwargs = item, {}
            elif isinstance(item, str):
                their_name = item
                their_args, their_kwargs = (), {}
            else:
                their_args, their_kwargs = (), item
        elif other_size == 2:
            # could be (name, args) or (name, kwargs) or (args, kwargs)
            head, tail = other
            if not isinstance(head, str):
                their_args, their_kwargs = head, tail
            else:
                their_name = head
                if isinstance(tail, tuple):
                    their_args, their_kwargs = tail, {}
                else:
                    their_args, their_kwargs = (), tail
        else:
            return False

        if own_name and their_name != own_name:
            return False

        # this order is important for ANY to work!
        return (their_args, their_kwargs) == (own_args, own_kwargs)

    __ne__ = object.__ne__

    def __call__(self, *args, **kwargs):
        own_name = self._mock_name
        if own_name is None:
            return _Call(('', args, kwargs), name='()')
        return _Call((own_name, args, kwargs), name=own_name + '()',
                     parent=self)

    def __getattr__(self, attr):
        own_name = self._mock_name
        if own_name is None:
            return _Call(name=attr, from_kall=False)
        return _Call(name='%s.%s' % (own_name, attr), parent=self,
                     from_kall=False)

    def __getattribute__(self, attr):
        # Attributes defined directly on tuple are reported as missing, so
        # lookup falls through to __getattr__.
        if attr in tuple.__dict__:
            raise AttributeError
        return tuple.__getattribute__(self, attr)

    def _get_call_arguments(self):
        # The last two items are always (args, kwargs), for either length.
        if len(self) == 2:
            args, kwargs = self
        else:
            _name, args, kwargs = self
        return args, kwargs

    @property
    def args(self):
        return self._get_call_arguments()[0]

    @property
    def kwargs(self):
        return self._get_call_arguments()[1]

    def __repr__(self):
        if not self._mock_from_kall:
            label = self._mock_name or 'call'
            if label.startswith('()'):
                label = 'call%s' % label
            return label

        if len(self) == 2:
            label = 'call'
            args, kwargs = self
        else:
            label, args, kwargs = self
            if not label:
                label = 'call'
            elif label.startswith('()'):
                label = 'call%s' % label
            else:
                label = 'call.%s' % label
        return _format_call_signature(label, args, kwargs)

    def call_list(self):
        """For a call object that represents multiple calls, `call_list`
        returns a list of all the intermediate calls as well as the
        final call."""
        chain = []
        current = self
        # Walk up through the parents, keeping only actual calls.
        while current is not None:
            if current._mock_from_kall:
                chain.append(current)
            current = current._mock_parent
        return _CallList(reversed(chain))


call = _Call(from_kall=False)
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)

    # Constructor keyword arguments for the mock created below.
    _kwargs = {'spec': spec}
    if spec_set:
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    # Caller-supplied keyword arguments override the defaults chosen above.
    _kwargs.update(kwargs)

    # Choose the mock class from the kind of object being specced.
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    # A `name` keyword argument takes precedence over `_name`.
    _name = _kwargs.pop('name', _name)

    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        mock = _set_signature(mock, spec)
        if is_async_func:
            _setup_async_mock(mock)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # For a class spec, the return value is autospecced as an instance of that
    # class, unless the caller supplied a return_value.
    if is_type and not instance and 'return_value' not in kwargs:
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock)

    for entry in dir(spec):
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        try:
            original = getattr(spec, entry)
        except AttributeError:
            continue

        # NOTE: from here on `kwargs` is rebound to the per-attribute
        # constructor arguments; the caller's keyword arguments were already
        # merged into `_kwargs` above.
        kwargs = {'spec': original}
        if spec_set:
            kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # Non-function attribute: only a `_SpecState` record is stored.
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent,
                              **kwargs)
            mock._mock_children[entry] = new
            new.return_value = child_klass()
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)

    return mock
def _must_skip(spec, entry, is_type):
    """
    Decide whether the first argument of `spec`'s `entry` attribute should
    be skipped.
    """
    if not isinstance(spec, type):
        instance_attrs = getattr(spec, '__dict__', {})
        if entry in instance_attrs:
            # instance attribute - shouldn't skip
            return False
        spec = spec.__class__

    # The first class in the MRO that defines `entry` decides the answer.
    for klass in spec.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is DEFAULT:
            continue
        if isinstance(found, (staticmethod, classmethod)):
            return False
        if isinstance(found, FunctionTypes):
            # Normal method => skip if looked up on type
            # (if looked up on instance, self is already skipped)
            return is_type
        return False

    # function is a dynamically provided attribute
    return is_type
class _SpecState(object):
    """Plain record of the arguments describing a spec for an attribute."""

    def __init__(self, spec, spec_set=False, parent=None,
                 name=None, ids=None, instance=False):
        # All arguments are stored as given.
        self.spec, self.spec_set = spec, spec_set
        self.parent, self.name = parent, name
        self.ids, self.instance = ids, instance
# Types treated as "functions" by the isinstance() checks in create_autospec
# and _must_skip.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)


# Filled in on the first call to mock_open().
file_spec = None
open_spec = None
def _to_stream(read_data):
    """Wrap `read_data` in an in-memory stream.

    `bytes` input gives an `io.BytesIO`; anything else is passed to
    `io.StringIO`.
    """
    stream_type = io.BytesIO if isinstance(read_data, bytes) else io.StringIO
    return stream_type(read_data)
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string for the `read`, `readline` and `readlines` of the
    file handle to return. This is an empty string by default. A `bytes`
    value is also accepted and is served from a bytes stream.
    """
    _read_data = _to_stream(read_data)
    # _state[0]: the stream currently being read from.
    # _state[1]: the generator installed as the readline side effect.
    _state = [_read_data, None]

    # Each side effect below returns the handle method's configured
    # return_value when it is not None, and otherwise reads from the stream.
    def _readlines_side_effect(*args, **kwargs):
        if handle.readlines.return_value is not None:
            return handle.readlines.return_value
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        if handle.read.return_value is not None:
            return handle.read.return_value
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        yield from _iter_side_effect()
        # Once the stream's lines are used up, keep yielding whatever
        # readline() on the stream returns.
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            while True:
                yield handle.readline.return_value
        for line in _state[0]:
            yield line

    def _next_side_effect():
        if handle.readline.return_value is not None:
            return handle.readline.return_value
        return next(_state[0])

    # The spec lists are built once and cached in module-level globals.
    global file_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    global open_spec
    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))
    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    # Entering the handle as a context manager gives the handle itself.
    handle.__enter__.return_value = handle

    # None means "not configured" to the side effects above.
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect

    def reset_data(*args, **kwargs):
        # Installed as the side effect of `mock`: give each call a fresh
        # stream over the same read_data.
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        # NOTE(review): presumably DEFAULT makes the mock fall back to its
        # return_value (the handle) -- confirm against Mock's call logic.
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
class PropertyMock(Mock):
    """
    A mock meant to be installed on a class as a property or other
    descriptor. Its `__get__` and `__set__` methods let you specify a return
    value for when the attribute is fetched.

    Fetching a `PropertyMock` instance from an object calls the mock, with
    no args. Setting it calls the mock with the value being set.
    """

    def _get_child_mock(self, **kwargs):
        # Children are always MagicMock instances.
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        # Attribute read: call the mock with no arguments.
        return self()

    def __set__(self, obj, val):
        # Attribute write: call the mock with the assigned value.
        self(val)
def seal(mock):
    """Disable the automatic generation of child mocks.

    Sealing a mock ensures that accessing an attribute which was not already
    defined no longer creates a new mock.

    The operation is recursive: the mock itself, any mocks generated by
    accessing one of its attributes, and all assigned mocks without a name
    or spec are sealed.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        # Recurse only into mock attributes that are not pending `_SpecState`
        # placeholders and whose parent is this mock.
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
class _AsyncIterator:
    """
    Adapter exposing a regular iterator through ``__anext__``.
    """

    def __init__(self, iterator):
        self.iterator = iterator
        # Fake code object carrying the iterable-coroutine flag.
        fake_code = NonCallableMock(spec_set=CodeType)
        fake_code.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = fake_code

    async def __anext__(self):
        # A sentinel default replaces catching StopIteration from next().
        exhausted = object()
        item = next(self.iterator, exhausted)
        if item is exhausted:
            raise StopAsyncIteration
        return item