Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 21%
1622 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
7__all__ = (
8 'Mock',
9 'MagicMock',
10 'patch',
11 'sentinel',
12 'DEFAULT',
13 'ANY',
14 'call',
15 'create_autospec',
16 'AsyncMock',
17 'FILTER_DIR',
18 'NonCallableMock',
19 'NonCallableMagicMock',
20 'mock_open',
21 'PropertyMock',
22 'seal',
23)
26import asyncio
27import contextlib
28import io
29import inspect
30import pprint
31import sys
32import builtins
33from asyncio import iscoroutinefunction
34from types import CodeType, ModuleType, MethodType
35from unittest.util import safe_repr
36from functools import wraps, partial
37from threading import RLock
40from mock import IS_PYPY
41from .backports import iscoroutinefunction
44class InvalidSpecError(Exception):
45 """Indicates that an invalid value was used as a mock spec."""
48_builtins = {name for name in dir(builtins) if not name.startswith('_')}
50FILTER_DIR = True
52# Workaround for issue #12370
53# Without this, the __class__ properties wouldn't be set correctly
54_safe_super = super
56def _is_async_obj(obj):
57 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
58 return False
59 if hasattr(obj, '__func__'):
60 obj = getattr(obj, '__func__')
61 return iscoroutinefunction(obj) or inspect.isawaitable(obj)
64def _is_async_func(func):
65 if getattr(func, '__code__', None):
66 return iscoroutinefunction(func)
67 else:
68 return False
71def _is_instance_mock(obj):
72 # can't use isinstance on Mock objects because they override __class__
73 # The base class for all mocks is NonCallableMock
74 return issubclass(type(obj), NonCallableMock)
77def _is_exception(obj):
78 return (
79 isinstance(obj, BaseException) or
80 isinstance(obj, type) and issubclass(obj, BaseException)
81 )
84def _extract_mock(obj):
85 # Autospecced functions will return a FunctionType with "mock" attribute
86 # which is the actual mock object that needs to be used.
87 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
88 return obj.mock
89 else:
90 return obj
93def _get_signature_object(func, as_instance, eat_self):
94 """
95 Given an arbitrary, possibly callable object, try to create a suitable
96 signature object.
97 Return a (reduced func, signature) tuple, or None.
98 """
99 if isinstance(func, type) and not as_instance:
100 # If it's a type and should be modelled as a type, use __init__.
101 func = func.__init__
102 # Skip the `self` argument in __init__
103 eat_self = True
104 elif not isinstance(func, FunctionTypes):
105 # If we really want to model an instance of the passed type,
106 # __call__ should be looked up, not __init__.
107 try:
108 func = func.__call__
109 except AttributeError:
110 return None
111 if eat_self:
112 sig_func = partial(func, None)
113 else:
114 sig_func = func
115 try:
116 return func, inspect.signature(sig_func)
117 except ValueError:
118 # Certain callable types are not supported by inspect.signature()
119 return None
122def _check_signature(func, mock, skipfirst, instance=False):
123 sig = _get_signature_object(func, instance, skipfirst)
124 if sig is None:
125 return
126 func, sig = sig
127 def checksig(_mock_self, *args, **kwargs):
128 sig.bind(*args, **kwargs)
129 _copy_func_details(func, checksig)
130 type(mock)._mock_check_sig = checksig
131 type(mock).__signature__ = sig
134def _copy_func_details(func, funcopy):
135 # we explicitly don't copy func.__dict__ into this copy as it would
136 # expose original attributes that should be mocked
137 for attribute in (
138 '__name__', '__doc__', '__text_signature__',
139 '__module__', '__defaults__', '__kwdefaults__',
140 ):
141 try:
142 setattr(funcopy, attribute, getattr(func, attribute))
143 except AttributeError:
144 pass
147def _callable(obj):
148 if isinstance(obj, type):
149 return True
150 if isinstance(obj, (staticmethod, classmethod, MethodType)):
151 return _callable(obj.__func__)
152 if getattr(obj, '__call__', None) is not None:
153 return True
154 return False
157def _is_list(obj):
158 # checks for list or tuples
159 # XXXX badly named!
160 return type(obj) in (list, tuple)
163def _instance_callable(obj):
164 """Given an object, return True if the object is callable.
165 For classes, return True if instances would be callable."""
166 if not isinstance(obj, type):
167 # already an instance
168 return getattr(obj, '__call__', None) is not None
170 # *could* be broken by a class overriding __mro__ or __dict__ via
171 # a metaclass
172 for base in (obj,) + obj.__mro__:
173 if base.__dict__.get('__call__') is not None:
174 return True
175 return False
178def _set_signature(mock, original, instance=False):
179 # creates a function with signature (*args, **kwargs) that delegates to a
180 # mock. It still does signature checking by calling a lambda with the same
181 # signature as the original.
183 skipfirst = isinstance(original, type)
184 result = _get_signature_object(original, instance, skipfirst)
185 if result is None:
186 return mock
187 func, sig = result
188 def checksig(*args, **kwargs):
189 sig.bind(*args, **kwargs)
190 _copy_func_details(func, checksig)
192 name = original.__name__
193 if not name.isidentifier():
194 name = 'funcopy'
195 context = {'_checksig_': checksig, 'mock': mock}
196 src = """def %s(*args, **kwargs):
197 _checksig_(*args, **kwargs)
198 return mock(*args, **kwargs)""" % name
199 exec (src, context)
200 funcopy = context[name]
201 _setup_func(funcopy, mock, sig)
202 return funcopy
205def _setup_func(funcopy, mock, sig):
206 funcopy.mock = mock
208 def assert_called_with(*args, **kwargs):
209 return mock.assert_called_with(*args, **kwargs)
210 def assert_called(*args, **kwargs):
211 return mock.assert_called(*args, **kwargs)
212 def assert_not_called(*args, **kwargs):
213 return mock.assert_not_called(*args, **kwargs)
214 def assert_called_once(*args, **kwargs):
215 return mock.assert_called_once(*args, **kwargs)
216 def assert_called_once_with(*args, **kwargs):
217 return mock.assert_called_once_with(*args, **kwargs)
218 def assert_has_calls(*args, **kwargs):
219 return mock.assert_has_calls(*args, **kwargs)
220 def assert_any_call(*args, **kwargs):
221 return mock.assert_any_call(*args, **kwargs)
222 def reset_mock():
223 funcopy.method_calls = _CallList()
224 funcopy.mock_calls = _CallList()
225 mock.reset_mock()
226 ret = funcopy.return_value
227 if _is_instance_mock(ret) and not ret is mock:
228 ret.reset_mock()
230 funcopy.called = False
231 funcopy.call_count = 0
232 funcopy.call_args = None
233 funcopy.call_args_list = _CallList()
234 funcopy.method_calls = _CallList()
235 funcopy.mock_calls = _CallList()
237 funcopy.return_value = mock.return_value
238 funcopy.side_effect = mock.side_effect
239 funcopy._mock_children = mock._mock_children
241 funcopy.assert_called_with = assert_called_with
242 funcopy.assert_called_once_with = assert_called_once_with
243 funcopy.assert_has_calls = assert_has_calls
244 funcopy.assert_any_call = assert_any_call
245 funcopy.reset_mock = reset_mock
246 funcopy.assert_called = assert_called
247 funcopy.assert_not_called = assert_not_called
248 funcopy.assert_called_once = assert_called_once
249 funcopy.__signature__ = sig
251 mock._mock_delegate = funcopy
254def _setup_async_mock(mock):
255 mock._is_coroutine = asyncio.coroutines._is_coroutine
256 mock.await_count = 0
257 mock.await_args = None
258 mock.await_args_list = _CallList()
260 # Mock is not configured yet so the attributes are set
261 # to a function and then the corresponding mock helper function
262 # is called when the helper is accessed similar to _setup_func.
263 def wrapper(attr, *args, **kwargs):
264 return getattr(mock.mock, attr)(*args, **kwargs)
266 for attribute in ('assert_awaited',
267 'assert_awaited_once',
268 'assert_awaited_with',
269 'assert_awaited_once_with',
270 'assert_any_await',
271 'assert_has_awaits',
272 'assert_not_awaited'):
274 # setattr(mock, attribute, wrapper) causes late binding
275 # hence attribute will always be the last value in the loop
276 # Use partial(wrapper, attribute) to ensure the attribute is bound
277 # correctly.
278 setattr(mock, attribute, partial(wrapper, attribute))
281def _is_magic(name):
282 return '__%s__' % name[2:-2] == name
285class _SentinelObject(object):
286 "A unique, named, sentinel object."
287 def __init__(self, name):
288 self.name = name
290 def __repr__(self):
291 return 'sentinel.%s' % self.name
293 def __reduce__(self):
294 return 'sentinel.%s' % self.name
297class _Sentinel(object):
298 """Access attributes to return a named object, usable as a sentinel."""
299 def __init__(self):
300 self._sentinels = {}
302 def __getattr__(self, name):
303 if name == '__bases__':
304 # Without this help(unittest.mock) raises an exception
305 raise AttributeError
306 return self._sentinels.setdefault(name, _SentinelObject(name))
308 def __reduce__(self):
309 return 'sentinel'
312sentinel = _Sentinel()
314DEFAULT = sentinel.DEFAULT
315_missing = sentinel.MISSING
316_deleted = sentinel.DELETED
319_allowed_names = {
320 'return_value', '_mock_return_value', 'side_effect',
321 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
322 '_mock_name', '_mock_new_name'
323}
326def _delegating_property(name):
327 _allowed_names.add(name)
328 _the_name = '_mock_' + name
329 def _get(self, name=name, _the_name=_the_name):
330 sig = self._mock_delegate
331 if sig is None:
332 return getattr(self, _the_name)
333 return getattr(sig, name)
334 def _set(self, value, name=name, _the_name=_the_name):
335 sig = self._mock_delegate
336 if sig is None:
337 self.__dict__[_the_name] = value
338 else:
339 setattr(sig, name, value)
341 return property(_get, _set)
345class _CallList(list):
347 def __contains__(self, value):
348 if not isinstance(value, list):
349 return list.__contains__(self, value)
350 len_value = len(value)
351 len_self = len(self)
352 if len_value > len_self:
353 return False
355 for i in range(0, len_self - len_value + 1):
356 sub_list = self[i:i+len_value]
357 if sub_list == value:
358 return True
359 return False
361 def __repr__(self):
362 return pprint.pformat(list(self))
365def _check_and_set_parent(parent, value, name, new_name):
366 value = _extract_mock(value)
368 if not _is_instance_mock(value):
369 return False
370 if ((value._mock_name or value._mock_new_name) or
371 (value._mock_parent is not None) or
372 (value._mock_new_parent is not None)):
373 return False
375 _parent = parent
376 while _parent is not None:
377 # setting a mock (value) as a child or return value of itself
378 # should not modify the mock
379 if _parent is value:
380 return False
381 _parent = _parent._mock_new_parent
383 if new_name:
384 value._mock_new_parent = parent
385 value._mock_new_name = new_name
386 if name:
387 value._mock_parent = parent
388 value._mock_name = name
389 return True
391# Internal class to identify if we wrapped an iterator object or not.
392class _MockIter(object):
393 def __init__(self, obj):
394 self.obj = iter(obj)
395 def __next__(self):
396 return next(self.obj)
398class Base(object):
399 _mock_return_value = DEFAULT
400 _mock_side_effect = None
401 def __init__(self, *args, **kwargs):
402 pass
406class NonCallableMock(Base):
407 """A non-callable version of `Mock`"""
409 # Store a mutex as a class attribute in order to protect concurrent access
410 # to mock attributes. Using a class attribute allows all NonCallableMock
411 # instances to share the mutex for simplicity.
412 #
413 # See https://github.com/python/cpython/issues/98624 for why this is
414 # necessary.
415 _lock = RLock()
417 def __new__(
418 cls, spec=None, wraps=None, name=None, spec_set=None,
419 parent=None, _spec_state=None, _new_name='', _new_parent=None,
420 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
421 ):
422 # every instance has its own class
423 # so we can create magic methods on the
424 # class without stomping on other mocks
425 bases = (cls,)
426 if not issubclass(cls, AsyncMockMixin):
427 # Check if spec is an async object or function
428 spec_arg = spec_set or spec
429 if spec_arg is not None and _is_async_obj(spec_arg):
430 bases = (AsyncMockMixin, cls)
431 new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
432 instance = _safe_super(NonCallableMock, cls).__new__(new)
433 return instance
436 def __init__(
437 self, spec=None, wraps=None, name=None, spec_set=None,
438 parent=None, _spec_state=None, _new_name='', _new_parent=None,
439 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
440 ):
441 if _new_parent is None:
442 _new_parent = parent
444 __dict__ = self.__dict__
445 __dict__['_mock_parent'] = parent
446 __dict__['_mock_name'] = name
447 __dict__['_mock_new_name'] = _new_name
448 __dict__['_mock_new_parent'] = _new_parent
449 __dict__['_mock_sealed'] = False
451 if spec_set is not None:
452 spec = spec_set
453 spec_set = True
454 if _eat_self is None:
455 _eat_self = parent is not None
457 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
459 __dict__['_mock_children'] = {}
460 __dict__['_mock_wraps'] = wraps
461 __dict__['_mock_delegate'] = None
463 __dict__['_mock_called'] = False
464 __dict__['_mock_call_args'] = None
465 __dict__['_mock_call_count'] = 0
466 __dict__['_mock_call_args_list'] = _CallList()
467 __dict__['_mock_mock_calls'] = _CallList()
469 __dict__['method_calls'] = _CallList()
470 __dict__['_mock_unsafe'] = unsafe
472 if kwargs:
473 self.configure_mock(**kwargs)
475 _safe_super(NonCallableMock, self).__init__(
476 spec, wraps, name, spec_set, parent,
477 _spec_state
478 )
481 def attach_mock(self, mock, attribute):
482 """
483 Attach a mock as an attribute of this one, replacing its name and
484 parent. Calls to the attached mock will be recorded in the
485 `method_calls` and `mock_calls` attributes of this one."""
486 inner_mock = _extract_mock(mock)
488 inner_mock._mock_parent = None
489 inner_mock._mock_new_parent = None
490 inner_mock._mock_name = ''
491 inner_mock._mock_new_name = None
493 setattr(self, attribute, mock)
496 def mock_add_spec(self, spec, spec_set=False):
497 """Add a spec to a mock. `spec` can either be an object or a
498 list of strings. Only attributes on the `spec` can be fetched as
499 attributes from the mock.
501 If `spec_set` is True then only attributes on the spec can be set."""
502 self._mock_add_spec(spec, spec_set)
505 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
506 _eat_self=False):
507 if _is_instance_mock(spec):
508 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')
510 _spec_class = None
511 _spec_signature = None
512 _spec_asyncs = []
514 if spec is not None and not _is_list(spec):
515 if isinstance(spec, type):
516 _spec_class = spec
517 else:
518 _spec_class = type(spec)
519 res = _get_signature_object(spec,
520 _spec_as_instance, _eat_self)
521 _spec_signature = res and res[1]
523 spec_list = dir(spec)
525 for attr in spec_list:
526 if iscoroutinefunction(getattr(spec, attr, None)):
527 _spec_asyncs.append(attr)
529 spec = spec_list
531 __dict__ = self.__dict__
532 __dict__['_spec_class'] = _spec_class
533 __dict__['_spec_set'] = spec_set
534 __dict__['_spec_signature'] = _spec_signature
535 __dict__['_mock_methods'] = spec
536 __dict__['_spec_asyncs'] = _spec_asyncs
538 def __get_return_value(self):
539 ret = self._mock_return_value
540 if self._mock_delegate is not None:
541 ret = self._mock_delegate.return_value
543 if ret is DEFAULT:
544 ret = self._get_child_mock(
545 _new_parent=self, _new_name='()'
546 )
547 self.return_value = ret
548 return ret
551 def __set_return_value(self, value):
552 if self._mock_delegate is not None:
553 self._mock_delegate.return_value = value
554 else:
555 self._mock_return_value = value
556 _check_and_set_parent(self, value, None, '()')
558 __return_value_doc = "The value to be returned when the mock is called."
559 return_value = property(__get_return_value, __set_return_value,
560 __return_value_doc)
563 @property
564 def __class__(self):
565 if self._spec_class is None:
566 return type(self)
567 return self._spec_class
569 called = _delegating_property('called')
570 call_count = _delegating_property('call_count')
571 call_args = _delegating_property('call_args')
572 call_args_list = _delegating_property('call_args_list')
573 mock_calls = _delegating_property('mock_calls')
576 def __get_side_effect(self):
577 delegated = self._mock_delegate
578 if delegated is None:
579 return self._mock_side_effect
580 sf = delegated.side_effect
581 if (sf is not None and not callable(sf)
582 and not isinstance(sf, _MockIter) and not _is_exception(sf)):
583 sf = _MockIter(sf)
584 delegated.side_effect = sf
585 return sf
587 def __set_side_effect(self, value):
588 value = _try_iter(value)
589 delegated = self._mock_delegate
590 if delegated is None:
591 self._mock_side_effect = value
592 else:
593 delegated.side_effect = value
595 side_effect = property(__get_side_effect, __set_side_effect)
598 def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
599 "Restore the mock object to its initial state."
600 if visited is None:
601 visited = []
602 if id(self) in visited:
603 return
604 visited.append(id(self))
606 self.called = False
607 self.call_args = None
608 self.call_count = 0
609 self.mock_calls = _CallList()
610 self.call_args_list = _CallList()
611 self.method_calls = _CallList()
613 if return_value:
614 self._mock_return_value = DEFAULT
615 if side_effect:
616 self._mock_side_effect = None
618 for child in self._mock_children.values():
619 if isinstance(child, _SpecState) or child is _deleted:
620 continue
621 child.reset_mock(visited, return_value=return_value, side_effect=side_effect)
623 ret = self._mock_return_value
624 if _is_instance_mock(ret) and ret is not self:
625 ret.reset_mock(visited)
628 def configure_mock(self, **kwargs):
629 """Set attributes on the mock through keyword arguments.
631 Attributes plus return values and side effects can be set on child
632 mocks using standard dot notation and unpacking a dictionary in the
633 method call:
635 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
636 >>> mock.configure_mock(**attrs)"""
637 for arg, val in sorted(kwargs.items(),
638 # we sort on the number of dots so that
639 # attributes are set before we set attributes on
640 # attributes
641 key=lambda entry: entry[0].count('.')):
642 args = arg.split('.')
643 final = args.pop()
644 obj = self
645 for entry in args:
646 obj = getattr(obj, entry)
647 setattr(obj, final, val)
650 def __getattr__(self, name):
651 if name in {'_mock_methods', '_mock_unsafe'}:
652 raise AttributeError(name)
653 elif self._mock_methods is not None:
654 if name not in self._mock_methods or name in _all_magics:
655 raise AttributeError("Mock object has no attribute %r" % name)
656 elif _is_magic(name):
657 raise AttributeError(name)
658 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
659 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
660 raise AttributeError(
661 f"{name!r} is not a valid assertion. Use a spec "
662 f"for the mock if {name!r} is meant to be an attribute.")
664 with NonCallableMock._lock:
665 result = self._mock_children.get(name)
666 if result is _deleted:
667 raise AttributeError(name)
668 elif result is None:
669 wraps = None
670 if self._mock_wraps is not None:
671 # XXXX should we get the attribute without triggering code
672 # execution?
673 wraps = getattr(self._mock_wraps, name)
675 result = self._get_child_mock(
676 parent=self, name=name, wraps=wraps, _new_name=name,
677 _new_parent=self
678 )
679 self._mock_children[name] = result
681 elif isinstance(result, _SpecState):
682 try:
683 result = create_autospec(
684 result.spec, result.spec_set, result.instance,
685 result.parent, result.name
686 )
687 except InvalidSpecError:
688 target_name = self.__dict__['_mock_name'] or self
689 raise InvalidSpecError(
690 f'Cannot autospec attr {name!r} from target '
691 f'{target_name!r} as it has already been mocked out. '
692 f'[target={self!r}, attr={result.spec!r}]')
693 self._mock_children[name] = result
695 return result
698 def _extract_mock_name(self):
699 _name_list = [self._mock_new_name]
700 _parent = self._mock_new_parent
701 last = self
703 dot = '.'
704 if _name_list == ['()']:
705 dot = ''
707 while _parent is not None:
708 last = _parent
710 _name_list.append(_parent._mock_new_name + dot)
711 dot = '.'
712 if _parent._mock_new_name == '()':
713 dot = ''
715 _parent = _parent._mock_new_parent
717 _name_list = list(reversed(_name_list))
718 _first = last._mock_name or 'mock'
719 if len(_name_list) > 1:
720 if _name_list[1] not in ('()', '().'):
721 _first += '.'
722 _name_list[0] = _first
723 return ''.join(_name_list)
725 def __repr__(self):
726 name = self._extract_mock_name()
728 name_string = ''
729 if name not in ('mock', 'mock.'):
730 name_string = ' name=%r' % name
732 spec_string = ''
733 if self._spec_class is not None:
734 spec_string = ' spec=%r'
735 if self._spec_set:
736 spec_string = ' spec_set=%r'
737 spec_string = spec_string % self._spec_class.__name__
738 return "<%s%s%s id='%s'>" % (
739 type(self).__name__,
740 name_string,
741 spec_string,
742 id(self)
743 )
746 def __dir__(self):
747 """Filter the output of `dir(mock)` to only useful members."""
748 if not FILTER_DIR:
749 return object.__dir__(self)
751 extras = self._mock_methods or []
752 from_type = dir(type(self))
753 from_dict = list(self.__dict__)
754 from_child_mocks = [
755 m_name for m_name, m_value in self._mock_children.items()
756 if m_value is not _deleted]
758 from_type = [e for e in from_type if not e.startswith('_')]
759 from_dict = [e for e in from_dict if not e.startswith('_') or
760 _is_magic(e)]
761 return sorted(set(extras + from_type + from_dict + from_child_mocks))
764 def __setattr__(self, name, value):
765 if name in _allowed_names:
766 # property setters go through here
767 return object.__setattr__(self, name, value)
768 elif (self._spec_set and self._mock_methods is not None and
769 name not in self._mock_methods and
770 name not in self.__dict__):
771 raise AttributeError("Mock object has no attribute '%s'" % name)
772 elif name in _unsupported_magics:
773 msg = 'Attempting to set unsupported magic method %r.' % name
774 raise AttributeError(msg)
775 elif name in _all_magics:
776 if self._mock_methods is not None and name not in self._mock_methods:
777 raise AttributeError("Mock object has no attribute '%s'" % name)
779 if not _is_instance_mock(value):
780 setattr(type(self), name, _get_method(name, value))
781 original = value
782 value = lambda *args, **kw: original(self, *args, **kw)
783 else:
784 # only set _new_name and not name so that mock_calls is tracked
785 # but not method calls
786 _check_and_set_parent(self, value, None, name)
787 setattr(type(self), name, value)
788 self._mock_children[name] = value
789 elif name == '__class__':
790 self._spec_class = value
791 return
792 else:
793 if _check_and_set_parent(self, value, name, name):
794 self._mock_children[name] = value
796 if self._mock_sealed and not hasattr(self, name):
797 mock_name = f'{self._extract_mock_name()}.{name}'
798 raise AttributeError(f'Cannot set {mock_name}')
800 return object.__setattr__(self, name, value)
803 def __delattr__(self, name):
804 if name in _all_magics and name in type(self).__dict__:
805 delattr(type(self), name)
806 if name not in self.__dict__:
807 # for magic methods that are still MagicProxy objects and
808 # not set on the instance itself
809 return
811 obj = self._mock_children.get(name, _missing)
812 if name in self.__dict__:
813 _safe_super(NonCallableMock, self).__delattr__(name)
814 elif obj is _deleted:
815 raise AttributeError(name)
816 if obj is not _missing:
817 del self._mock_children[name]
818 self._mock_children[name] = _deleted
821 def _format_mock_call_signature(self, args, kwargs):
822 name = self._mock_name or 'mock'
823 return _format_call_signature(name, args, kwargs)
826 def _format_mock_failure_message(self, args, kwargs, action='call'):
827 message = 'expected %s not found.\nExpected: %s\nActual: %s'
828 expected_string = self._format_mock_call_signature(args, kwargs)
829 call_args = self.call_args
830 actual_string = self._format_mock_call_signature(*call_args)
831 return message % (action, expected_string, actual_string)
834 def _get_call_signature_from_name(self, name):
835 """
836 * If call objects are asserted against a method/function like obj.meth1
837 then there could be no name for the call object to lookup. Hence just
838 return the spec_signature of the method/function being asserted against.
839 * If the name is not empty then remove () and split by '.' to get
840 list of names to iterate through the children until a potential
841 match is found. A child mock is created only during attribute access
842 so if we get a _SpecState then no attributes of the spec were accessed
843 and can be safely exited.
844 """
845 if not name:
846 return self._spec_signature
848 sig = None
849 names = name.replace('()', '').split('.')
850 children = self._mock_children
852 for name in names:
853 child = children.get(name)
854 if child is None or isinstance(child, _SpecState):
855 break
856 else:
857 # If an autospecced object is attached using attach_mock the
858 # child would be a function with mock object as attribute from
859 # which signature has to be derived.
860 child = _extract_mock(child)
861 children = child._mock_children
862 sig = child._spec_signature
864 return sig
867 def _call_matcher(self, _call):
868 """
869 Given a call (or simply an (args, kwargs) tuple), return a
870 comparison key suitable for matching with other calls.
871 This is a best effort method which relies on the spec's signature,
872 if available, or falls back on the arguments themselves.
873 """
875 if isinstance(_call, tuple) and len(_call) > 2:
876 sig = self._get_call_signature_from_name(_call[0])
877 else:
878 sig = self._spec_signature
880 if sig is not None:
881 if len(_call) == 2:
882 name = ''
883 args, kwargs = _call
884 else:
885 name, args, kwargs = _call
886 try:
887 bound_call = sig.bind(*args, **kwargs)
888 return call(name, bound_call.args, bound_call.kwargs)
889 except TypeError as e:
890 return e.with_traceback(None)
891 else:
892 return _call
894 def assert_not_called(_mock_self):
895 """assert that the mock was never called.
896 """
897 self = _mock_self
898 if self.call_count != 0:
899 msg = ("Expected '%s' to not have been called. Called %s times.%s"
900 % (self._mock_name or 'mock',
901 self.call_count,
902 self._calls_repr()))
903 raise AssertionError(msg)
905 def assert_called(_mock_self):
906 """assert that the mock was called at least once
907 """
908 self = _mock_self
909 if self.call_count == 0:
910 msg = ("Expected '%s' to have been called." %
911 (self._mock_name or 'mock'))
912 raise AssertionError(msg)
914 def assert_called_once(_mock_self):
915 """assert that the mock was called only once.
916 """
917 self = _mock_self
918 if not self.call_count == 1:
919 msg = ("Expected '%s' to have been called once. Called %s times.%s"
920 % (self._mock_name or 'mock',
921 self.call_count,
922 self._calls_repr()))
923 raise AssertionError(msg)
925 def assert_called_with(_mock_self, *args, **kwargs):
926 """assert that the last call was made with the specified arguments.
928 Raises an AssertionError if the args and keyword args passed in are
929 different to the last call to the mock."""
930 self = _mock_self
931 if self.call_args is None:
932 expected = self._format_mock_call_signature(args, kwargs)
933 actual = 'not called.'
934 error_message = ('expected call not found.\nExpected: %s\nActual: %s'
935 % (expected, actual))
936 raise AssertionError(error_message)
938 def _error_message():
939 msg = self._format_mock_failure_message(args, kwargs)
940 return msg
941 expected = self._call_matcher(_Call((args, kwargs), two=True))
942 actual = self._call_matcher(self.call_args)
943 if actual != expected:
944 cause = expected if isinstance(expected, Exception) else None
945 raise AssertionError(_error_message()) from cause
948 def assert_called_once_with(_mock_self, *args, **kwargs):
949 """assert that the mock was called exactly once and that that call was
950 with the specified arguments."""
951 self = _mock_self
952 if not self.call_count == 1:
953 msg = ("Expected '%s' to be called once. Called %s times.%s"
954 % (self._mock_name or 'mock',
955 self.call_count,
956 self._calls_repr()))
957 raise AssertionError(msg)
958 return self.assert_called_with(*args, **kwargs)
961 def assert_has_calls(self, calls, any_order=False):
962 """assert the mock has been called with the specified calls.
963 The `mock_calls` list is checked for the calls.
965 If `any_order` is False (the default) then the calls must be
966 sequential. There can be extra calls before or after the
967 specified calls.
969 If `any_order` is True then the calls can be in any order, but
970 they must all appear in `mock_calls`."""
971 expected = [self._call_matcher(c) for c in calls]
972 cause = next((e for e in expected if isinstance(e, Exception)), None)
973 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
974 if not any_order:
975 if expected not in all_calls:
976 if cause is None:
977 problem = 'Calls not found.'
978 else:
979 problem = ('Error processing expected calls.\n'
980 'Errors: {}').format(
981 [e if isinstance(e, Exception) else None
982 for e in expected])
983 raise AssertionError(
984 f'{problem}\n'
985 f'Expected: {_CallList(calls)}'
986 f'{self._calls_repr(prefix="Actual").rstrip(".")}'
987 ) from cause
988 return
990 all_calls = list(all_calls)
992 not_found = []
993 for kall in expected:
994 try:
995 all_calls.remove(kall)
996 except ValueError:
997 not_found.append(kall)
998 if not_found:
999 raise AssertionError(
1000 '%r does not contain all of %r in its call list, '
1001 'found %r instead' % (self._mock_name or 'mock',
1002 tuple(not_found), all_calls)
1003 ) from cause
1006 def assert_any_call(self, *args, **kwargs):
1007 """assert the mock has been called with the specified arguments.
1009 The assert passes if the mock has *ever* been called, unlike
1010 `assert_called_with` and `assert_called_once_with` that only pass if
1011 the call is the most recent one."""
1012 expected = self._call_matcher(_Call((args, kwargs), two=True))
1013 cause = expected if isinstance(expected, Exception) else None
1014 actual = [self._call_matcher(c) for c in self.call_args_list]
1015 if cause or expected not in _AnyComparer(actual):
1016 expected_string = self._format_mock_call_signature(args, kwargs)
1017 raise AssertionError(
1018 '%s call not found' % expected_string
1019 ) from cause
1022 def _get_child_mock(self, **kw):
1023 """Create the child mocks for attributes and return value.
1024 By default child mocks will be the same type as the parent.
1025 Subclasses of Mock may want to override this to customize the way
1026 child mocks are made.
1028 For non-callable mocks the callable variant will be used (rather than
1029 any custom subclass)."""
1030 if self._mock_sealed:
1031 attribute = f".{kw['name']}" if "name" in kw else "()"
1032 mock_name = self._extract_mock_name() + attribute
1033 raise AttributeError(mock_name)
1035 _new_name = kw.get("_new_name")
1036 if _new_name in self.__dict__['_spec_asyncs']:
1037 return AsyncMock(**kw)
1039 _type = type(self)
1040 if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
1041 # Any asynchronous magic becomes an AsyncMock
1042 klass = AsyncMock
1043 elif issubclass(_type, AsyncMockMixin):
1044 if (_new_name in _all_sync_magics or
1045 self._mock_methods and _new_name in self._mock_methods):
1046 # Any synchronous method on AsyncMock becomes a MagicMock
1047 klass = MagicMock
1048 else:
1049 klass = AsyncMock
1050 elif not issubclass(_type, CallableMixin):
1051 if issubclass(_type, NonCallableMagicMock):
1052 klass = MagicMock
1053 elif issubclass(_type, NonCallableMock):
1054 klass = Mock
1055 else:
1056 klass = _type.__mro__[1]
1057 return klass(**kw)
1060 def _calls_repr(self, prefix="Calls"):
1061 """Renders self.mock_calls as a string.
1063 Example: "\nCalls: [call(1), call(2)]."
1065 If self.mock_calls is empty, an empty string is returned. The
1066 output will be truncated if very long.
1067 """
1068 if not self.mock_calls:
1069 return ""
1070 return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1073try:
1074 removeprefix = str.removeprefix
1075except AttributeError:
1076 # Py 3.8 and earlier:
1077 def removeprefix(name, prefix):
1078 return name[len(prefix):]
1080# Denylist for forbidden attribute names in safe mode
1081_ATTRIB_DENY_LIST = frozenset({
1082 removeprefix(name, "assert_")
1083 for name in dir(NonCallableMock)
1084 if name.startswith("assert_")
1085})
1088class _AnyComparer(list):
1089 """A list which checks if it contains a call which may have an
1090 argument of ANY, flipping the components of item and self from
1091 their traditional locations so that ANY is guaranteed to be on
1092 the left."""
1093 def __contains__(self, item):
1094 for _call in self:
1095 assert len(item) == len(_call)
1096 if all([
1097 expected == actual
1098 for expected, actual in zip(item, _call)
1099 ]):
1100 return True
1101 return False
1104def _try_iter(obj):
1105 if obj is None:
1106 return obj
1107 if _is_exception(obj):
1108 return obj
1109 if _callable(obj):
1110 return obj
1111 try:
1112 return iter(obj)
1113 except TypeError:
1114 # XXXX backwards compatibility
1115 # but this will blow up on first call - so maybe we should fail early?
1116 return obj
1119class CallableMixin(Base):
1121 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
1122 wraps=None, name=None, spec_set=None, parent=None,
1123 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
1124 self.__dict__['_mock_return_value'] = return_value
1125 _safe_super(CallableMixin, self).__init__(
1126 spec, wraps, name, spec_set, parent,
1127 _spec_state, _new_name, _new_parent, **kwargs
1128 )
1130 self.side_effect = side_effect
1133 def _mock_check_sig(self, *args, **kwargs):
1134 # stub method that can be replaced with one with a specific signature
1135 pass
1138 def __call__(_mock_self, *args, **kwargs):
1139 # can't use self in-case a function / method we are mocking uses self
1140 # in the signature
1141 _mock_self._mock_check_sig(*args, **kwargs)
1142 _mock_self._increment_mock_call(*args, **kwargs)
1143 return _mock_self._mock_call(*args, **kwargs)
1146 def _mock_call(_mock_self, *args, **kwargs):
1147 return _mock_self._execute_mock_call(*args, **kwargs)
1149 def _increment_mock_call(_mock_self, *args, **kwargs):
1150 self = _mock_self
1151 self.called = True
1152 self.call_count += 1
1154 # handle call_args
1155 # needs to be set here so assertions on call arguments pass before
1156 # execution in the case of awaited calls
1157 _call = _Call((args, kwargs), two=True)
1158 self.call_args = _call
1159 self.call_args_list.append(_call)
1161 # initial stuff for method_calls:
1162 do_method_calls = self._mock_parent is not None
1163 method_call_name = self._mock_name
1165 # initial stuff for mock_calls:
1166 mock_call_name = self._mock_new_name
1167 is_a_call = mock_call_name == '()'
1168 self.mock_calls.append(_Call(('', args, kwargs)))
1170 # follow up the chain of mocks:
1171 _new_parent = self._mock_new_parent
1172 while _new_parent is not None:
1174 # handle method_calls:
1175 if do_method_calls:
1176 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
1177 do_method_calls = _new_parent._mock_parent is not None
1178 if do_method_calls:
1179 method_call_name = _new_parent._mock_name + '.' + method_call_name
1181 # handle mock_calls:
1182 this_mock_call = _Call((mock_call_name, args, kwargs))
1183 _new_parent.mock_calls.append(this_mock_call)
1185 if _new_parent._mock_new_name:
1186 if is_a_call:
1187 dot = ''
1188 else:
1189 dot = '.'
1190 is_a_call = _new_parent._mock_new_name == '()'
1191 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name
1193 # follow the parental chain:
1194 _new_parent = _new_parent._mock_new_parent
1196 def _execute_mock_call(_mock_self, *args, **kwargs):
1197 self = _mock_self
1198 # separate from _increment_mock_call so that awaited functions are
1199 # executed separately from their call, also AsyncMock overrides this method
1201 effect = self.side_effect
1202 if effect is not None:
1203 if _is_exception(effect):
1204 raise effect
1205 elif not _callable(effect):
1206 result = next(effect)
1207 if _is_exception(result):
1208 raise result
1209 else:
1210 result = effect(*args, **kwargs)
1212 if result is not DEFAULT:
1213 return result
1215 if self._mock_return_value is not DEFAULT:
1216 return self.return_value
1218 if self._mock_wraps is not None:
1219 return self._mock_wraps(*args, **kwargs)
1221 return self.return_value
1225class Mock(CallableMixin, NonCallableMock):
1226 """
1227 Create a new `Mock` object. `Mock` takes several optional arguments
1228 that specify the behaviour of the Mock object:
1230 * `spec`: This can be either a list of strings or an existing object (a
1231 class or instance) that acts as the specification for the mock object. If
1232 you pass in an object then a list of strings is formed by calling dir on
1233 the object (excluding unsupported magic attributes and methods). Accessing
1234 any attribute not in this list will raise an `AttributeError`.
1236 If `spec` is an object (rather than a list of strings) then
1237 `mock.__class__` returns the class of the spec object. This allows mocks
1238 to pass `isinstance` tests.
1240 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
1241 or get an attribute on the mock that isn't on the object passed as
1242 `spec_set` will raise an `AttributeError`.
1244 * `side_effect`: A function to be called whenever the Mock is called. See
1245 the `side_effect` attribute. Useful for raising exceptions or
1246 dynamically changing return values. The function is called with the same
1247 arguments as the mock, and unless it returns `DEFAULT`, the return
1248 value of this function is used as the return value.
1250 If `side_effect` is an iterable then each call to the mock will return
1251 the next value from the iterable. If any of the members of the iterable
1252 are exceptions they will be raised instead of returned.
1254 * `return_value`: The value returned when the mock is called. By default
1255 this is a new Mock (created on first access). See the
1256 `return_value` attribute.
1258 * `unsafe`: By default, accessing any attribute whose name starts with
1259 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
1260 Additionally, an AttributeError is raised when accessing
1261 attributes that match the name of an assertion method without the prefix
1262 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
1263 Passing `unsafe=True` will allow access to these attributes.
1265 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
1266 calling the Mock will pass the call through to the wrapped object
1267 (returning the real result). Attribute access on the mock will return a
1268 Mock object that wraps the corresponding attribute of the wrapped object
1269 (so attempting to access an attribute that doesn't exist will raise an
1270 `AttributeError`).
1272 If the mock has an explicit `return_value` set then calls are not passed
1273 to the wrapped object and the `return_value` is returned instead.
1275 * `name`: If the mock has a name then it will be used in the repr of the
1276 mock. This can be useful for debugging. The name is propagated to child
1277 mocks.
1279 Mocks can also be called with arbitrary keyword arguments. These will be
1280 used to set attributes on the mock after it is created.
1281 """
1284def _dot_lookup(thing, comp, import_path):
1285 try:
1286 return getattr(thing, comp)
1287 except AttributeError:
1288 __import__(import_path)
1289 return getattr(thing, comp)
1292def _importer(target):
1293 components = target.split('.')
1294 import_path = components.pop(0)
1295 thing = __import__(import_path)
1297 for comp in components:
1298 import_path += ".%s" % comp
1299 thing = _dot_lookup(thing, comp, import_path)
1300 return thing
1303# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1304# they don't contain common misspellings of arguments related to autospeccing.
1305def _check_spec_arg_typos(kwargs_to_check):
1306 typos = ("autospect", "auto_spec", "set_spec")
1307 for typo in typos:
1308 if typo in kwargs_to_check:
1309 raise RuntimeError(
1310 f"{typo!r} might be a typo; use unsafe=True if this is intended"
1311 )
1314class _patch(object):
1316 attribute_name = None
1317 _active_patches = []
1319 def __init__(
1320 self, getter, attribute, new, spec, create,
1321 spec_set, autospec, new_callable, kwargs, *, unsafe=False
1322 ):
1323 if new_callable is not None:
1324 if new is not DEFAULT:
1325 raise ValueError(
1326 "Cannot use 'new' and 'new_callable' together"
1327 )
1328 if autospec is not None:
1329 raise ValueError(
1330 "Cannot use 'autospec' and 'new_callable' together"
1331 )
1332 if not unsafe:
1333 _check_spec_arg_typos(kwargs)
1334 if _is_instance_mock(spec):
1335 raise InvalidSpecError(
1336 f'Cannot spec attr {attribute!r} as the spec '
1337 f'has already been mocked out. [spec={spec!r}]')
1338 if _is_instance_mock(spec_set):
1339 raise InvalidSpecError(
1340 f'Cannot spec attr {attribute!r} as the spec_set '
1341 f'target has already been mocked out. [spec_set={spec_set!r}]')
1343 self.getter = getter
1344 self.attribute = attribute
1345 self.new = new
1346 self.new_callable = new_callable
1347 self.spec = spec
1348 self.create = create
1349 self.has_local = False
1350 self.spec_set = spec_set
1351 self.autospec = autospec
1352 self.kwargs = kwargs
1353 self.additional_patchers = []
1356 def copy(self):
1357 patcher = _patch(
1358 self.getter, self.attribute, self.new, self.spec,
1359 self.create, self.spec_set,
1360 self.autospec, self.new_callable, self.kwargs
1361 )
1362 patcher.attribute_name = self.attribute_name
1363 patcher.additional_patchers = [
1364 p.copy() for p in self.additional_patchers
1365 ]
1366 return patcher
1369 def __call__(self, func):
1370 if isinstance(func, type):
1371 return self.decorate_class(func)
1372 if inspect.iscoroutinefunction(func):
1373 return self.decorate_async_callable(func)
1374 return self.decorate_callable(func)
1377 def decorate_class(self, klass):
1378 for attr in dir(klass):
1379 if not attr.startswith(patch.TEST_PREFIX):
1380 continue
1382 attr_value = getattr(klass, attr)
1383 if not hasattr(attr_value, "__call__"):
1384 continue
1386 patcher = self.copy()
1387 setattr(klass, attr, patcher(attr_value))
1388 return klass
1391 @contextlib.contextmanager
1392 def decoration_helper(self, patched, args, keywargs):
1393 extra_args = []
1394 with contextlib.ExitStack() as exit_stack:
1395 for patching in patched.patchings:
1396 arg = exit_stack.enter_context(patching)
1397 if patching.attribute_name is not None:
1398 keywargs.update(arg)
1399 elif patching.new is DEFAULT:
1400 extra_args.append(arg)
1402 args += tuple(extra_args)
1403 yield (args, keywargs)
1406 def decorate_callable(self, func):
1407 # NB. Keep the method in sync with decorate_async_callable()
1408 if hasattr(func, 'patchings'):
1409 func.patchings.append(self)
1410 return func
1412 @wraps(func)
1413 def patched(*args, **keywargs):
1414 with self.decoration_helper(patched,
1415 args,
1416 keywargs) as (newargs, newkeywargs):
1417 return func(*newargs, **newkeywargs)
1419 patched.patchings = [self]
1420 return patched
1423 def decorate_async_callable(self, func):
1424 # NB. Keep the method in sync with decorate_callable()
1425 if hasattr(func, 'patchings'):
1426 func.patchings.append(self)
1427 return func
1429 @wraps(func)
1430 async def patched(*args, **keywargs):
1431 with self.decoration_helper(patched,
1432 args,
1433 keywargs) as (newargs, newkeywargs):
1434 return await func(*newargs, **newkeywargs)
1436 patched.patchings = [self]
1437 return patched
1440 def get_original(self):
1441 target = self.getter()
1442 name = self.attribute
1444 original = DEFAULT
1445 local = False
1447 try:
1448 original = target.__dict__[name]
1449 except (AttributeError, KeyError):
1450 original = getattr(target, name, DEFAULT)
1451 else:
1452 local = True
1454 if name in _builtins and isinstance(target, ModuleType):
1455 self.create = True
1457 if not self.create and original is DEFAULT:
1458 raise AttributeError(
1459 "%s does not have the attribute %r" % (target, name)
1460 )
1461 return original, local
1464 def __enter__(self):
1465 """Perform the patch."""
1466 new, spec, spec_set = self.new, self.spec, self.spec_set
1467 autospec, kwargs = self.autospec, self.kwargs
1468 new_callable = self.new_callable
1469 self.target = self.getter()
1471 # normalise False to None
1472 if spec is False:
1473 spec = None
1474 if spec_set is False:
1475 spec_set = None
1476 if autospec is False:
1477 autospec = None
1479 if spec is not None and autospec is not None:
1480 raise TypeError("Can't specify spec and autospec")
1481 if ((spec is not None or autospec is not None) and
1482 spec_set not in (True, None)):
1483 raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1485 original, local = self.get_original()
1487 if new is DEFAULT and autospec is None:
1488 inherit = False
1489 if spec is True:
1490 # set spec to the object we are replacing
1491 spec = original
1492 if spec_set is True:
1493 spec_set = original
1494 spec = None
1495 elif spec is not None:
1496 if spec_set is True:
1497 spec_set = spec
1498 spec = None
1499 elif spec_set is True:
1500 spec_set = original
1502 if spec is not None or spec_set is not None:
1503 if original is DEFAULT:
1504 raise TypeError("Can't use 'spec' with create=True")
1505 if isinstance(original, type):
1506 # If we're patching out a class and there is a spec
1507 inherit = True
1508 if spec is None and _is_async_obj(original):
1509 Klass = AsyncMock
1510 else:
1511 Klass = MagicMock
1512 _kwargs = {}
1513 if new_callable is not None:
1514 Klass = new_callable
1515 elif spec is not None or spec_set is not None:
1516 this_spec = spec
1517 if spec_set is not None:
1518 this_spec = spec_set
1519 if _is_list(this_spec):
1520 not_callable = '__call__' not in this_spec
1521 else:
1522 not_callable = not callable(this_spec)
1523 if _is_async_obj(this_spec):
1524 Klass = AsyncMock
1525 elif not_callable:
1526 Klass = NonCallableMagicMock
1528 if spec is not None:
1529 _kwargs['spec'] = spec
1530 if spec_set is not None:
1531 _kwargs['spec_set'] = spec_set
1533 # add a name to mocks
1534 if (isinstance(Klass, type) and
1535 issubclass(Klass, NonCallableMock) and self.attribute):
1536 _kwargs['name'] = self.attribute
1538 _kwargs.update(kwargs)
1539 new = Klass(**_kwargs)
1541 if inherit and _is_instance_mock(new):
1542 # we can only tell if the instance should be callable if the
1543 # spec is not a list
1544 this_spec = spec
1545 if spec_set is not None:
1546 this_spec = spec_set
1547 if (not _is_list(this_spec) and not
1548 _instance_callable(this_spec)):
1549 Klass = NonCallableMagicMock
1551 _kwargs.pop('name')
1552 new.return_value = Klass(_new_parent=new, _new_name='()',
1553 **_kwargs)
1554 elif autospec is not None:
1555 # spec is ignored, new *must* be default, spec_set is treated
1556 # as a boolean. Should we check spec is not None and that spec_set
1557 # is a bool?
1558 if new is not DEFAULT:
1559 raise TypeError(
1560 "autospec creates the mock for you. Can't specify "
1561 "autospec and new."
1562 )
1563 if original is DEFAULT:
1564 raise TypeError("Can't use 'autospec' with create=True")
1565 spec_set = bool(spec_set)
1566 if autospec is True:
1567 autospec = original
1569 if _is_instance_mock(self.target):
1570 raise InvalidSpecError(
1571 f'Cannot autospec attr {self.attribute!r} as the patch '
1572 f'target has already been mocked out. '
1573 f'[target={self.target!r}, attr={autospec!r}]')
1574 if _is_instance_mock(autospec):
1575 target_name = getattr(self.target, '__name__', self.target)
1576 raise InvalidSpecError(
1577 f'Cannot autospec attr {self.attribute!r} from target '
1578 f'{target_name!r} as it has already been mocked out. '
1579 f'[target={self.target!r}, attr={autospec!r}]')
1581 new = create_autospec(autospec, spec_set=spec_set,
1582 _name=self.attribute, **kwargs)
1583 elif kwargs:
1584 # can't set keyword args when we aren't creating the mock
1585 # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1586 raise TypeError("Can't pass kwargs to a mock we aren't creating")
1588 new_attr = new
1590 self.temp_original = original
1591 self.is_local = local
1592 self._exit_stack = contextlib.ExitStack()
1593 try:
1594 setattr(self.target, self.attribute, new_attr)
1595 if self.attribute_name is not None:
1596 extra_args = {}
1597 if self.new is DEFAULT:
1598 extra_args[self.attribute_name] = new
1599 for patching in self.additional_patchers:
1600 arg = self._exit_stack.enter_context(patching)
1601 if patching.new is DEFAULT:
1602 extra_args.update(arg)
1603 return extra_args
1605 return new
1606 except:
1607 if not self.__exit__(*sys.exc_info()):
1608 raise
1610 def __exit__(self, *exc_info):
1611 """Undo the patch."""
1612 if self.is_local and self.temp_original is not DEFAULT:
1613 setattr(self.target, self.attribute, self.temp_original)
1614 else:
1615 delattr(self.target, self.attribute)
1616 if not self.create and (not hasattr(self.target, self.attribute) or
1617 self.attribute in ('__doc__', '__module__',
1618 '__defaults__', '__annotations__',
1619 '__kwdefaults__')):
1620 # needed for proxy objects like django settings
1621 setattr(self.target, self.attribute, self.temp_original)
1623 del self.temp_original
1624 del self.is_local
1625 del self.target
1626 exit_stack = self._exit_stack
1627 del self._exit_stack
1628 return exit_stack.__exit__(*exc_info)
1631 def start(self):
1632 """Activate a patch, returning any created mock."""
1633 result = self.__enter__()
1634 self._active_patches.append(self)
1635 return result
1638 def stop(self):
1639 """Stop an active patch."""
1640 try:
1641 self._active_patches.remove(self)
1642 except ValueError:
1643 # If the patch hasn't been started this will fail
1644 return None
1646 return self.__exit__(None, None, None)
1650def _get_target(target):
1651 try:
1652 target, attribute = target.rsplit('.', 1)
1653 except (TypeError, ValueError, AttributeError):
1654 raise TypeError(
1655 f"Need a valid target to patch. You supplied: {target!r}")
1656 getter = lambda: _importer(target)
1657 return getter, attribute
1660def _patch_object(
1661 target, attribute, new=DEFAULT, spec=None,
1662 create=False, spec_set=None, autospec=None,
1663 new_callable=None, *, unsafe=False, **kwargs
1664 ):
1665 """
1666 patch the named member (`attribute`) on an object (`target`) with a mock
1667 object.
1669 `patch.object` can be used as a decorator, class decorator or a context
1670 manager. Arguments `new`, `spec`, `create`, `spec_set`,
1671 `autospec` and `new_callable` have the same meaning as for `patch`. Like
1672 `patch`, `patch.object` takes arbitrary keyword arguments for configuring
1673 the mock object it creates.
1675 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
1676 for choosing which methods to wrap.
1677 """
1678 if type(target) is str:
1679 raise TypeError(
1680 f"{target!r} must be the actual object to be patched, not a str"
1681 )
1682 getter = lambda: target
1683 return _patch(
1684 getter, attribute, new, spec, create,
1685 spec_set, autospec, new_callable, kwargs, unsafe=unsafe
1686 )
1689def _patch_multiple(target, spec=None, create=False, spec_set=None,
1690 autospec=None, new_callable=None, **kwargs):
1691 """Perform multiple patches in a single call. It takes the object to be
1692 patched (either as an object or a string to fetch the object by importing)
1693 and keyword arguments for the patches::
1695 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1696 ...
1698 Use `DEFAULT` as the value if you want `patch.multiple` to create
1699 mocks for you. In this case the created mocks are passed into a decorated
1700 function by keyword, and a dictionary is returned when `patch.multiple` is
1701 used as a context manager.
1703 `patch.multiple` can be used as a decorator, class decorator or a context
1704 manager. The arguments `spec`, `spec_set`, `create`,
1705 `autospec` and `new_callable` have the same meaning as for `patch`. These
1706 arguments will be applied to *all* patches done by `patch.multiple`.
1708 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1709 for choosing which methods to wrap.
1710 """
1711 if type(target) is str:
1712 getter = lambda: _importer(target)
1713 else:
1714 getter = lambda: target
1716 if not kwargs:
1717 raise ValueError(
1718 'Must supply at least one keyword argument with patch.multiple'
1719 )
1720 # need to wrap in a list for python 3, where items is a view
1721 items = list(kwargs.items())
1722 attribute, new = items[0]
1723 patcher = _patch(
1724 getter, attribute, new, spec, create, spec_set,
1725 autospec, new_callable, {}
1726 )
1727 patcher.attribute_name = attribute
1728 for attribute, new in items[1:]:
1729 this_patcher = _patch(
1730 getter, attribute, new, spec, create, spec_set,
1731 autospec, new_callable, {}
1732 )
1733 this_patcher.attribute_name = attribute
1734 patcher.additional_patchers.append(this_patcher)
1735 return patcher
1738def patch(
1739 target, new=DEFAULT, spec=None, create=False,
1740 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
1741 ):
1742 """
1743 `patch` acts as a function decorator, class decorator or a context
1744 manager. Inside the body of the function or with statement, the `target`
1745 is patched with a `new` object. When the function/with statement exits
1746 the patch is undone.
1748 If `new` is omitted, then the target is replaced with an
1749 `AsyncMock if the patched object is an async function or a
1750 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
1751 omitted, the created mock is passed in as an extra argument to the
1752 decorated function. If `patch` is used as a context manager the created
1753 mock is returned by the context manager.
1755 `target` should be a string in the form `'package.module.ClassName'`. The
1756 `target` is imported and the specified object replaced with the `new`
1757 object, so the `target` must be importable from the environment you are
1758 calling `patch` from. The target is imported when the decorated function
1759 is executed, not at decoration time.
1761 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
1762 if patch is creating one for you.
1764 In addition you can pass `spec=True` or `spec_set=True`, which causes
1765 patch to pass in the object being mocked as the spec/spec_set object.
1767 `new_callable` allows you to specify a different class, or callable object,
1768 that will be called to create the `new` object. By default `AsyncMock` is
1769 used for async functions and `MagicMock` for the rest.
1771 A more powerful form of `spec` is `autospec`. If you set `autospec=True`
1772 then the mock will be created with a spec from the object being replaced.
1773 All attributes of the mock will also have the spec of the corresponding
1774 attribute of the object being replaced. Methods and functions being
1775 mocked will have their arguments checked and will raise a `TypeError` if
1776 they are called with the wrong signature. For mocks replacing a class,
1777 their return value (the 'instance') will have the same spec as the class.
1779 Instead of `autospec=True` you can pass `autospec=some_object` to use an
1780 arbitrary object as the spec instead of the one being replaced.
1782 By default `patch` will fail to replace attributes that don't exist. If
1783 you pass in `create=True`, and the attribute doesn't exist, patch will
1784 create the attribute for you when the patched function is called, and
1785 delete it again afterwards. This is useful for writing tests against
1786 attributes that your production code creates at runtime. It is off by
1787 default because it can be dangerous. With it switched on you can write
1788 passing tests against APIs that don't actually exist!
1790 Patch can be used as a `TestCase` class decorator. It works by
1791 decorating each test method in the class. This reduces the boilerplate
1792 code when your test methods share a common patchings set. `patch` finds
1793 tests by looking for method names that start with `patch.TEST_PREFIX`.
1794 By default this is `test`, which matches the way `unittest` finds tests.
1795 You can specify an alternative prefix by setting `patch.TEST_PREFIX`.
1797 Patch can be used as a context manager, with the with statement. Here the
1798 patching applies to the indented block after the with statement. If you
1799 use "as" then the patched object will be bound to the name after the
1800 "as"; very useful if `patch` is creating a mock object for you.
1802 Patch will raise a `RuntimeError` if passed some common misspellings of
1803 the arguments autospec and spec_set. Pass the argument `unsafe` with the
1804 value True to disable that check.
1806 `patch` takes arbitrary keyword arguments. These will be passed to
1807 `AsyncMock` if the patched object is asynchronous, to `MagicMock`
1808 otherwise or to `new_callable` if specified.
1810 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
1811 available for alternate use-cases.
1812 """
1813 getter, attribute = _get_target(target)
1814 return _patch(
1815 getter, attribute, new, spec, create,
1816 spec_set, autospec, new_callable, kwargs, unsafe=unsafe
1817 )
1820class _patch_dict(object):
1821 """
1822 Patch a dictionary, or dictionary like object, and restore the dictionary
1823 to its original state after the test.
1825 `in_dict` can be a dictionary or a mapping like container. If it is a
1826 mapping then it must at least support getting, setting and deleting items
1827 plus iterating over keys.
1829 `in_dict` can also be a string specifying the name of the dictionary, which
1830 will then be fetched by importing it.
1832 `values` can be a dictionary of values to set in the dictionary. `values`
1833 can also be an iterable of `(key, value)` pairs.
1835 If `clear` is True then the dictionary will be cleared before the new
1836 values are set.
1838 `patch.dict` can also be called with arbitrary keyword arguments to set
1839 values in the dictionary::
1841 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1842 ...
1844 `patch.dict` can be used as a context manager, decorator or class
1845 decorator. When used as a class decorator `patch.dict` honours
1846 `patch.TEST_PREFIX` for choosing which methods to wrap.
1847 """
1849 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1850 self.in_dict = in_dict
1851 # support any argument supported by dict(...) constructor
1852 self.values = dict(values)
1853 self.values.update(kwargs)
1854 self.clear = clear
1855 self._original = None
1858 def __call__(self, f):
1859 if isinstance(f, type):
1860 return self.decorate_class(f)
1861 if inspect.iscoroutinefunction(f):
1862 return self.decorate_async_callable(f)
1863 return self.decorate_callable(f)
1866 def decorate_callable(self, f):
1867 @wraps(f)
1868 def _inner(*args, **kw):
1869 self._patch_dict()
1870 try:
1871 return f(*args, **kw)
1872 finally:
1873 self._unpatch_dict()
1875 return _inner
1878 def decorate_async_callable(self, f):
1879 @wraps(f)
1880 async def _inner(*args, **kw):
1881 self._patch_dict()
1882 try:
1883 return await f(*args, **kw)
1884 finally:
1885 self._unpatch_dict()
1887 return _inner
1890 def decorate_class(self, klass):
1891 for attr in dir(klass):
1892 attr_value = getattr(klass, attr)
1893 if (attr.startswith(patch.TEST_PREFIX) and
1894 hasattr(attr_value, "__call__")):
1895 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1896 decorated = decorator(attr_value)
1897 setattr(klass, attr, decorated)
1898 return klass
1901 def __enter__(self):
1902 """Patch the dict."""
1903 self._patch_dict()
1904 return self.in_dict
1907 def _patch_dict(self):
1908 values = self.values
1909 if isinstance(self.in_dict, str):
1910 self.in_dict = _importer(self.in_dict)
1911 in_dict = self.in_dict
1912 clear = self.clear
1914 try:
1915 original = in_dict.copy()
1916 except AttributeError:
1917 # dict like object with no copy method
1918 # must support iteration over keys
1919 original = {}
1920 for key in in_dict:
1921 original[key] = in_dict[key]
1922 self._original = original
1924 if clear:
1925 _clear_dict(in_dict)
1927 try:
1928 in_dict.update(values)
1929 except AttributeError:
1930 # dict like object with no update method
1931 for key in values:
1932 in_dict[key] = values[key]
1935 def _unpatch_dict(self):
1936 in_dict = self.in_dict
1937 original = self._original
1939 _clear_dict(in_dict)
1941 try:
1942 in_dict.update(original)
1943 except AttributeError:
1944 for key in original:
1945 in_dict[key] = original[key]
1948 def __exit__(self, *args):
1949 """Unpatch the dict."""
1950 if self._original is not None:
1951 self._unpatch_dict()
1952 return False
1955 def start(self):
1956 """Activate a patch, returning any created mock."""
1957 result = self.__enter__()
1958 _patch._active_patches.append(self)
1959 return result
1962 def stop(self):
1963 """Stop an active patch."""
1964 try:
1965 _patch._active_patches.remove(self)
1966 except ValueError:
1967 # If the patch hasn't been started this will fail
1968 return None
1970 return self.__exit__(None, None, None)
1973def _clear_dict(in_dict):
1974 try:
1975 in_dict.clear()
1976 except AttributeError:
1977 keys = list(in_dict)
1978 for key in keys:
1979 del in_dict[key]
1982def _patch_stopall():
1983 """Stop all active patches. LIFO to unroll nested patches."""
1984 for patch in reversed(_patch._active_patches):
1985 patch.stop()
1988patch.object = _patch_object
1989patch.dict = _patch_dict
1990patch.multiple = _patch_multiple
1991patch.stopall = _patch_stopall
1992patch.TEST_PREFIX = 'test'
1994magic_methods = (
1995 "lt le gt ge eq ne "
1996 "getitem setitem delitem "
1997 "len contains iter "
1998 "hash str sizeof "
1999 "enter exit "
2000 # we added divmod and rdivmod here instead of numerics
2001 # because there is no idivmod
2002 "divmod rdivmod neg pos abs invert "
2003 "complex int float index "
2004 "round trunc floor ceil "
2005 "bool next "
2006 "fspath "
2007 "aiter "
2008)
2010if IS_PYPY:
2011 # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html
2012 magic_methods = magic_methods.replace('sizeof ', '')
2014numerics = (
2015 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
2016)
2017inplace = ' '.join('i%s' % n for n in numerics.split())
2018right = ' '.join('r%s' % n for n in numerics.split())
2020# not including __prepare__, __instancecheck__, __subclasscheck__
2021# (as they are metaclass methods)
2022# __del__ is not supported at all as it causes problems if it exists
2024_non_defaults = {
2025 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
2026 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
2027 '__getstate__', '__setstate__', '__getformat__',
2028 '__repr__', '__dir__', '__subclasses__', '__format__',
2029 '__getnewargs_ex__',
2030}
2033def _get_method(name, func):
2034 "Turns a callable object (like a mock) into a real function"
2035 def method(self, *args, **kw):
2036 return func(self, *args, **kw)
2037 method.__name__ = name
2038 return method
2041_magics = {
2042 '__%s__' % method for method in
2043 ' '.join([magic_methods, numerics, inplace, right]).split()
2044}
2046# Magic methods used for async `with` statements
2047_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
2048# Magic methods that are only used with async calls but are synchronous functions themselves
2049_sync_async_magics = {"__aiter__"}
2050_async_magics = _async_method_magics | _sync_async_magics
2052_all_sync_magics = _magics | _non_defaults
2053_all_magics = _all_sync_magics | _async_magics
2055_unsupported_magics = {
2056 '__getattr__', '__setattr__',
2057 '__init__', '__new__', '__prepare__',
2058 '__instancecheck__', '__subclasscheck__',
2059 '__del__'
2060}
2062_calculate_return_value = {
2063 '__hash__': lambda self: object.__hash__(self),
2064 '__str__': lambda self: object.__str__(self),
2065 '__sizeof__': lambda self: object.__sizeof__(self),
2066 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
2067}
2069_return_values = {
2070 '__lt__': NotImplemented,
2071 '__gt__': NotImplemented,
2072 '__le__': NotImplemented,
2073 '__ge__': NotImplemented,
2074 '__int__': 1,
2075 '__contains__': False,
2076 '__len__': 0,
2077 '__exit__': False,
2078 '__complex__': 1j,
2079 '__float__': 1.0,
2080 '__bool__': True,
2081 '__index__': 1,
2082 '__aexit__': False,
2083}
2086def _get_eq(self):
2087 def __eq__(other):
2088 ret_val = self.__eq__._mock_return_value
2089 if ret_val is not DEFAULT:
2090 return ret_val
2091 if self is other:
2092 return True
2093 return NotImplemented
2094 return __eq__
2096def _get_ne(self):
2097 def __ne__(other):
2098 if self.__ne__._mock_return_value is not DEFAULT:
2099 return DEFAULT
2100 if self is other:
2101 return False
2102 return NotImplemented
2103 return __ne__
2105def _get_iter(self):
2106 def __iter__():
2107 ret_val = self.__iter__._mock_return_value
2108 if ret_val is DEFAULT:
2109 return iter([])
2110 # if ret_val was already an iterator, then calling iter on it should
2111 # return the iterator unchanged
2112 return iter(ret_val)
2113 return __iter__
2115def _get_async_iter(self):
2116 def __aiter__():
2117 ret_val = self.__aiter__._mock_return_value
2118 if ret_val is DEFAULT:
2119 return _AsyncIterator(iter([]))
2120 return _AsyncIterator(iter(ret_val))
2121 return __aiter__
2123_side_effect_methods = {
2124 '__eq__': _get_eq,
2125 '__ne__': _get_ne,
2126 '__iter__': _get_iter,
2127 '__aiter__': _get_async_iter
2128}
2132def _set_return_value(mock, method, name):
2133 fixed = _return_values.get(name, DEFAULT)
2134 if fixed is not DEFAULT:
2135 method.return_value = fixed
2136 return
2138 return_calculator = _calculate_return_value.get(name)
2139 if return_calculator is not None:
2140 return_value = return_calculator(mock)
2141 method.return_value = return_value
2142 return
2144 side_effector = _side_effect_methods.get(name)
2145 if side_effector is not None:
2146 method.side_effect = side_effector(mock)
2150class MagicMixin(Base):
2151 def __init__(self, *args, **kw):
2152 self._mock_set_magics() # make magic work for kwargs in init
2153 _safe_super(MagicMixin, self).__init__(*args, **kw)
2154 self._mock_set_magics() # fix magic broken by upper level init
2157 def _mock_set_magics(self):
2158 orig_magics = _magics | _async_method_magics
2159 these_magics = orig_magics
2161 if getattr(self, "_mock_methods", None) is not None:
2162 these_magics = orig_magics.intersection(self._mock_methods)
2164 remove_magics = set()
2165 remove_magics = orig_magics - these_magics
2167 for entry in remove_magics:
2168 if entry in type(self).__dict__:
2169 # remove unneeded magic methods
2170 delattr(self, entry)
2172 # don't overwrite existing attributes if called a second time
2173 these_magics = these_magics - set(type(self).__dict__)
2175 _type = type(self)
2176 for entry in these_magics:
2177 setattr(_type, entry, MagicProxy(entry, self))
2181class NonCallableMagicMock(MagicMixin, NonCallableMock):
2182 """A version of `MagicMock` that isn't callable."""
2183 def mock_add_spec(self, spec, spec_set=False):
2184 """Add a spec to a mock. `spec` can either be an object or a
2185 list of strings. Only attributes on the `spec` can be fetched as
2186 attributes from the mock.
2188 If `spec_set` is True then only attributes on the spec can be set."""
2189 self._mock_add_spec(spec, spec_set)
2190 self._mock_set_magics()
2193class AsyncMagicMixin(MagicMixin):
2194 pass
2197class MagicMock(MagicMixin, Mock):
2198 """
2199 MagicMock is a subclass of Mock with default implementations
2200 of most of the magic methods. You can use MagicMock without having to
2201 configure the magic methods yourself.
2203 If you use the `spec` or `spec_set` arguments then *only* magic
2204 methods that exist in the spec will be created.
2206 Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
2207 """
2208 def mock_add_spec(self, spec, spec_set=False):
2209 """Add a spec to a mock. `spec` can either be an object or a
2210 list of strings. Only attributes on the `spec` can be fetched as
2211 attributes from the mock.
2213 If `spec_set` is True then only attributes on the spec can be set."""
2214 self._mock_add_spec(spec, spec_set)
2215 self._mock_set_magics()
2219class MagicProxy(Base):
2220 def __init__(self, name, parent):
2221 self.name = name
2222 self.parent = parent
2224 def create_mock(self):
2225 entry = self.name
2226 parent = self.parent
2227 m = parent._get_child_mock(name=entry, _new_name=entry,
2228 _new_parent=parent)
2229 setattr(parent, entry, m)
2230 _set_return_value(parent, m, entry)
2231 return m
2233 def __get__(self, obj, _type=None):
2234 return self.create_mock()
2237_CODE_ATTRS = dir(CodeType)
2238_CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
2241class AsyncMockMixin(Base):
2242 await_count = _delegating_property('await_count')
2243 await_args = _delegating_property('await_args')
2244 await_args_list = _delegating_property('await_args_list')
2246 def __init__(self, *args, **kwargs):
2247 super().__init__(*args, **kwargs)
2248 # iscoroutinefunction() checks _is_coroutine property to say if an
2249 # object is a coroutine. Without this check it looks to see if it is a
2250 # function/method, which in this case it is not (since it is an
2251 # AsyncMock).
2252 # It is set through __dict__ because when spec_set is True, this
2253 # attribute is likely undefined.
2254 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
2255 self.__dict__['_mock_await_count'] = 0
2256 self.__dict__['_mock_await_args'] = None
2257 self.__dict__['_mock_await_args_list'] = _CallList()
2258 code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
2259 code_mock.__dict__["_spec_class"] = CodeType
2260 code_mock.__dict__["_spec_signature"] = _CODE_SIG
2261 code_mock.co_flags = (
2262 inspect.CO_COROUTINE
2263 + inspect.CO_VARARGS
2264 + inspect.CO_VARKEYWORDS
2265 )
2266 code_mock.co_argcount = 0
2267 code_mock.co_varnames = ('args', 'kwargs')
2268 try:
2269 code_mock.co_posonlyargcount = 0
2270 except AttributeError:
2271 # Python 3.7 and earlier.
2272 pass
2273 code_mock.co_kwonlyargcount = 0
2274 self.__dict__['__code__'] = code_mock
2275 self.__dict__['__name__'] = 'AsyncMock'
2276 self.__dict__['__defaults__'] = tuple()
2277 self.__dict__['__kwdefaults__'] = {}
2278 self.__dict__['__annotations__'] = None
2280 async def _execute_mock_call(_mock_self, *args, **kwargs):
2281 self = _mock_self
2282 # This is nearly just like super(), except for special handling
2283 # of coroutines
2285 _call = _Call((args, kwargs), two=True)
2286 self.await_count += 1
2287 self.await_args = _call
2288 self.await_args_list.append(_call)
2290 effect = self.side_effect
2291 if effect is not None:
2292 if _is_exception(effect):
2293 raise effect
2294 elif not _callable(effect):
2295 try:
2296 result = next(effect)
2297 except StopIteration:
2298 # It is impossible to propagate a StopIteration
2299 # through coroutines because of PEP 479
2300 raise StopAsyncIteration
2301 if _is_exception(result):
2302 raise result
2303 elif iscoroutinefunction(effect):
2304 result = await effect(*args, **kwargs)
2305 else:
2306 result = effect(*args, **kwargs)
2308 if result is not DEFAULT:
2309 return result
2311 if self._mock_return_value is not DEFAULT:
2312 return self.return_value
2314 if self._mock_wraps is not None:
2315 if iscoroutinefunction(self._mock_wraps):
2316 return await self._mock_wraps(*args, **kwargs)
2317 return self._mock_wraps(*args, **kwargs)
2319 return self.return_value
2321 def assert_awaited(_mock_self):
2322 """
2323 Assert that the mock was awaited at least once.
2324 """
2325 self = _mock_self
2326 if self.await_count == 0:
2327 msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
2328 raise AssertionError(msg)
2330 def assert_awaited_once(_mock_self):
2331 """
2332 Assert that the mock was awaited exactly once.
2333 """
2334 self = _mock_self
2335 if not self.await_count == 1:
2336 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2337 f" Awaited {self.await_count} times.")
2338 raise AssertionError(msg)
2340 def assert_awaited_with(_mock_self, *args, **kwargs):
2341 """
2342 Assert that the last await was with the specified arguments.
2343 """
2344 self = _mock_self
2345 if self.await_args is None:
2346 expected = self._format_mock_call_signature(args, kwargs)
2347 raise AssertionError(f'Expected await: {expected}\nNot awaited')
2349 def _error_message():
2350 msg = self._format_mock_failure_message(args, kwargs, action='await')
2351 return msg
2353 expected = self._call_matcher(_Call((args, kwargs), two=True))
2354 actual = self._call_matcher(self.await_args)
2355 if actual != expected:
2356 cause = expected if isinstance(expected, Exception) else None
2357 raise AssertionError(_error_message()) from cause
2359 def assert_awaited_once_with(_mock_self, *args, **kwargs):
2360 """
2361 Assert that the mock was awaited exactly once and with the specified
2362 arguments.
2363 """
2364 self = _mock_self
2365 if not self.await_count == 1:
2366 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2367 f" Awaited {self.await_count} times.")
2368 raise AssertionError(msg)
2369 return self.assert_awaited_with(*args, **kwargs)
2371 def assert_any_await(_mock_self, *args, **kwargs):
2372 """
2373 Assert the mock has ever been awaited with the specified arguments.
2374 """
2375 self = _mock_self
2376 expected = self._call_matcher(_Call((args, kwargs), two=True))
2377 cause = expected if isinstance(expected, Exception) else None
2378 actual = [self._call_matcher(c) for c in self.await_args_list]
2379 if cause or expected not in _AnyComparer(actual):
2380 expected_string = self._format_mock_call_signature(args, kwargs)
2381 raise AssertionError(
2382 '%s await not found' % expected_string
2383 ) from cause
2385 def assert_has_awaits(_mock_self, calls, any_order=False):
2386 """
2387 Assert the mock has been awaited with the specified calls.
2388 The :attr:`await_args_list` list is checked for the awaits.
2390 If `any_order` is False (the default) then the awaits must be
2391 sequential. There can be extra calls before or after the
2392 specified awaits.
2394 If `any_order` is True then the awaits can be in any order, but
2395 they must all appear in :attr:`await_args_list`.
2396 """
2397 self = _mock_self
2398 expected = [self._call_matcher(c) for c in calls]
2399 cause = cause = next((e for e in expected if isinstance(e, Exception)), None)
2400 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
2401 if not any_order:
2402 if expected not in all_awaits:
2403 if cause is None:
2404 problem = 'Awaits not found.'
2405 else:
2406 problem = ('Error processing expected awaits.\n'
2407 'Errors: {}').format(
2408 [e if isinstance(e, Exception) else None
2409 for e in expected])
2410 raise AssertionError(
2411 f'{problem}\n'
2412 f'Expected: {_CallList(calls)}\n'
2413 f'Actual: {self.await_args_list}'
2414 ) from cause
2415 return
2417 all_awaits = list(all_awaits)
2419 not_found = []
2420 for kall in expected:
2421 try:
2422 all_awaits.remove(kall)
2423 except ValueError:
2424 not_found.append(kall)
2425 if not_found:
2426 raise AssertionError(
2427 '%r not all found in await list' % (tuple(not_found),)
2428 ) from cause
2430 def assert_not_awaited(_mock_self):
2431 """
2432 Assert that the mock was never awaited.
2433 """
2434 self = _mock_self
2435 if self.await_count != 0:
2436 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
2437 f" Awaited {self.await_count} times.")
2438 raise AssertionError(msg)
2440 def reset_mock(self, *args, **kwargs):
2441 """
2442 See :func:`.Mock.reset_mock()`
2443 """
2444 super().reset_mock(*args, **kwargs)
2445 self.await_count = 0
2446 self.await_args = None
2447 self.await_args_list = _CallList()
2450class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
2451 """
2452 Enhance :class:`Mock` with features allowing to mock
2453 an async function.
2455 The :class:`AsyncMock` object will behave so the object is
2456 recognized as an async function, and the result of a call is an awaitable:
2458 >>> mock = AsyncMock()
2459 >>> iscoroutinefunction(mock)
2460 True
2461 >>> inspect.isawaitable(mock())
2462 True
2465 The result of ``mock()`` is an async function which will have the outcome
2466 of ``side_effect`` or ``return_value``:
2468 - if ``side_effect`` is a function, the async function will return the
2469 result of that function,
2470 - if ``side_effect`` is an exception, the async function will raise the
2471 exception,
2472 - if ``side_effect`` is an iterable, the async function will return the
2473 next value of the iterable, however, if the sequence of result is
2474 exhausted, ``StopIteration`` is raised immediately,
2475 - if ``side_effect`` is not defined, the async function will return the
2476 value defined by ``return_value``, hence, by default, the async function
2477 returns a new :class:`AsyncMock` object.
2479 If the outcome of ``side_effect`` or ``return_value`` is an async function,
2480 the mock async function obtained when the mock object is called will be this
2481 async function itself (and not an async function returning an async
2482 function).
2484 The test author can also specify a wrapped object with ``wraps``. In this
2485 case, the :class:`Mock` object behavior is the same as with an
2486 :class:`.Mock` object: the wrapped object may have methods
2487 defined as async function functions.
2489 Based on Martin Richard's asynctest project.
2490 """
2493class _ANY(object):
2494 "A helper object that compares equal to everything."
2496 def __eq__(self, other):
2497 return True
2499 def __ne__(self, other):
2500 return False
2502 def __repr__(self):
2503 return '<ANY>'
2505ANY = _ANY()
2509def _format_call_signature(name, args, kwargs):
2510 message = '%s(%%s)' % name
2511 formatted_args = ''
2512 args_string = ', '.join([repr(arg) for arg in args])
2513 kwargs_string = ', '.join([
2514 '%s=%r' % (key, value) for key, value in kwargs.items()
2515 ])
2516 if args_string:
2517 formatted_args = args_string
2518 if kwargs_string:
2519 if formatted_args:
2520 formatted_args += ', '
2521 formatted_args += kwargs_string
2523 return message % formatted_args
2527class _Call(tuple):
2528 """
2529 A tuple for holding the results of a call to a mock, either in the form
2530 `(args, kwargs)` or `(name, args, kwargs)`.
2532 If args or kwargs are empty then a call tuple will compare equal to
2533 a tuple without those values. This makes comparisons less verbose::
2535 _Call(('name', (), {})) == ('name',)
2536 _Call(('name', (1,), {})) == ('name', (1,))
2537 _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2539 The `_Call` object provides a useful shortcut for comparing with call::
2541 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2542 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2544 If the _Call has no name then it will match any name.
2545 """
2546 def __new__(cls, value=(), name='', parent=None, two=False,
2547 from_kall=True):
2548 args = ()
2549 kwargs = {}
2550 _len = len(value)
2551 if _len == 3:
2552 name, args, kwargs = value
2553 elif _len == 2:
2554 first, second = value
2555 if isinstance(first, str):
2556 name = first
2557 if isinstance(second, tuple):
2558 args = second
2559 else:
2560 kwargs = second
2561 else:
2562 args, kwargs = first, second
2563 elif _len == 1:
2564 value, = value
2565 if isinstance(value, str):
2566 name = value
2567 elif isinstance(value, tuple):
2568 args = value
2569 else:
2570 kwargs = value
2572 if two:
2573 return tuple.__new__(cls, (args, kwargs))
2575 return tuple.__new__(cls, (name, args, kwargs))
2578 def __init__(self, value=(), name=None, parent=None, two=False,
2579 from_kall=True):
2580 self._mock_name = name
2581 self._mock_parent = parent
2582 self._mock_from_kall = from_kall
2585 def __eq__(self, other):
2586 try:
2587 len_other = len(other)
2588 except TypeError:
2589 return NotImplemented
2591 self_name = ''
2592 if len(self) == 2:
2593 self_args, self_kwargs = self
2594 else:
2595 self_name, self_args, self_kwargs = self
2597 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2598 and self._mock_parent != other._mock_parent):
2599 return False
2601 other_name = ''
2602 if len_other == 0:
2603 other_args, other_kwargs = (), {}
2604 elif len_other == 3:
2605 other_name, other_args, other_kwargs = other
2606 elif len_other == 1:
2607 value, = other
2608 if isinstance(value, tuple):
2609 other_args = value
2610 other_kwargs = {}
2611 elif isinstance(value, str):
2612 other_name = value
2613 other_args, other_kwargs = (), {}
2614 else:
2615 other_args = ()
2616 other_kwargs = value
2617 elif len_other == 2:
2618 # could be (name, args) or (name, kwargs) or (args, kwargs)
2619 first, second = other
2620 if isinstance(first, str):
2621 other_name = first
2622 if isinstance(second, tuple):
2623 other_args, other_kwargs = second, {}
2624 else:
2625 other_args, other_kwargs = (), second
2626 else:
2627 other_args, other_kwargs = first, second
2628 else:
2629 return False
2631 if self_name and other_name != self_name:
2632 return False
2634 # this order is important for ANY to work!
2635 return (other_args, other_kwargs) == (self_args, self_kwargs)
2638 __ne__ = object.__ne__
2641 def __call__(self, *args, **kwargs):
2642 if self._mock_name is None:
2643 return _Call(('', args, kwargs), name='()')
2645 name = self._mock_name + '()'
2646 return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2649 def __getattr__(self, attr):
2650 if self._mock_name is None:
2651 return _Call(name=attr, from_kall=False)
2652 name = '%s.%s' % (self._mock_name, attr)
2653 return _Call(name=name, parent=self, from_kall=False)
2656 def __getattribute__(self, attr):
2657 if attr in tuple.__dict__:
2658 raise AttributeError
2659 return tuple.__getattribute__(self, attr)
2662 def _get_call_arguments(self):
2663 if len(self) == 2:
2664 args, kwargs = self
2665 else:
2666 name, args, kwargs = self
2668 return args, kwargs
2670 @property
2671 def args(self):
2672 return self._get_call_arguments()[0]
2674 @property
2675 def kwargs(self):
2676 return self._get_call_arguments()[1]
2678 def __repr__(self):
2679 if not self._mock_from_kall:
2680 name = self._mock_name or 'call'
2681 if name.startswith('()'):
2682 name = 'call%s' % name
2683 return name
2685 if len(self) == 2:
2686 name = 'call'
2687 args, kwargs = self
2688 else:
2689 name, args, kwargs = self
2690 if not name:
2691 name = 'call'
2692 elif not name.startswith('()'):
2693 name = 'call.%s' % name
2694 else:
2695 name = 'call%s' % name
2696 return _format_call_signature(name, args, kwargs)
2699 def call_list(self):
2700 """For a call object that represents multiple calls, `call_list`
2701 returns a list of all the intermediate calls as well as the
2702 final call."""
2703 vals = []
2704 thing = self
2705 while thing is not None:
2706 if thing._mock_from_kall:
2707 vals.append(thing)
2708 thing = thing._mock_parent
2709 return _CallList(reversed(vals))
2712call = _Call(from_kall=False)
2715def create_autospec(spec, spec_set=False, instance=False, _parent=None,
2716 _name=None, *, unsafe=False, **kwargs):
2717 """Create a mock object using another object as a spec. Attributes on the
2718 mock will use the corresponding attribute on the `spec` object as their
2719 spec.
2721 Functions or methods being mocked will have their arguments checked
2722 to check that they are called with the correct signature.
2724 If `spec_set` is True then attempting to set attributes that don't exist
2725 on the spec object will raise an `AttributeError`.
2727 If a class is used as a spec then the return value of the mock (the
2728 instance of the class) will have the same spec. You can use a class as the
2729 spec for an instance object by passing `instance=True`. The returned mock
2730 will only be callable if instances of the mock are callable.
2732 `create_autospec` will raise a `RuntimeError` if passed some common
2733 misspellings of the arguments autospec and spec_set. Pass the argument
2734 `unsafe` with the value True to disable that check.
2736 `create_autospec` also takes arbitrary keyword arguments that are passed to
2737 the constructor of the created mock."""
2738 if _is_list(spec):
2739 # can't pass a list instance to the mock constructor as it will be
2740 # interpreted as a list of strings
2741 spec = type(spec)
2743 is_type = isinstance(spec, type)
2744 if _is_instance_mock(spec):
2745 raise InvalidSpecError(f'Cannot autospec a Mock object. '
2746 f'[object={spec!r}]')
2747 is_async_func = _is_async_func(spec)
2748 _kwargs = {'spec': spec}
2749 if spec_set:
2750 _kwargs = {'spec_set': spec}
2751 elif spec is None:
2752 # None we mock with a normal mock without a spec
2753 _kwargs = {}
2754 if _kwargs and instance:
2755 _kwargs['_spec_as_instance'] = True
2756 if not unsafe:
2757 _check_spec_arg_typos(kwargs)
2759 _kwargs.update(kwargs)
2761 Klass = MagicMock
2762 if inspect.isdatadescriptor(spec):
2763 # descriptors don't have a spec
2764 # because we don't know what type they return
2765 _kwargs = {}
2766 elif is_async_func:
2767 if instance:
2768 raise RuntimeError("Instance can not be True when create_autospec "
2769 "is mocking an async function")
2770 Klass = AsyncMock
2771 elif not _callable(spec):
2772 Klass = NonCallableMagicMock
2773 elif is_type and instance and not _instance_callable(spec):
2774 Klass = NonCallableMagicMock
2776 _name = _kwargs.pop('name', _name)
2778 _new_name = _name
2779 if _parent is None:
2780 # for a top level object no _new_name should be set
2781 _new_name = ''
2783 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
2784 name=_name, **_kwargs)
2786 if isinstance(spec, FunctionTypes):
2787 # should only happen at the top level because we don't
2788 # recurse for functions
2789 mock = _set_signature(mock, spec)
2790 if is_async_func:
2791 _setup_async_mock(mock)
2792 else:
2793 _check_signature(spec, mock, is_type, instance)
2795 if _parent is not None and not instance:
2796 _parent._mock_children[_name] = mock
2798 if is_type and not instance and 'return_value' not in kwargs:
2799 mock.return_value = create_autospec(spec, spec_set, instance=True,
2800 _name='()', _parent=mock)
2802 for entry in dir(spec):
2803 if _is_magic(entry):
2804 # MagicMock already does the useful magic methods for us
2805 continue
2807 # XXXX do we need a better way of getting attributes without
2808 # triggering code execution (?) Probably not - we need the actual
2809 # object to mock it so we would rather trigger a property than mock
2810 # the property descriptor. Likewise we want to mock out dynamically
2811 # provided attributes.
2812 # XXXX what about attributes that raise exceptions other than
2813 # AttributeError on being fetched?
2814 # we could be resilient against it, or catch and propagate the
2815 # exception when the attribute is fetched from the mock
2816 try:
2817 original = getattr(spec, entry)
2818 except AttributeError:
2819 continue
2821 kwargs = {'spec': original}
2822 if spec_set:
2823 kwargs = {'spec_set': original}
2825 if not isinstance(original, FunctionTypes):
2826 new = _SpecState(original, spec_set, mock, entry, instance)
2827 mock._mock_children[entry] = new
2828 else:
2829 parent = mock
2830 if isinstance(spec, FunctionTypes):
2831 parent = mock.mock
2833 skipfirst = _must_skip(spec, entry, is_type)
2834 kwargs['_eat_self'] = skipfirst
2835 if iscoroutinefunction(original):
2836 child_klass = AsyncMock
2837 else:
2838 child_klass = MagicMock
2839 new = child_klass(parent=parent, name=entry, _new_name=entry,
2840 _new_parent=parent,
2841 **kwargs)
2842 mock._mock_children[entry] = new
2843 new.return_value = child_klass()
2844 _check_signature(original, new, skipfirst=skipfirst)
2846 # so functions created with _set_signature become instance attributes,
2847 # *plus* their underlying mock exists in _mock_children of the parent
2848 # mock. Adding to _mock_children may be unnecessary where we are also
2849 # setting as an instance attribute?
2850 if isinstance(new, FunctionTypes):
2851 setattr(mock, entry, new)
2853 return mock
2856def _must_skip(spec, entry, is_type):
2857 """
2858 Return whether we should skip the first argument on spec's `entry`
2859 attribute.
2860 """
2861 if not isinstance(spec, type):
2862 if entry in getattr(spec, '__dict__', {}):
2863 # instance attribute - shouldn't skip
2864 return False
2865 spec = spec.__class__
2867 for klass in spec.__mro__:
2868 result = klass.__dict__.get(entry, DEFAULT)
2869 if result is DEFAULT:
2870 continue
2871 if isinstance(result, (staticmethod, classmethod)):
2872 return False
2873 elif isinstance(result, FunctionTypes):
2874 # Normal method => skip if looked up on type
2875 # (if looked up on instance, self is already skipped)
2876 return is_type
2877 else:
2878 return False
2880 # function is a dynamically provided attribute
2881 return is_type
2884class _SpecState(object):
2886 def __init__(self, spec, spec_set=False, parent=None,
2887 name=None, ids=None, instance=False):
2888 self.spec = spec
2889 self.ids = ids
2890 self.spec_set = spec_set
2891 self.parent = parent
2892 self.instance = instance
2893 self.name = name
2896FunctionTypes = (
2897 # python function
2898 type(create_autospec),
2899 # instance method
2900 type(ANY.__eq__),
2901)
2904file_spec = None
2905open_spec = None
2908def _to_stream(read_data):
2909 if isinstance(read_data, bytes):
2910 return io.BytesIO(read_data)
2911 else:
2912 return io.StringIO(read_data)
2915def mock_open(mock=None, read_data=''):
2916 """
2917 A helper function to create a mock to replace the use of `open`. It works
2918 for `open` called directly or used as a context manager.
2920 The `mock` argument is the mock object to configure. If `None` (the
2921 default) then a `MagicMock` will be created for you, with the API limited
2922 to methods or attributes available on standard file handles.
2924 `read_data` is a string for the `read`, `readline` and `readlines` of the
2925 file handle to return. This is an empty string by default.
2926 """
2927 _read_data = _to_stream(read_data)
2928 _state = [_read_data, None]
2930 def _readlines_side_effect(*args, **kwargs):
2931 if handle.readlines.return_value is not None:
2932 return handle.readlines.return_value
2933 return _state[0].readlines(*args, **kwargs)
2935 def _read_side_effect(*args, **kwargs):
2936 if handle.read.return_value is not None:
2937 return handle.read.return_value
2938 return _state[0].read(*args, **kwargs)
2940 def _readline_side_effect(*args, **kwargs):
2941 yield from _iter_side_effect()
2942 while True:
2943 yield _state[0].readline(*args, **kwargs)
2945 def _iter_side_effect():
2946 if handle.readline.return_value is not None:
2947 while True:
2948 yield handle.readline.return_value
2949 for line in _state[0]:
2950 yield line
2952 def _next_side_effect():
2953 if handle.readline.return_value is not None:
2954 return handle.readline.return_value
2955 return next(_state[0])
2957 global file_spec
2958 if file_spec is None:
2959 import _io
2960 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
2962 global open_spec
2963 if open_spec is None:
2964 import _io
2965 open_spec = list(set(dir(_io.open)))
2966 if mock is None:
2967 mock = MagicMock(name='open', spec=open_spec)
2969 handle = MagicMock(spec=file_spec)
2970 handle.__enter__.return_value = handle
2972 handle.write.return_value = None
2973 handle.read.return_value = None
2974 handle.readline.return_value = None
2975 handle.readlines.return_value = None
2977 handle.read.side_effect = _read_side_effect
2978 _state[1] = _readline_side_effect()
2979 handle.readline.side_effect = _state[1]
2980 handle.readlines.side_effect = _readlines_side_effect
2981 handle.__iter__.side_effect = _iter_side_effect
2982 handle.__next__.side_effect = _next_side_effect
2984 def reset_data(*args, **kwargs):
2985 _state[0] = _to_stream(read_data)
2986 if handle.readline.side_effect == _state[1]:
2987 # Only reset the side effect if the user hasn't overridden it.
2988 _state[1] = _readline_side_effect()
2989 handle.readline.side_effect = _state[1]
2990 return DEFAULT
2992 mock.side_effect = reset_data
2993 mock.return_value = handle
2994 return mock
2997class PropertyMock(Mock):
2998 """
2999 A mock intended to be used as a property, or other descriptor, on a class.
3000 `PropertyMock` provides `__get__` and `__set__` methods so you can specify
3001 a return value when it is fetched.
3003 Fetching a `PropertyMock` instance from an object calls the mock, with
3004 no args. Setting it calls the mock with the value being set.
3005 """
3006 def _get_child_mock(self, **kwargs):
3007 return MagicMock(**kwargs)
3009 def __get__(self, obj, obj_type=None):
3010 return self()
3011 def __set__(self, obj, val):
3012 self(val)
3015def seal(mock):
3016 """Disable the automatic generation of child mocks.
3018 Given an input Mock, seals it to ensure no further mocks will be generated
3019 when accessing an attribute that was not already defined.
3021 The operation recursively seals the mock passed in, meaning that
3022 the mock itself, any mocks generated by accessing one of its attributes,
3023 and all assigned mocks without a name or spec will be sealed.
3024 """
3025 mock._mock_sealed = True
3026 for attr in dir(mock):
3027 try:
3028 m = getattr(mock, attr)
3029 except AttributeError:
3030 continue
3031 if not isinstance(m, NonCallableMock):
3032 continue
3033 if isinstance(m._mock_children.get(attr), _SpecState):
3034 continue
3035 if m._mock_new_parent is mock:
3036 seal(m)
3039class _AsyncIterator:
3040 """
3041 Wraps an iterator in an asynchronous iterator.
3042 """
3043 def __init__(self, iterator):
3044 self.iterator = iterator
3045 code_mock = NonCallableMock(spec_set=CodeType)
3046 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
3047 self.__dict__['__code__'] = code_mock
3049 async def __anext__(self):
3050 try:
3051 return next(self.iterator)
3052 except StopIteration:
3053 pass
3054 raise StopAsyncIteration