Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/mock/mock.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# mock.py
2# Test tools for mocking and patching.
3# Maintained by Michael Foord
4# Backport for other versions of Python available from
5# https://pypi.org/project/mock
7__all__ = (
8 'Mock',
9 'MagicMock',
10 'patch',
11 'sentinel',
12 'DEFAULT',
13 'ANY',
14 'call',
15 'create_autospec',
16 'AsyncMock',
17 'ThreadingMock',
18 'FILTER_DIR',
19 'NonCallableMock',
20 'NonCallableMagicMock',
21 'mock_open',
22 'PropertyMock',
23 'seal',
24)
27import asyncio
28import contextlib
29import io
30import inspect
31import pprint
32import sys
33import threading
34import builtins
36try:
37 from dataclasses import fields, is_dataclass
38 HAS_DATACLASSES = True
39except ImportError:
40 HAS_DATACLASSES = False
42from types import CodeType, ModuleType, MethodType
43from unittest.util import safe_repr
44from functools import wraps, partial
45from threading import RLock
48from mock import IS_PYPY
49from .backports import iscoroutinefunction
class InvalidSpecError(Exception):
    """Indicates that an invalid value was used as a mock spec."""
56_builtins = {name for name in dir(builtins) if not name.startswith('_')}
58FILTER_DIR = True
60# Workaround for issue #12370
61# Without this, the __class__ properties wouldn't be set correctly
62_safe_super = super
def _is_async_obj(obj):
    """Return whether *obj* is a coroutine function or an awaitable.

    A mock instance that is not an ``AsyncMock`` is never considered async.
    Objects exposing ``__func__`` are checked through that attribute.
    """
    if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
        return False
    if hasattr(obj, '__func__'):
        obj = getattr(obj, '__func__')
    return iscoroutinefunction(obj) or inspect.isawaitable(obj)
def _is_async_func(func):
    """Return whether *func* has a ``__code__`` and is a coroutine function."""
    if not getattr(func, '__code__', None):
        return False
    return iscoroutinefunction(func)
def _is_instance_mock(obj):
    """Return whether *obj* is an instance of a ``NonCallableMock`` subclass."""
    # can't use isinstance on Mock objects because they override __class__
    # The base class for all mocks is NonCallableMock
    return issubclass(type(obj), NonCallableMock)
def _is_exception(obj):
    """Return True if *obj* is an exception instance or an exception class."""
    if isinstance(obj, BaseException):
        return True
    return isinstance(obj, type) and issubclass(obj, BaseException)
def _extract_mock(obj):
    """Return the mock behind *obj*, or *obj* itself.

    Autospecced functions are ``FunctionTypes`` instances carrying the real
    mock in a ``mock`` attribute; that attribute is returned for them.
    """
    if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
        return obj.mock
    return obj
def _get_signature_object(func, as_instance, eat_self):
    """
    Given an arbitrary, possibly callable object, try to create a suitable
    signature object.
    Return a (reduced func, signature) tuple, or None.

    `as_instance` selects whether a type is modelled as an instance (its
    `__call__` is used) or as a type (its `__init__` is used).  When
    `eat_self` is true the first positional parameter is dropped from the
    signature.
    """
    if isinstance(func, type) and not as_instance:
        # If it's a type and should be modelled as a type, use __init__.
        func = func.__init__
        # Skip the `self` argument in __init__
        eat_self = True
    elif isinstance(func, (classmethod, staticmethod)):
        if isinstance(func, classmethod):
            # Skip the `cls` argument of a class method
            eat_self = True
        # Use the original decorated method to extract the correct
        # function signature
        func = func.__func__
    elif not isinstance(func, FunctionTypes):
        # If we really want to model an instance of the passed type,
        # __call__ should be looked up, not __init__.
        try:
            func = func.__call__
        except AttributeError:
            return None
    # Binding a dummy first argument removes `self`/`cls` from the signature.
    sig_func = partial(func, None) if eat_self else func
    try:
        return func, inspect.signature(sig_func)
    except ValueError:
        # Certain callable types are not supported by inspect.signature()
        return None
def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature checker for *func* on the type of *mock*.

    Does nothing when no signature can be derived from *func*.  Otherwise
    sets `_mock_check_sig` and `__signature__` on `type(mock)`.
    """
    sig = _get_signature_object(func, instance, skipfirst)
    if sig is None:
        return
    func, sig = sig

    def checksig(_mock_self, *args, **kwargs):
        # Raises TypeError when the arguments do not fit the signature.
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    type(mock)._mock_check_sig = checksig
    type(mock).__signature__ = sig
def _copy_func_details(func, funcopy):
    """Copy identifying metadata attributes from *func* onto *funcopy*.

    Attributes that cannot be read or written are skipped silently.
    """
    # we explicitly don't copy func.__dict__ into this copy as it would
    # expose original attributes that should be mocked
    for attribute in (
        '__name__', '__doc__', '__text_signature__',
        '__module__', '__defaults__', '__kwdefaults__',
    ):
        try:
            setattr(funcopy, attribute, getattr(func, attribute))
        except AttributeError:
            pass
def _callable(obj):
    """Return True if *obj* is a type or has a non-None ``__call__``.

    ``staticmethod``, ``classmethod`` and bound-method objects are judged by
    their underlying ``__func__``.
    """
    if isinstance(obj, type):
        return True
    if isinstance(obj, (staticmethod, classmethod, MethodType)):
        return _callable(obj.__func__)
    return getattr(obj, '__call__', None) is not None
def _is_list(obj):
    """Return True if *obj* is exactly a ``list`` or a ``tuple``.

    Subclasses of either type do not match, because the exact type is
    compared.
    """
    # checks for list or tuples
    # XXXX badly named!
    return type(obj) in (list, tuple)
def _instance_callable(obj):
    """Given an object, return True if the object is callable.
    For classes, return True if instances would be callable."""
    if not isinstance(obj, type):
        # already an instance
        return getattr(obj, '__call__', None) is not None

    # *could* be broken by a class overriding __mro__ or __dict__ via
    # a metaclass
    return any(
        base.__dict__.get('__call__') is not None
        for base in (obj,) + obj.__mro__
    )
def _set_signature(mock, original, instance=False):
    """Return a function wrapping *mock* that enforces *original*'s signature.

    Returns *mock* unchanged when no signature can be derived from
    *original*.
    """
    # creates a function with signature (*args, **kwargs) that delegates to a
    # mock. It still does signature checking by calling a lambda with the same
    # signature as the original.

    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)

    name = original.__name__
    if not name.isidentifier():
        # The name is interpolated into source code below, so it must be a
        # valid identifier.
        name = 'funcopy'
    context = {'_checksig_': checksig, 'mock': mock}
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy
def _set_async_signature(mock, original, instance=False, is_async_mock=False):
    """Return an async function wrapping *mock* with *original*'s signature.

    Mirrors `_set_signature`: *mock* is returned unchanged when no signature
    can be derived, and a non-identifier `__name__` is replaced by
    ``'funcopy'`` before being interpolated into generated source.
    `is_async_mock` is accepted but not used by this function.
    """
    # creates an async function with signature (*args, **kwargs) that
    # delegates to a mock. It still does signature checking by calling a
    # lambda with the same signature as the original.

    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        # _get_signature_object returns None for unsupported callables;
        # unpacking it directly would raise TypeError.
        return mock
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)

    name = original.__name__
    if not name.isidentifier():
        name = 'funcopy'
    context = {'_checksig_': checksig, 'mock': mock}
    src = """async def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return await mock(*args, **kwargs)""" % name
    exec(src, context)
    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    _setup_async_mock(funcopy)
    return funcopy
def _setup_func(funcopy, mock, sig):
    """Attach mock state and assertion helpers to the function *funcopy*.

    Each helper forwards to the method of the same name on *mock*.  Finally
    *funcopy* is registered as the delegate of *mock*.
    """
    funcopy.mock = mock

    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        ret = funcopy.return_value
        if _is_instance_mock(ret) and ret is not mock:
            ret.reset_mock()

    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    funcopy.assert_called_with = assert_called_with
    funcopy.assert_called_once_with = assert_called_once_with
    funcopy.assert_has_calls = assert_has_calls
    funcopy.assert_any_call = assert_any_call
    funcopy.reset_mock = reset_mock
    funcopy.assert_called = assert_called
    funcopy.assert_not_called = assert_not_called
    funcopy.assert_called_once = assert_called_once
    funcopy.__signature__ = sig

    mock._mock_delegate = funcopy
def _setup_async_mock(mock):
    """Initialise await-tracking state and await assertions on *mock*.

    The assertion attributes forward to the attribute of the same name on
    ``mock.mock``, looked up at call time.
    """
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    for attribute in ('assert_awaited',
                      'assert_awaited_once',
                      'assert_awaited_with',
                      'assert_awaited_once_with',
                      'assert_any_await',
                      'assert_has_awaits',
                      'assert_not_awaited'):

        # setattr(mock, attribute, wrapper) causes late binding
        # hence attribute will always be the last value in the loop
        # Use partial(wrapper, attribute) to ensure the attribute is bound
        # correctly.
        setattr(mock, attribute, partial(wrapper, attribute))
def _is_magic(name):
    """Return True if *name* starts and ends with a double underscore.

    The check rebuilds the name from its middle part (everything except the
    first two and last two characters) and compares it with the original.
    """
    return '__%s__' % name[2:-2] == name
class _SentinelObject(object):
    "A unique, named, sentinel object."

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return 'sentinel.%s' % self.name

    def __reduce__(self):
        # Returns a string rather than a tuple.
        return 'sentinel.%s' % self.name
class _Sentinel(object):
    """Access attributes to return a named object, usable as a sentinel."""

    def __init__(self):
        # Maps attribute name -> the _SentinelObject created for it.
        self._sentinels = {}

    def __getattr__(self, name):
        if name == '__bases__':
            # Without this help(unittest.mock) raises an exception
            raise AttributeError
        # setdefault ensures the same object is returned for a given name.
        return self._sentinels.setdefault(name, _SentinelObject(name))

    def __reduce__(self):
        return 'sentinel'
348sentinel = _Sentinel()
350DEFAULT = sentinel.DEFAULT
351_missing = sentinel.MISSING
352_deleted = sentinel.DELETED
355_allowed_names = {
356 'return_value', '_mock_return_value', 'side_effect',
357 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
358 '_mock_name', '_mock_new_name'
359}
def _delegating_property(name):
    """Build a property for *name* that defers to the mock's delegate.

    *name* is also added to `_allowed_names`.  With no delegate
    (`_mock_delegate` is None) the value is read from / written to the
    ``'_mock_' + name`` attribute of the mock; otherwise the attribute
    *name* of the delegate is used.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        sig = self._mock_delegate
        if sig is None:
            return getattr(self, _the_name)
        return getattr(sig, name)

    def _set(self, value, name=name, _the_name=_the_name):
        sig = self._mock_delegate
        if sig is None:
            # Written straight into __dict__, bypassing __setattr__.
            self.__dict__[_the_name] = value
        else:
            setattr(sig, name, value)

    return property(_get, _set)
class _CallList(list):
    """A list whose ``in`` operator also matches contiguous sub-lists."""

    def __contains__(self, value):
        """Return True if *value* is an item, or a contiguous run of items.

        Non-list values use ordinary list membership.  A list value matches
        when some slice of this list of the same length equals it.
        """
        if not isinstance(value, list):
            return list.__contains__(self, value)
        len_value = len(value)
        len_self = len(self)
        if len_value > len_self:
            return False

        return any(
            self[i:i + len_value] == value
            for i in range(len_self - len_value + 1)
        )

    def __repr__(self):
        return pprint.pformat(list(self))
def _check_and_set_parent(parent, value, name, new_name):
    """Make *parent* the parent of the mock *value* if it has none yet.

    Returns False, leaving *value* untouched, when *value* is not a mock,
    already has a name or a parent, or is *parent* itself or one of its
    ancestors.  Otherwise sets the parent/name attributes selected by
    *name* and *new_name* and returns True.
    """
    value = _extract_mock(value)

    if not _is_instance_mock(value):
        return False
    if ((value._mock_name or value._mock_new_name) or
            (value._mock_parent is not None) or
            (value._mock_new_parent is not None)):
        return False

    _parent = parent
    while _parent is not None:
        # setting a mock (value) as a child or return value of itself
        # should not modify the mock
        if _parent is value:
            return False
        _parent = _parent._mock_new_parent

    if new_name:
        value._mock_new_parent = parent
        value._mock_new_name = new_name
    if name:
        value._mock_parent = parent
        value._mock_name = name
    return True
# Internal class to identify if we wrapped an iterator object or not.
class _MockIter(object):
    """Wrap ``iter(obj)`` and expose it through ``__next__``."""

    def __init__(self, obj):
        self.obj = iter(obj)

    def __next__(self):
        return next(self.obj)
class Base(object):
    """Root of the mock class hierarchy; its ``__init__`` ignores arguments."""

    # Class-level defaults for the return value and side effect.
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, *args, **kwargs):
        pass
442class NonCallableMock(Base):
443 """A non-callable version of `Mock`"""
445 # Store a mutex as a class attribute in order to protect concurrent access
446 # to mock attributes. Using a class attribute allows all NonCallableMock
447 # instances to share the mutex for simplicity.
448 #
449 # See https://github.com/python/cpython/issues/98624 for why this is
450 # necessary.
451 _lock = RLock()
def __new__(
        cls, spec=None, wraps=None, name=None, spec_set=None,
        parent=None, _spec_state=None, _new_name='', _new_parent=None,
        _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
):
    """Create the instance from a fresh, per-instance subclass of *cls*.

    When *cls* is not already an `AsyncMockMixin` subclass and the spec
    (`spec_set` or `spec`) is an async object, `AsyncMockMixin` is added as
    the first base of the new class.
    """
    # every instance has its own class
    # so we can create magic methods on the
    # class without stomping on other mocks
    bases = (cls,)
    if not issubclass(cls, AsyncMockMixin):
        # Check if spec is an async object or function
        spec_arg = spec_set or spec
        if spec_arg is not None and _is_async_obj(spec_arg):
            bases = (AsyncMockMixin, cls)
    new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
    instance = _safe_super(NonCallableMock, cls).__new__(new)
    return instance
def __init__(
        self, spec=None, wraps=None, name=None, spec_set=None,
        parent=None, _spec_state=None, _new_name='', _new_parent=None,
        _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
):
    """Initialise the mock's internal state.

    State is written directly into the instance ``__dict__``.  Any extra
    keyword arguments are passed to `configure_mock`.
    """
    if _new_parent is None:
        _new_parent = parent

    __dict__ = self.__dict__
    __dict__['_mock_parent'] = parent
    __dict__['_mock_name'] = name
    __dict__['_mock_new_name'] = _new_name
    __dict__['_mock_new_parent'] = _new_parent
    __dict__['_mock_sealed'] = False

    if spec_set is not None:
        # `spec_set` carries the spec itself; from here on it is a flag.
        spec = spec_set
        spec_set = True
    if _eat_self is None:
        _eat_self = parent is not None

    self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)

    __dict__['_mock_children'] = {}
    __dict__['_mock_wraps'] = wraps
    __dict__['_mock_delegate'] = None

    __dict__['_mock_called'] = False
    __dict__['_mock_call_args'] = None
    __dict__['_mock_call_count'] = 0
    __dict__['_mock_call_args_list'] = _CallList()
    __dict__['_mock_mock_calls'] = _CallList()

    __dict__['method_calls'] = _CallList()
    __dict__['_mock_unsafe'] = unsafe

    if kwargs:
        self.configure_mock(**kwargs)

    _safe_super(NonCallableMock, self).__init__(
        spec, wraps, name, spec_set, parent,
        _spec_state
    )
def attach_mock(self, mock, attribute):
    """
    Attach a mock as an attribute of this one, replacing its name and
    parent. Calls to the attached mock will be recorded in the
    `method_calls` and `mock_calls` attributes of this one."""
    inner_mock = _extract_mock(mock)

    # Clear the existing parent and name before assigning the attribute.
    inner_mock._mock_parent = None
    inner_mock._mock_new_parent = None
    inner_mock._mock_name = ''
    inner_mock._mock_new_name = None

    setattr(self, attribute, mock)
def mock_add_spec(self, spec, spec_set=False):
    """Add a spec to a mock. `spec` can either be an object or a
    list of strings. Only attributes on the `spec` can be fetched as
    attributes from the mock.

    If `spec_set` is True then only attributes on the spec can be set."""
    self._mock_add_spec(spec, spec_set)
def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
                   _eat_self=False):
    """Record spec information for this mock in the instance ``__dict__``.

    Raises `InvalidSpecError` when *spec* is itself a mock.  For a spec
    that is neither None nor a list/tuple, the spec class, its signature,
    the names from ``dir(spec)`` and the names of coroutine-function
    attributes are derived from it.
    """
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]')

    _spec_class = None
    _spec_signature = None
    _spec_asyncs = []

    if spec is not None and not _is_list(spec):
        if isinstance(spec, type):
            _spec_class = spec
        else:
            _spec_class = type(spec)
        res = _get_signature_object(spec,
                                    _spec_as_instance, _eat_self)
        _spec_signature = res and res[1]

        spec_list = dir(spec)

        for attr in spec_list:
            # getattr_static avoids invoking descriptors on the spec.
            static_attr = inspect.getattr_static(spec, attr, None)
            unwrapped_attr = static_attr
            try:
                unwrapped_attr = inspect.unwrap(unwrapped_attr)
            except ValueError:
                pass
            if iscoroutinefunction(unwrapped_attr):
                _spec_asyncs.append(attr)

        spec = spec_list

    __dict__ = self.__dict__
    __dict__['_spec_class'] = _spec_class
    __dict__['_spec_set'] = spec_set
    __dict__['_spec_signature'] = _spec_signature
    __dict__['_mock_methods'] = spec
    __dict__['_spec_asyncs'] = _spec_asyncs
def __get_return_value(self):
    """Return the configured return value, creating a child mock if unset.

    The delegate's value wins when a delegate exists.  A child mock is
    created and stored only when the value is `DEFAULT` and nothing is
    wrapped.
    """
    ret = self._mock_return_value
    if self._mock_delegate is not None:
        ret = self._mock_delegate.return_value

    if ret is DEFAULT and self._mock_wraps is None:
        ret = self._get_child_mock(
            _new_parent=self, _new_name='()'
        )
        self.return_value = ret
    return ret


def __set_return_value(self, value):
    """Store *value* on the delegate if there is one, else on this mock."""
    if self._mock_delegate is not None:
        self._mock_delegate.return_value = value
    else:
        self._mock_return_value = value
        _check_and_set_parent(self, value, None, '()')

__return_value_doc = "The value to be returned when the mock is called."
return_value = property(__get_return_value, __set_return_value,
                        __return_value_doc)
@property
def __class__(self):
    """Report the spec class when one is set, otherwise the real type."""
    if self._spec_class is None:
        return type(self)
    return self._spec_class
611 called = _delegating_property('called')
612 call_count = _delegating_property('call_count')
613 call_args = _delegating_property('call_args')
614 call_args_list = _delegating_property('call_args_list')
615 mock_calls = _delegating_property('mock_calls')
def __get_side_effect(self):
    """Return the side effect, taken from the delegate when one exists.

    A delegate's side effect that is not None, not callable, not an
    exception and not yet a `_MockIter` is wrapped in a `_MockIter` and
    stored back on the delegate.
    """
    delegated = self._mock_delegate
    if delegated is None:
        return self._mock_side_effect
    sf = delegated.side_effect
    if (sf is not None and not callable(sf)
            and not isinstance(sf, _MockIter) and not _is_exception(sf)):
        sf = _MockIter(sf)
        delegated.side_effect = sf
    return sf


def __set_side_effect(self, value):
    """Store ``_try_iter(value)`` on the delegate, or on this mock."""
    value = _try_iter(value)
    delegated = self._mock_delegate
    if delegated is None:
        self._mock_side_effect = value
    else:
        delegated.side_effect = value

side_effect = property(__get_side_effect, __set_side_effect)
def reset_mock(self, visited=None, *,
               return_value: bool = False,
               side_effect: bool = False):
    "Restore the mock object to its initial state."
    # `visited` holds ids of mocks already reset so that cycles terminate.
    if visited is None:
        visited = []
    if id(self) in visited:
        return
    visited.append(id(self))

    self.called = False
    self.call_args = None
    self.call_count = 0
    self.mock_calls = _CallList()
    self.call_args_list = _CallList()
    self.method_calls = _CallList()

    if return_value:
        self._mock_return_value = DEFAULT
    if side_effect:
        self._mock_side_effect = None

    for child in self._mock_children.values():
        if isinstance(child, _SpecState) or child is _deleted:
            continue
        child.reset_mock(visited, return_value=return_value,
                         side_effect=side_effect)

    ret = self._mock_return_value
    if _is_instance_mock(ret) and ret is not self:
        # Note: the return-value mock is reset without the
        # return_value/side_effect flags.
        ret.reset_mock(visited)
def configure_mock(self, **kwargs):
    """Set attributes on the mock through keyword arguments.

    Attributes plus return values and side effects can be set on child
    mocks using standard dot notation and unpacking a dictionary in the
    method call:

    >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
    >>> mock.configure_mock(**attrs)"""
    for arg, val in sorted(kwargs.items(),
                           # we sort on the number of dots so that
                           # attributes are set before we set attributes on
                           # attributes
                           key=lambda entry: entry[0].count('.')):
        args = arg.split('.')
        final = args.pop()
        obj = self
        # Walk down to the object that owns the final attribute.
        for entry in args:
            obj = getattr(obj, entry)
        setattr(obj, final, val)
def __getattr__(self, name):
    """Return the child mock for *name*, creating it on first access.

    Raises `AttributeError` for names excluded by the spec, for magic
    names, for deleted attributes, and (unless the mock is unsafe) for
    names that look like misspelled assertions or are in
    `_ATTRIB_DENY_LIST`.
    """
    if name in {'_mock_methods', '_mock_unsafe'}:
        raise AttributeError(name)
    elif self._mock_methods is not None:
        if name not in self._mock_methods or name in _all_magics:
            raise AttributeError("Mock object has no attribute %r" % name)
    elif _is_magic(name):
        raise AttributeError(name)
    if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
        if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
            raise AttributeError(
                f"{name!r} is not a valid assertion. Use a spec "
                f"for the mock if {name!r} is meant to be an attribute.")

    with NonCallableMock._lock:
        result = self._mock_children.get(name)
        if result is _deleted:
            raise AttributeError(name)
        elif result is None:
            wraps = None
            if self._mock_wraps is not None:
                # XXXX should we get the attribute without triggering code
                # execution?
                wraps = getattr(self._mock_wraps, name)

            result = self._get_child_mock(
                parent=self, name=name, wraps=wraps, _new_name=name,
                _new_parent=self
            )
            self._mock_children[name] = result

        elif isinstance(result, _SpecState):
            # A _SpecState placeholder is replaced by an autospecced mock.
            try:
                result = create_autospec(
                    result.spec, result.spec_set, result.instance,
                    result.parent, result.name
                )
            except InvalidSpecError:
                target_name = self.__dict__['_mock_name'] or self
                raise InvalidSpecError(
                    f'Cannot autospec attr {name!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self!r}, attr={result.spec!r}]')
            self._mock_children[name] = result

    return result
def _extract_mock_name(self):
    """Return the dotted name of this mock, built from its ancestors.

    The chain of `_mock_new_parent` links is followed to the root.  The
    root contributes its `_mock_name` (``'mock'`` when unset); a ``'()'``
    component is appended without a separating dot.
    """
    _name_list = [self._mock_new_name]
    _parent = self._mock_new_parent
    last = self

    dot = '.'
    if _name_list == ['()']:
        dot = ''

    while _parent is not None:
        last = _parent

        _name_list.append(_parent._mock_new_name + dot)
        dot = '.'
        if _parent._mock_new_name == '()':
            dot = ''

        _parent = _parent._mock_new_parent

    # Names were collected leaf-first; the root's slot is overwritten below.
    _name_list = list(reversed(_name_list))
    _first = last._mock_name or 'mock'
    if len(_name_list) > 1:
        if _name_list[1] not in ('()', '().'):
            _first += '.'
    _name_list[0] = _first
    return ''.join(_name_list)
def __repr__(self):
    """Return ``<Type name=... spec=... id=...>`` for this mock.

    The name part is omitted for the default names ``'mock'``/``'mock.'``
    and the spec part is omitted when there is no spec class.
    """
    name = self._extract_mock_name()

    name_string = ''
    if name not in ('mock', 'mock.'):
        name_string = ' name=%r' % name

    spec_string = ''
    if self._spec_class is not None:
        spec_string = ' spec=%r'
        if self._spec_set:
            spec_string = ' spec_set=%r'
        spec_string = spec_string % self._spec_class.__name__
    return "<%s%s%s id='%s'>" % (
        type(self).__name__,
        name_string,
        spec_string,
        id(self)
    )
def __dir__(self):
    """Filter the output of `dir(mock)` to only useful members."""
    if not FILTER_DIR:
        return object.__dir__(self)

    # _mock_methods may be a tuple (a spec given as a tuple is stored
    # as-is), so convert to a list before concatenating with lists.
    extras = list(self._mock_methods or [])
    from_type = dir(type(self))
    from_dict = list(self.__dict__)
    from_child_mocks = [
        m_name for m_name, m_value in self._mock_children.items()
        if m_value is not _deleted]

    from_type = [e for e in from_type if not e.startswith('_')]
    from_dict = [e for e in from_dict if not e.startswith('_') or
                 _is_magic(e)]
    return sorted(set(extras + from_type + from_dict + from_child_mocks))
def __setattr__(self, name, value):
    """Set attribute *name*, applying spec, magic-method and seal rules.

    Raises `AttributeError` when the name is rejected by a `spec_set`
    spec, is an unsupported magic method, is a magic method missing from
    the spec, or is new on a sealed mock.
    """
    if name in _allowed_names:
        # property setters go through here
        return object.__setattr__(self, name, value)
    elif (self._spec_set and self._mock_methods is not None and
            name not in self._mock_methods and
            name not in self.__dict__):
        raise AttributeError("Mock object has no attribute '%s'" % name)
    elif name in _unsupported_magics:
        msg = 'Attempting to set unsupported magic method %r.' % name
        raise AttributeError(msg)
    elif name in _all_magics:
        if self._mock_methods is not None and name not in self._mock_methods:
            raise AttributeError("Mock object has no attribute '%s'" % name)

        if not _is_instance_mock(value):
            # Magic methods are installed on the (per-instance) class.
            setattr(type(self), name, _get_method(name, value))
            original = value
            value = lambda *args, **kw: original(self, *args, **kw)
        else:
            # only set _new_name and not name so that mock_calls is tracked
            # but not method calls
            _check_and_set_parent(self, value, None, name)
            setattr(type(self), name, value)
            self._mock_children[name] = value
    elif name == '__class__':
        self._spec_class = value
        return
    else:
        if _check_and_set_parent(self, value, name, name):
            self._mock_children[name] = value

    if self._mock_sealed and not hasattr(self, name):
        mock_name = f'{self._extract_mock_name()}.{name}'
        raise AttributeError(f'Cannot set {mock_name}')

    if isinstance(value, PropertyMock):
        self.__dict__[name] = value
        return
    return object.__setattr__(self, name, value)
def __delattr__(self, name):
    """Delete attribute *name* and mark it as deleted in the child map.

    Raises `AttributeError` when the attribute is already marked deleted
    and is not in the instance ``__dict__``.
    """
    if name in _all_magics and name in type(self).__dict__:
        delattr(type(self), name)
        if name not in self.__dict__:
            # for magic methods that are still MagicProxy objects and
            # not set on the instance itself
            return

    obj = self._mock_children.get(name, _missing)
    if name in self.__dict__:
        _safe_super(NonCallableMock, self).__delattr__(name)
    elif obj is _deleted:
        raise AttributeError(name)
    if obj is not _missing:
        del self._mock_children[name]
    self._mock_children[name] = _deleted
def _format_mock_call_signature(self, args, kwargs):
    """Format a call with *args*/*kwargs* under this mock's name."""
    name = self._mock_name or 'mock'
    return _format_call_signature(name, args, kwargs)
def _format_mock_failure_message(self, args, kwargs, action='call'):
    """Build the expected-vs-actual message for a failed call assertion.

    The actual call is taken from `self.call_args`.
    """
    # NOTE(review): whitespace before 'Actual' may have been collapsed in
    # this copy of the file -- confirm against upstream.
    message = 'expected %s not found.\nExpected: %s\n Actual: %s'
    expected_string = self._format_mock_call_signature(args, kwargs)
    call_args = self.call_args
    actual_string = self._format_mock_call_signature(*call_args)
    return message % (action, expected_string, actual_string)
def _get_call_signature_from_name(self, name):
    """
    * If call objects are asserted against a method/function like obj.meth1
    then there could be no name for the call object to lookup. Hence just
    return the spec_signature of the method/function being asserted against.
    * If the name is not empty then remove () and split by '.' to get
    list of names to iterate through the children until a potential
    match is found. A child mock is created only during attribute access
    so if we get a _SpecState then no attributes of the spec were accessed
    and can be safely exited.
    """
    if not name:
        return self._spec_signature

    sig = None
    children = self._mock_children

    for part in name.replace('()', '').split('.'):
        child = children.get(part)
        if child is None or isinstance(child, _SpecState):
            break
        # If an autospecced object is attached using attach_mock the
        # child would be a function with mock object as attribute from
        # which signature has to be derived.
        child = _extract_mock(child)
        children = child._mock_children
        sig = child._spec_signature

    return sig
def _call_matcher(self, _call):
    """
    Given a call (or simply an (args, kwargs) tuple), return a
    comparison key suitable for matching with other calls.
    This is a best effort method which relies on the spec's signature,
    if available, or falls back on the arguments themselves.

    When the arguments do not fit the signature, the `TypeError` raised by
    binding is returned (not raised), with its traceback removed.
    """

    if isinstance(_call, tuple) and len(_call) > 2:
        sig = self._get_call_signature_from_name(_call[0])
    else:
        sig = self._spec_signature

    if sig is None:
        return _call

    if len(_call) == 2:
        name = ''
        args, kwargs = _call
    else:
        name, args, kwargs = _call
    try:
        bound_call = sig.bind(*args, **kwargs)
        return call(name, bound_call.args, bound_call.kwargs)
    except TypeError as e:
        return e.with_traceback(None)
def assert_not_called(_mock_self):
    """assert that the mock was never called.
    """
    self = _mock_self
    if self.call_count != 0:
        msg = ("Expected '%s' to not have been called. Called %s times.%s"
               % (self._mock_name or 'mock',
                  self.call_count,
                  self._calls_repr()))
        raise AssertionError(msg)
def assert_called(_mock_self):
    """assert that the mock was called at least once
    """
    self = _mock_self
    if self.call_count == 0:
        msg = ("Expected '%s' to have been called." %
               (self._mock_name or 'mock'))
        raise AssertionError(msg)
def assert_called_once(_mock_self):
    """assert that the mock was called only once.
    """
    self = _mock_self
    if self.call_count != 1:
        msg = ("Expected '%s' to have been called once. Called %s times.%s"
               % (self._mock_name or 'mock',
                  self.call_count,
                  self._calls_repr()))
        raise AssertionError(msg)
def assert_called_with(_mock_self, *args, **kwargs):
    """assert that the last call was made with the specified arguments.

    Raises an AssertionError if the args and keyword args passed in are
    different to the last call to the mock."""
    self = _mock_self
    if self.call_args is None:
        expected = self._format_mock_call_signature(args, kwargs)
        actual = 'not called.'
        # NOTE(review): whitespace before 'Actual' may have been collapsed
        # in this copy of the file -- confirm against upstream.
        error_message = ('expected call not found.\nExpected: %s\n Actual: %s'
                         % (expected, actual))
        raise AssertionError(error_message)

    def _error_message():
        msg = self._format_mock_failure_message(args, kwargs)
        return msg

    expected = self._call_matcher(_Call((args, kwargs), two=True))
    actual = self._call_matcher(self.call_args)
    if actual != expected:
        # A binding error for the expected call is chained as the cause.
        cause = expected if isinstance(expected, Exception) else None
        raise AssertionError(_error_message()) from cause
def assert_called_once_with(_mock_self, *args, **kwargs):
    """assert that the mock was called exactly once and that that call was
    with the specified arguments."""
    self = _mock_self
    if self.call_count != 1:
        msg = ("Expected '%s' to be called once. Called %s times.%s"
               % (self._mock_name or 'mock',
                  self.call_count,
                  self._calls_repr()))
        raise AssertionError(msg)
    return self.assert_called_with(*args, **kwargs)
def assert_has_calls(self, calls, any_order=False):
    """assert the mock has been called with the specified calls.
    The `mock_calls` list is checked for the calls.

    If `any_order` is False (the default) then the calls must be
    sequential. There can be extra calls before or after the
    specified calls.

    If `any_order` is True then the calls can be in any order, but
    they must all appear in `mock_calls`."""
    expected = [self._call_matcher(c) for c in calls]
    # First expected call that failed to bind, used as the exception cause.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
    if not any_order:
        if expected not in all_calls:
            if cause is None:
                problem = 'Calls not found.'
            else:
                problem = ('Error processing expected calls.\n'
                           'Errors: {}').format(
                               [e if isinstance(e, Exception) else None
                                for e in expected])
            # NOTE(review): whitespace before 'Actual' may have been
            # collapsed in this copy of the file -- confirm against upstream.
            raise AssertionError(
                f'{problem}\n'
                f'Expected: {_CallList(calls)}\n'
                f' Actual: {safe_repr(self.mock_calls)}'
            ) from cause
        return

    all_calls = list(all_calls)

    not_found = []
    for kall in expected:
        # Each expected call consumes one matching recorded call.
        try:
            all_calls.remove(kall)
        except ValueError:
            not_found.append(kall)
    if not_found:
        raise AssertionError(
            '%r does not contain all of %r in its call list, '
            'found %r instead' % (self._mock_name or 'mock',
                                  tuple(not_found), all_calls)
        ) from cause
def assert_any_call(self, *args, **kwargs):
    """Assert that some recorded call used the given arguments.

    Every entry of `call_args_list` is considered, so this passes if the
    mock has *ever* been called this way; `assert_called_with` and
    `assert_called_once_with` only look at the most recent call."""
    expected = self._call_matcher(_Call((args, kwargs), two=True))
    cause = expected if isinstance(expected, Exception) else None
    actual = [self._call_matcher(c) for c in self.call_args_list]

    if not cause and expected in _AnyComparer(actual):
        return

    expected_string = self._format_mock_call_signature(args, kwargs)
    raise AssertionError(
        '%s call not found' % expected_string
    ) from cause
def _get_child_mock(self, **kw):
    """Build the mock used for an attribute or for the return value.

    Children normally share the parent's type; `Mock` subclasses may
    override this method to change how children are made.

    A non-callable parent produces the callable variant (not any custom
    subclass)."""
    if self._mock_sealed:
        # A sealed mock refuses to grow new children.
        suffix = f".{kw['name']}" if "name" in kw else "()"
        raise AttributeError(self._extract_mock_name() + suffix)

    child_name = kw.get("_new_name")
    if child_name in self.__dict__['_spec_asyncs']:
        return AsyncMock(**kw)

    own_type = type(self)
    if issubclass(own_type, MagicMock) and child_name in _async_method_magics:
        # Any asynchronous magic becomes an AsyncMock
        return AsyncMock(**kw)

    if issubclass(own_type, AsyncMockMixin):
        is_sync = (child_name in _all_sync_magics or
                   self._mock_methods and child_name in self._mock_methods)
        # Any synchronous method on AsyncMock becomes a MagicMock
        return MagicMock(**kw) if is_sync else AsyncMock(**kw)

    if issubclass(own_type, CallableMixin):
        return own_type.__mro__[1](**kw)

    if issubclass(own_type, NonCallableMagicMock):
        klass = MagicMock
    elif issubclass(own_type, NonCallableMock):
        klass = Mock
    return klass(**kw)
1107 def _calls_repr(self):
1108 """Renders self.mock_calls as a string.
1110 Example: "\nCalls: [call(1), call(2)]."
1112 If self.mock_calls is empty, an empty string is returned. The
1113 output will be truncated if very long.
1114 """
1115 if not self.mock_calls:
1116 return ""
1117 return f"\nCalls: {safe_repr(self.mock_calls)}."
try:
    removeprefix = str.removeprefix
except AttributeError:
    # Py 3.8 and earlier: emulate str.removeprefix, which strips `prefix`
    # only when `name` actually starts with it and otherwise returns the
    # string unchanged.
    def removeprefix(name, prefix):
        """Return `name` without a leading `prefix`, if it has one."""
        if prefix and name.startswith(prefix):
            return name[len(prefix):]
        return name
# Denylist for forbidden attribute names in safe mode: the name of every
# `assert_*` attribute of NonCallableMock with the `assert_` prefix removed.
_ATTRIB_DENY_LIST = frozenset(
    removeprefix(_assert_name, "assert_")
    for _assert_name in dir(NonCallableMock)
    if _assert_name.startswith("assert_")
)
1135class _AnyComparer(list):
1136 """A list which checks if it contains a call which may have an
1137 argument of ANY, flipping the components of item and self from
1138 their traditional locations so that ANY is guaranteed to be on
1139 the left."""
1140 def __contains__(self, item):
1141 for _call in self:
1142 assert len(item) == len(_call)
1143 if all([
1144 expected == actual
1145 for expected, actual in zip(item, _call)
1146 ]):
1147 return True
1148 return False
def _try_iter(obj):
    """Return an iterator over `obj` when that is appropriate.

    `None`, exceptions and callables are handed back untouched, as is
    anything that `iter()` rejects with a `TypeError`.
    """
    if obj is None or _is_exception(obj) or _callable(obj):
        return obj
    try:
        return iter(obj)
    except TypeError:
        # XXXX backwards compatibility
        # but this will blow up on first call - so maybe we should fail early?
        return obj
class CallableMixin(Base):
    """Mixin providing `__call__`, call recording and result selection."""

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # Stored straight into __dict__ before the base initialiser runs.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect

    def _mock_check_sig(self, *args, **kwargs):
        # stub method that can be replaced with one with a specific signature
        pass

    def __call__(_mock_self, *args, **kwargs):
        # can't use self in-case a function / method we are mocking uses self
        # in the signature
        mock = _mock_self
        mock._mock_check_sig(*args, **kwargs)
        mock._increment_mock_call(*args, **kwargs)
        return mock._mock_call(*args, **kwargs)

    def _mock_call(_mock_self, *args, **kwargs):
        return _mock_self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(_mock_self, *args, **kwargs):
        """Record this call on the mock and on every ancestor mock."""
        self = _mock_self
        self.called = True
        self.call_count += 1

        # handle call_args
        # needs to be set here so assertions on call arguments pass before
        # execution in the case of awaited calls
        recorded = _Call((args, kwargs), two=True)
        self.call_args = recorded
        self.call_args_list.append(recorded)

        # initial state for method_calls:
        track_method_calls = self._mock_parent is not None
        method_name = self._mock_name

        # initial state for mock_calls:
        mock_name = self._mock_new_name
        name_is_call = mock_name == '()'
        self.mock_calls.append(_Call(('', args, kwargs)))

        # walk up the chain of parent mocks:
        ancestor = self._mock_new_parent
        while ancestor is not None:

            # method_calls bookkeeping:
            if track_method_calls:
                ancestor.method_calls.append(_Call((method_name, args, kwargs)))
                track_method_calls = ancestor._mock_parent is not None
                if track_method_calls:
                    method_name = ancestor._mock_name + '.' + method_name

            # mock_calls bookkeeping:
            ancestor.mock_calls.append(_Call((mock_name, args, kwargs)))

            if ancestor._mock_new_name:
                # No dot is inserted directly in front of a '()' component.
                separator = '' if name_is_call else '.'
                name_is_call = ancestor._mock_new_name == '()'
                mock_name = ancestor._mock_new_name + separator + mock_name

            # continue with the next parent:
            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(_mock_self, *args, **kwargs):
        """Work out the result of a call: side effect, return value or wraps."""
        self = _mock_self
        # separate from _increment_mock_call so that awaited functions are
        # executed separately from their call, also AsyncMock overrides this method

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            if _callable(effect):
                result = effect(*args, **kwargs)
            else:
                # Not callable: take the next item from the side effect.
                result = next(effect)
                if _is_exception(result):
                    raise result

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_delegate and self._mock_delegate.return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value
class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. The optional arguments below control how
    the Mock object behaves:

    * `spec`: Either a list of strings or an existing object (a class or
      instance) that serves as the specification for the mock. When an
      object is given, the list of strings is built by calling dir on it
      (unsupported magic attributes and methods are left out). Accessing
      an attribute that is not in this list raises an `AttributeError`.

      When `spec` is an object (not a list of strings), `mock.__class__`
      returns the class of the spec object, which lets mocks pass
      `isinstance` tests.

    * `spec_set`: A stricter form of `spec`. With it, attempting to *set*
      or get an attribute that the `spec_set` object does not have raises
      an `AttributeError`.

    * `side_effect`: A function called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or for
      changing return values dynamically. The function receives the same
      arguments as the mock, and its return value is used as the mock's
      return value unless it returns `DEFAULT`.

      When `side_effect` is an iterable, each call to the mock returns
      the next value from it. Members of the iterable that are exceptions
      are raised instead of returned.

    * `return_value`: The value returned when the mock is called. By
      default this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an
      AttributeError. An AttributeError is also raised when accessing
      attributes that match the name of an assertion method without the
      prefix `assert_`, e.g. accessing `called_once` instead of
      `assert_called_once`. Passing `unsafe=True` allows access to these
      attributes.

    * `wraps`: Item for the mock object to wrap. When `wraps` is not None,
      calling the Mock passes the call through to the wrapped object and
      returns the real result. Attribute access on the mock returns a
      Mock object wrapping the corresponding attribute of the wrapped
      object (so accessing an attribute that doesn't exist raises an
      `AttributeError`).

      When the mock has an explicit `return_value` set, calls are not
      passed to the wrapped object and the `return_value` is returned
      instead.

    * `name`: A name for the mock, used in its repr. This can be useful
      for debugging. The name is propagated to child mocks.

    Mocks can also be called with arbitrary keyword arguments. These are
    used to set attributes on the mock after it is created.
    """
1334def _dot_lookup(thing, comp, import_path):
1335 try:
1336 return getattr(thing, comp)
1337 except AttributeError:
1338 __import__(import_path)
1339 return getattr(thing, comp)
def _importer(target):
    """Import and return the object named by the dotted path `target`.

    The first component is imported with `__import__`; each following
    component is resolved through `_dot_lookup` using the dotted path
    accumulated so far.
    """
    first, *rest = target.split('.')
    thing = __import__(first)
    dotted = first

    for comp in rest:
        dotted = f"{dotted}.{comp}"
        thing = _dot_lookup(thing, comp, dotted)
    return thing
1353# _check_spec_arg_typos takes kwargs from commands like patch and checks that
1354# they don't contain common misspellings of arguments related to autospeccing.
1355def _check_spec_arg_typos(kwargs_to_check):
1356 typos = ("autospect", "auto_spec", "set_spec")
1357 for typo in typos:
1358 if typo in kwargs_to_check:
1359 raise RuntimeError(
1360 f"{typo!r} might be a typo; use unsafe=True if this is intended"
1361 )
class _patch(object):
    """Patcher object created by `patch`, `patch.object` and `patch.multiple`.

    It can be used as a context manager, as a function or class decorator,
    or driven manually through `start()` / `stop()`. Entering the patcher
    replaces `attribute` on the object returned by `getter()`; leaving it
    restores or removes the attribute again.
    """

    # Set to the attribute name by `patch.multiple`; when not None,
    # entering the patcher returns a dict instead of the new object.
    attribute_name = None
    # Class-level list shared by all patchers: `start()` appends to it and
    # `stop()` removes from it.
    _active_patches = []

    def __init__(
            self, getter, attribute, new, spec, create,
            spec_set, autospec, new_callable, kwargs, *, unsafe=False
        ):
        """Validate the argument combination and store the configuration.

        Raises `ValueError` when `new_callable` is combined with `new` or
        `autospec`, `RuntimeError` (via `_check_spec_arg_typos`) for typo-like
        kwargs unless `unsafe` is true, and `InvalidSpecError` when `spec`
        or `spec_set` is itself a mock.
        """
        if new_callable is not None:
            if new is not DEFAULT:
                raise ValueError(
                    "Cannot use 'new' and 'new_callable' together"
                )
            if autospec is not None:
                raise ValueError(
                    "Cannot use 'autospec' and 'new_callable' together"
                )
        if not unsafe:
            _check_spec_arg_typos(kwargs)
        if _is_instance_mock(spec):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec '
                f'has already been mocked out. [spec={spec!r}]')
        if _is_instance_mock(spec_set):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec_set '
                f'target has already been mocked out. [spec_set={spec_set!r}]')

        self.getter = getter
        self.attribute = attribute
        self.new = new
        self.new_callable = new_callable
        self.spec = spec
        self.create = create
        self.has_local = False
        self.spec_set = spec_set
        self.autospec = autospec
        self.kwargs = kwargs
        # Remembered so that copy() can rebuild an equivalent patcher
        # without re-running a typo check the caller explicitly disabled.
        self.unsafe = unsafe
        self.additional_patchers = []
        self.is_started = False

    def copy(self):
        """Return a new, not yet started patcher with the same configuration."""
        patcher = _patch(
            self.getter, self.attribute, self.new, self.spec,
            self.create, self.spec_set,
            self.autospec, self.new_callable, self.kwargs,
            unsafe=self.unsafe
        )
        patcher.attribute_name = self.attribute_name
        patcher.additional_patchers = [
            p.copy() for p in self.additional_patchers
        ]
        return patcher

    def __call__(self, func):
        """Use the patcher as a decorator for a class, coroutine or function."""
        if isinstance(func, type):
            return self.decorate_class(func)
        if iscoroutinefunction(func):
            return self.decorate_async_callable(func)
        return self.decorate_callable(func)

    def decorate_class(self, klass):
        """Wrap every callable attribute named with `patch.TEST_PREFIX`.

        Each wrapped attribute gets its own copy of this patcher.
        """
        for attr in dir(klass):
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            patcher = self.copy()
            setattr(klass, attr, patcher(attr_value))
        return klass

    @contextlib.contextmanager
    def decoration_helper(self, patched, args, keywargs):
        """Enter all of `patched.patchings` and yield the adjusted arguments.

        Patchers with an `attribute_name` contribute keyword arguments;
        other patchers whose `new` is `DEFAULT` contribute one extra
        positional argument each. All patchers are exited when the
        generator finishes.
        """
        extra_args = []
        with contextlib.ExitStack() as exit_stack:
            for patching in patched.patchings:
                arg = exit_stack.enter_context(patching)
                if patching.attribute_name is not None:
                    keywargs.update(arg)
                elif patching.new is DEFAULT:
                    extra_args.append(arg)

            args += tuple(extra_args)
            yield (args, keywargs)

    def decorate_callable(self, func):
        """Return `func` wrapped so that it runs with this patch applied."""
        # NB. Keep the method in sync with decorate_async_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: join its list instead of
            # adding another wrapper layer.
            func.patchings.append(self)
            return func

        @wraps(func)
        def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched

    def decorate_async_callable(self, func):
        """Coroutine-function counterpart of `decorate_callable`."""
        # NB. Keep the method in sync with decorate_callable()
        if hasattr(func, 'patchings'):
            func.patchings.append(self)
            return func

        @wraps(func)
        async def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return await func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched

    def get_original(self):
        """Return `(original, local)` for the attribute about to be patched.

        `local` is true when the attribute was found in the target's own
        `__dict__`. `original` is `DEFAULT` when the attribute is missing.
        Raises `AttributeError` when it is missing and `create` is false.
        """
        target = self.getter()
        name = self.attribute

        original = DEFAULT
        local = False

        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True

        # Names of builtins may always be created on a module.
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True

        if not self.create and original is DEFAULT:
            raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
        return original, local

    def __enter__(self):
        """Perform the patch."""
        if self.is_started:
            raise RuntimeError("Patch is already started")

        new, spec, spec_set = self.new, self.spec, self.spec_set
        autospec, kwargs = self.autospec, self.kwargs
        new_callable = self.new_callable
        self.target = self.getter()

        # normalise False to None
        if spec is False:
            spec = None
        if spec_set is False:
            spec_set = None
        if autospec is False:
            autospec = None

        if spec is not None and autospec is not None:
            raise TypeError("Can't specify spec and autospec")
        if ((spec is not None or autospec is not None) and
                spec_set not in (True, None)):
            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")

        original, local = self.get_original()

        if new is DEFAULT and autospec is None:
            inherit = False
            if spec is True:
                # set spec to the object we are replacing
                spec = original
                if spec_set is True:
                    spec_set = original
                    spec = None
            elif spec is not None:
                if spec_set is True:
                    spec_set = spec
                    spec = None
            elif spec_set is True:
                spec_set = original

            if spec is not None or spec_set is not None:
                if original is DEFAULT:
                    raise TypeError("Can't use 'spec' with create=True")
                if isinstance(original, type):
                    # If we're patching out a class and there is a spec
                    inherit = True

            # Determine the Klass to use
            if new_callable is not None:
                Klass = new_callable
            elif spec is None and _is_async_obj(original):
                Klass = AsyncMock
            elif spec is not None or spec_set is not None:
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if _is_list(this_spec):
                    not_callable = '__call__' not in this_spec
                else:
                    not_callable = not callable(this_spec)
                if _is_async_obj(this_spec):
                    Klass = AsyncMock
                elif not_callable:
                    Klass = NonCallableMagicMock
                else:
                    Klass = MagicMock
            else:
                Klass = MagicMock

            _kwargs = {}
            if spec is not None:
                _kwargs['spec'] = spec
            if spec_set is not None:
                _kwargs['spec_set'] = spec_set

            # add a name to mocks
            if (isinstance(Klass, type) and
                    issubclass(Klass, NonCallableMock) and self.attribute):
                _kwargs['name'] = self.attribute

            _kwargs.update(kwargs)
            new = Klass(**_kwargs)

            if inherit and _is_instance_mock(new):
                # we can only tell if the instance should be callable if the
                # spec is not a list
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if (not _is_list(this_spec) and not
                        _instance_callable(this_spec)):
                    Klass = NonCallableMagicMock

                # 'name' is only present when it was added above or passed
                # by the caller; a `new_callable` factory that is not a mock
                # class leaves it unset, so do not insist on the key.
                _kwargs.pop('name', None)
                new.return_value = Klass(_new_parent=new, _new_name='()',
                                         **_kwargs)
        elif autospec is not None:
            # spec is ignored, new *must* be default, spec_set is treated
            # as a boolean. Should we check spec is not None and that spec_set
            # is a bool?
            if new is not DEFAULT:
                raise TypeError(
                    "autospec creates the mock for you. Can't specify "
                    "autospec and new."
                )
            if original is DEFAULT:
                raise TypeError("Can't use 'autospec' with create=True")
            spec_set = bool(spec_set)
            if autospec is True:
                autospec = original

            if _is_instance_mock(self.target):
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} as the patch '
                    f'target has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')
            if _is_instance_mock(autospec):
                target_name = getattr(self.target, '__name__', self.target)
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')

            new = create_autospec(autospec, spec_set=spec_set,
                                  _name=self.attribute, **kwargs)
        elif kwargs:
            # can't set keyword args when we aren't creating the mock
            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
            raise TypeError("Can't pass kwargs to a mock we aren't creating")

        new_attr = new

        self.temp_original = original
        self.is_local = local
        self._exit_stack = contextlib.ExitStack()
        self.is_started = True
        try:
            setattr(self.target, self.attribute, new_attr)
            if self.attribute_name is not None:
                # patch.multiple: collect created mocks by attribute name.
                extra_args = {}
                if self.new is DEFAULT:
                    extra_args[self.attribute_name] = new
                for patching in self.additional_patchers:
                    arg = self._exit_stack.enter_context(patching)
                    if patching.new is DEFAULT:
                        extra_args.update(arg)
                return extra_args

            return new
        except:
            # Deliberately catches everything: the patch must be undone
            # whatever went wrong; re-raise unless __exit__ suppresses it.
            if not self.__exit__(*sys.exc_info()):
                raise

    def __exit__(self, *exc_info):
        """Undo the patch."""
        if not self.is_started:
            return

        if self.is_local and self.temp_original is not DEFAULT:
            setattr(self.target, self.attribute, self.temp_original)
        else:
            delattr(self.target, self.attribute)
            if not self.create and (not hasattr(self.target, self.attribute) or
                                    self.attribute in ('__doc__', '__module__',
                                                       '__defaults__', '__annotations__',
                                                       '__kwdefaults__')):
                # needed for proxy objects like django settings
                setattr(self.target, self.attribute, self.temp_original)

        del self.temp_original
        del self.is_local
        del self.target
        exit_stack = self._exit_stack
        del self._exit_stack
        self.is_started = False
        # Exits any additional patchers entered in __enter__.
        return exit_stack.__exit__(*exc_info)

    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        self._active_patches.append(self)
        return result

    def stop(self):
        """Stop an active patch."""
        try:
            self._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)
1713def _get_target(target):
1714 try:
1715 target, attribute = target.rsplit('.', 1)
1716 except (TypeError, ValueError, AttributeError):
1717 raise TypeError(
1718 f"Need a valid target to patch. You supplied: {target!r}")
1719 getter = lambda: _importer(target)
1720 return getter, attribute
def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    Replace the member named `attribute` on the object `target` with a
    mock object.

    `patch.object` works as a decorator, class decorator or context
    manager. The arguments `new`, `spec`, `create`, `spec_set`,
    `autospec` and `new_callable` mean the same as they do for `patch`,
    and, as with `patch`, arbitrary keyword arguments are accepted for
    configuring the mock object that gets created.

    As a class decorator `patch.object` uses `patch.TEST_PREFIX` to pick
    the methods it wraps.
    """
    if type(target) is str:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )

    def getter():
        return target

    return _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
def _patch_multiple(target, spec=None, create=False, spec_set=None,
                    autospec=None, new_callable=None, **kwargs):
    """Apply several patches to one target in a single call. The target is
    given either as an object or as a string naming an object to import,
    and each patch is given as a keyword argument::

        with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
            ...

    Pass `DEFAULT` as a value to have `patch.multiple` create the mock
    for you. Mocks created this way are passed to a decorated function by
    keyword, and returned in a dictionary when `patch.multiple` is used
    as a context manager.

    `patch.multiple` works as a decorator, class decorator or context
    manager. The arguments `spec`, `spec_set`, `create`, `autospec` and
    `new_callable` mean the same as they do for `patch` and are applied
    to *all* patches done by `patch.multiple`.

    As a class decorator `patch.multiple` uses `patch.TEST_PREFIX` to
    pick the methods it wraps.
    """
    if type(target) is str:
        def getter():
            return _importer(target)
    else:
        def getter():
            return target

    if not kwargs:
        raise ValueError(
            'Must supply at least one keyword argument with patch.multiple'
        )

    # One patcher per keyword argument, each with its own empty kwargs.
    patchers = []
    for attribute, new in kwargs.items():
        one_patcher = _patch(
            getter, attribute, new, spec, create, spec_set,
            autospec, new_callable, {}
        )
        one_patcher.attribute_name = attribute
        patchers.append(one_patcher)

    # The first patcher drives all the others.
    patcher, *others = patchers
    patcher.additional_patchers.extend(others)
    return patcher
def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` works as a function decorator, class decorator or context
    manager. Within the body of the function or with statement, `target`
    is replaced by a `new` object; the replacement is undone when the
    function or with statement exits.

    When `new` is omitted the target is replaced with an `AsyncMock` if
    the patched object is an async function, and with a `MagicMock`
    otherwise. Used as a decorator with `new` omitted, `patch` passes the
    created mock to the decorated function as an extra argument. Used as
    a context manager, the created mock is returned by the context
    manager.

    `target` should be a string of the form `'package.module.ClassName'`.
    The `target` is imported and the named object replaced by the `new`
    object, so the `target` must be importable from the environment in
    which `patch` is called. The import happens when the decorated
    function is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are handed to the
    `MagicMock` when patch creates one for you.

    You may also pass `spec=True` or `spec_set=True`; patch then uses the
    object being mocked as the spec/spec_set object.

    `new_callable` names a different class, or callable object, that is
    called to create the `new` object. By default `AsyncMock` is used for
    async functions and `MagicMock` for everything else.

    `autospec` is a more powerful form of `spec`. With `autospec=True`
    the mock is created with a spec taken from the object being replaced,
    and every attribute of the mock gets the spec of the corresponding
    attribute of that object. Mocked methods and functions have their
    arguments checked and raise a `TypeError` when called with the wrong
    signature. For a mock replacing a class, the return value (the
    'instance') has the same spec as the class.

    Rather than `autospec=True` you can pass `autospec=some_object` to
    use an arbitrary object as the spec in place of the one being
    replaced.

    By default `patch` fails to replace attributes that don't exist. With
    `create=True`, a missing attribute is created when the patched
    function is called and deleted again afterwards. This helps when
    testing against attributes that production code creates at runtime.
    It is off by default because it can be dangerous: with it switched on
    you can write passing tests against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator, in which case it
    decorates each test method in the class. This cuts boilerplate when
    test methods share a common set of patchings. Test methods are those
    whose names start with `patch.TEST_PREFIX`. By default this is
    `test`, matching the way `unittest` finds tests; set
    `patch.TEST_PREFIX` to use a different prefix.

    Patch can be used as a context manager with the with statement. The
    patching then applies to the indented block after the with statement.
    With "as", the patched object is bound to the name after the "as",
    which is very useful when `patch` creates the mock object for you.

    Patch raises a `RuntimeError` if passed some common misspellings of
    the arguments autospec and spec_set. Pass `unsafe=True` to disable
    that check.

    `patch` accepts arbitrary keyword arguments. They are passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise, or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    target_getter, attribute_name = _get_target(target)
    return _patch(
        target_getter, attribute_name, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
1883class _patch_dict(object):
1884 """
1885 Patch a dictionary, or dictionary like object, and restore the dictionary
1886 to its original state after the test, where the restored dictionary is
1887 a copy of the dictionary as it was before the test.
1889 `in_dict` can be a dictionary or a mapping like container. If it is a
1890 mapping then it must at least support getting, setting and deleting items
1891 plus iterating over keys.
1893 `in_dict` can also be a string specifying the name of the dictionary, which
1894 will then be fetched by importing it.
1896 `values` can be a dictionary of values to set in the dictionary. `values`
1897 can also be an iterable of `(key, value)` pairs.
1899 If `clear` is True then the dictionary will be cleared before the new
1900 values are set.
1902 `patch.dict` can also be called with arbitrary keyword arguments to set
1903 values in the dictionary::
1905 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1906 ...
1908 `patch.dict` can be used as a context manager, decorator or class
1909 decorator. When used as a class decorator `patch.dict` honours
1910 `patch.TEST_PREFIX` for choosing which methods to wrap.
1911 """
1913 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1914 self.in_dict = in_dict
1915 # support any argument supported by dict(...) constructor
1916 self.values = dict(values)
1917 self.values.update(kwargs)
1918 self.clear = clear
1919 self._original = None
1922 def __call__(self, f):
1923 if isinstance(f, type):
1924 return self.decorate_class(f)
1925 if iscoroutinefunction(f):
1926 return self.decorate_async_callable(f)
1927 return self.decorate_callable(f)
1930 def decorate_callable(self, f):
1931 @wraps(f)
1932 def _inner(*args, **kw):
1933 self._patch_dict()
1934 try:
1935 return f(*args, **kw)
1936 finally:
1937 self._unpatch_dict()
1939 return _inner
1942 def decorate_async_callable(self, f):
1943 @wraps(f)
1944 async def _inner(*args, **kw):
1945 self._patch_dict()
1946 try:
1947 return await f(*args, **kw)
1948 finally:
1949 self._unpatch_dict()
1951 return _inner
1954 def decorate_class(self, klass):
1955 for attr in dir(klass):
1956 attr_value = getattr(klass, attr)
1957 if (attr.startswith(patch.TEST_PREFIX) and
1958 hasattr(attr_value, "__call__")):
1959 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1960 decorated = decorator(attr_value)
1961 setattr(klass, attr, decorated)
1962 return klass
1965 def __enter__(self):
1966 """Patch the dict."""
1967 self._patch_dict()
1968 return self.in_dict
1971 def _patch_dict(self):
1972 values = self.values
1973 if isinstance(self.in_dict, str):
1974 self.in_dict = _importer(self.in_dict)
1975 in_dict = self.in_dict
1976 clear = self.clear
1978 try:
1979 original = in_dict.copy()
1980 except AttributeError:
1981 # dict like object with no copy method
1982 # must support iteration over keys
1983 original = {}
1984 for key in in_dict:
1985 original[key] = in_dict[key]
1986 self._original = original
1988 if clear:
1989 _clear_dict(in_dict)
1991 try:
1992 in_dict.update(values)
1993 except AttributeError:
1994 # dict like object with no update method
1995 for key in values:
1996 in_dict[key] = values[key]
1999 def _unpatch_dict(self):
2000 in_dict = self.in_dict
2001 original = self._original
2003 _clear_dict(in_dict)
2005 try:
2006 in_dict.update(original)
2007 except AttributeError:
2008 for key in original:
2009 in_dict[key] = original[key]
2012 def __exit__(self, *args):
2013 """Unpatch the dict."""
2014 if self._original is not None:
2015 self._unpatch_dict()
2016 return False
def start(self):
    """Activate the patch and record it in `_patch._active_patches`.

    Returns whatever `__enter__` returns.
    """
    entered = self.__enter__()
    _patch._active_patches.append(self)
    return entered
def stop(self):
    """Deactivate a started patch.

    Returns `None` without doing anything else when this patch is not in
    `_patch._active_patches`; otherwise returns the result of `__exit__`.
    """
    try:
        _patch._active_patches.remove(self)
    except ValueError:
        # If the patch hasn't been started this will fail
        return None
    else:
        return self.__exit__(None, None, None)
2037def _clear_dict(in_dict):
2038 try:
2039 in_dict.clear()
2040 except AttributeError:
2041 keys = list(in_dict)
2042 for key in keys:
2043 del in_dict[key]
def _patch_stopall():
    """Stop all active patches. LIFO to unroll nested patches."""
    for active_patch in reversed(_patch._active_patches):
        active_patch.stop()
# Attach the specialised patchers to `patch` so they are reachable as
# `patch.object`, `patch.dict`, `patch.multiple` and `patch.stopall`.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix used by `decorate_class` to pick the methods it wraps.
patch.TEST_PREFIX = 'test'
# Space separated names (without the surrounding double underscores) of the
# non-numeric magic methods; `_magics` below wraps each one as '__name__'.
magic_methods = (
    "lt le gt ge eq ne "
    "getitem setitem delitem "
    "len contains iter "
    "hash str sizeof "
    "enter exit "
    # we added divmod and rdivmod here instead of numerics
    # because there is no idivmod
    "divmod rdivmod neg pos abs invert "
    "complex int float index "
    "round trunc floor ceil "
    "bool next "
    "fspath "
    "aiter "
)

if IS_PYPY:
    # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html
    magic_methods = magic_methods.replace('sizeof ', '')
# Base names of the binary numeric operators.  The in-place ('i' prefix) and
# reflected ('r' prefix) variants are derived from this single list.
numerics = " ".join((
    "add sub mul matmul truediv floordiv",
    "mod lshift rshift and xor or pow",
))
inplace = ' '.join(['i' + op for op in numerics.split()])
right = ' '.join(['r' + op for op in numerics.split()])
# Magic methods that are part of `_all_sync_magics` but are not in `_magics`,
# i.e. not among the names `MagicMixin._mock_set_magics` installs.
# not including __prepare__, __instancecheck__, __subclasscheck__
# (as they are metaclass methods)
# __del__ is not supported at all as it causes problems if it exists

_non_defaults = {
    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
    '__getstate__', '__setstate__', '__getformat__',
    '__repr__', '__dir__', '__subclasses__', '__format__',
    '__getnewargs_ex__',
}
2097def _get_method(name, func):
2098 "Turns a callable object (like a mock) into a real function"
2099 def method(self, *args, **kw):
2100 return func(self, *args, **kw)
2101 method.__name__ = name
2102 return method
# Every name from `magic_methods`, `numerics`, `inplace` and `right`,
# wrapped in double underscores (e.g. 'lt' -> '__lt__').
_magics = {
    '__%s__' % method for method in
    ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Magic methods used for async `with` statements
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

# Unions used for membership checks over the groups defined above.
_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

# Magic method names that are never supported.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}

# Magic methods whose default return value is computed from the mock itself;
# `_set_return_value` calls the entry with the mock and stores the result.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value; `_set_return_value`
# consults this table before the other two.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}
def _get_eq(self):
    """Build the default `__eq__` side effect for the mock `self`."""
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            # nothing configured: equal only to itself, otherwise let
            # Python try the reflected comparison
            return True if self is other else NotImplemented
        return configured
    return __eq__
def _get_ne(self):
    """Build the default `__ne__` side effect for the mock `self`."""
    def __ne__(other):
        if self.__ne__._mock_return_value is DEFAULT:
            # nothing configured: not-unequal to itself, otherwise let
            # Python try the reflected comparison
            return False if self is other else NotImplemented
        # a return value was configured; hand back DEFAULT from the side effect
        # (presumably so the mock falls back to that return value - confirm
        # against the synchronous call path)
        return DEFAULT
    return __ne__
def _get_iter(self):
    """Build the default `__iter__` side effect for the mock `self`."""
    def __iter__():
        configured = self.__iter__._mock_return_value
        # An unconfigured mock iterates as empty.  If the configured value was
        # already an iterator, calling iter on it returns it unchanged.
        source = [] if configured is DEFAULT else configured
        return iter(source)
    return __iter__
def _get_async_iter(self):
    """Build the default `__aiter__` side effect for the mock `self`."""
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        # an unconfigured mock iterates as empty
        source = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(source))
    return __aiter__
# Magic methods whose default behaviour is a side effect rather than a fixed
# return value.  Each entry is a factory that `_set_return_value` calls with
# the mock; the function it returns is installed as the `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}
def _set_return_value(mock, method, name):
    """Give the magic-method mock `method` its default behaviour.

    The tables are consulted in order and the first match wins:
    `_return_values` (fixed value), `_calculate_return_value` (value
    computed from `mock`), `_side_effect_methods` (side effect built from
    `mock`).  Names found in none of them are left untouched.
    """
    constant = _return_values.get(name, DEFAULT)
    if constant is not DEFAULT:
        method.return_value = constant
    else:
        compute = _calculate_return_value.get(name)
        if compute is not None:
            method.return_value = compute(mock)
        else:
            build_side_effect = _side_effect_methods.get(name)
            if build_side_effect is not None:
                method.side_effect = build_side_effect(mock)
class MagicMixin(Base):
    """Mixin that installs `MagicProxy` descriptors for the default magic methods."""

    def __init__(self, *args, **kw):
        self._mock_set_magics()  # make magic work for kwargs in init
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        self._mock_set_magics()  # fix magic broken by upper level init

    def _mock_set_magics(self):
        """Install the supported magic methods on this mock's class.

        When `_mock_methods` is set, only the magic names it contains are
        installed and any other magic already present on the class is
        deleted.
        """
        candidates = _magics | _async_method_magics
        wanted = candidates

        if getattr(self, "_mock_methods", None) is not None:
            wanted = candidates.intersection(self._mock_methods)

            for unwanted in candidates - wanted:
                if unwanted in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, unwanted)

        mock_type = type(self)
        # don't overwrite existing attributes if called a second time
        for entry in wanted - set(mock_type.__dict__):
            setattr(mock_type, entry, MagicProxy(entry, self))
class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set.

        The magic methods are re-evaluated afterwards so they match the
        new spec."""
        self._mock_add_spec(spec, spec_set)
        # re-run magic setup: the set of allowed magics depends on the spec
        self._mock_set_magics()
class AsyncMagicMixin(MagicMixin):
    """`MagicMixin` under a separate name; adds no behaviour of its own.

    It is one of the bases of `AsyncMock`.
    """
    pass
class MagicMock(MagicMixin, Mock):
    """
    MagicMock is a subclass of Mock with default implementations
    of most of the magic methods. You can use MagicMock without having to
    configure the magic methods yourself.

    If you use the `spec` or `spec_set` arguments then *only* magic
    methods that exist in the spec will be created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """
    def mock_add_spec(self, spec, spec_set=False):
        """Add a spec to a mock. `spec` can either be an object or a
        list of strings. Only attributes on the `spec` can be fetched as
        attributes from the mock.

        If `spec_set` is True then only attributes on the spec can be set."""
        self._mock_add_spec(spec, spec_set)
        self._mock_set_magics()

    def reset_mock(self, *args, return_value: bool = False, **kwargs):
        """Reset the mock, but keep the return value of magic-method mocks."""
        is_configured_magic = (
            return_value
            and self._mock_name
            and _is_magic(self._mock_name)
        )
        if is_configured_magic:
            # Don't reset return values for magic methods,
            # otherwise `m.__str__` will start
            # to return `MagicMock` instances, instead of `str` instances.
            return_value = False
        super().reset_mock(*args, return_value=return_value, **kwargs)
class MagicProxy(Base):
    """Descriptor that creates the mock for one magic method on first access."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock, attach it to the parent and configure it."""
        magic_name, owner = self.name, self.parent
        child = owner._get_child_mock(
            name=magic_name, _new_name=magic_name, _new_parent=owner
        )
        setattr(owner, magic_name, child)
        _set_return_value(owner, child, magic_name)
        return child

    def __get__(self, obj, _type=None):
        return self.create_mock()
# Signature of `CodeType.__init__` (first argument pre-bound with `partial`)
# and the attribute names of `CodeType`; `AsyncMockMixin.__init__` uses both
# to build its fake `__code__` object.
# NOTE: `_CODE_ATTRS` stays undefined when `inspect.signature` raises
# ValueError; its use in `AsyncMockMixin.__init__` is guarded by
# `if _CODE_SIG:`.
try:
    _CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
    _CODE_ATTRS = dir(CodeType)
except ValueError:  # pragma: no cover - backport is only tested against builds with docstrings
    _CODE_SIG = None
class AsyncMockMixin(Base):
    """Mixin that records awaits and provides the `assert_*await*` helpers.

    The await bookkeeping lives in `_mock_await_count`, `_mock_await_args`
    and `_mock_await_args_list` and is exposed through the delegating
    properties below.
    """
    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # iscoroutinefunction() checks _is_coroutine property to say if an
        # object is a coroutine. Without this check it looks to see if it is a
        # function/method, which in this case it is not (since it is an
        # AsyncMock).
        # It is set through __dict__ because when spec_set is True, this
        # attribute is likely undefined.
        self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
        self.__dict__['_mock_await_count'] = 0
        self.__dict__['_mock_await_args'] = None
        self.__dict__['_mock_await_args_list'] = _CallList()
        # Build a stand-in for `__code__` whose flags describe a coroutine
        # function taking `*args, **kwargs`.
        if _CODE_SIG:
            code_mock = NonCallableMock(spec_set=_CODE_ATTRS)
            code_mock.__dict__["_spec_class"] = CodeType
            code_mock.__dict__["_spec_signature"] = _CODE_SIG
        else:  # pragma: no cover - backport is only tested against builds with docstrings
            code_mock = NonCallableMock(spec_set=CodeType)
        code_mock.co_flags = (
            inspect.CO_COROUTINE
            + inspect.CO_VARARGS
            + inspect.CO_VARKEYWORDS
        )
        code_mock.co_argcount = 0
        code_mock.co_varnames = ('args', 'kwargs')
        try:
            code_mock.co_posonlyargcount = 0
        except AttributeError:
            # Python 3.7 and earlier.
            pass
        code_mock.co_kwonlyargcount = 0
        self.__dict__['__code__'] = code_mock
        self.__dict__['__name__'] = 'AsyncMock'
        self.__dict__['__defaults__'] = tuple()
        self.__dict__['__kwdefaults__'] = {}
        self.__dict__['__annotations__'] = None

    async def _execute_mock_call(_mock_self, *args, **kwargs):
        """Record the await, then produce the result of the mocked call.

        Order of precedence: `side_effect` (unless it yields `DEFAULT`),
        an explicitly set return value, the wrapped object, and finally
        the default `return_value`.
        """
        self = _mock_self
        # This is nearly just like super(), except for special handling
        # of coroutines

        _call = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = _call
        self.await_args_list.append(_call)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            elif not _callable(effect):
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result
            elif iscoroutinefunction(effect):
                result = await effect(*args, **kwargs)
            else:
                result = effect(*args, **kwargs)

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            if iscoroutinefunction(self._mock_wraps):
                return await self._mock_wraps(*args, **kwargs)
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

    def assert_awaited(_mock_self):
        """
        Assert that the mock was awaited at least once.
        """
        self = _mock_self
        if self.await_count == 0:
            msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
            raise AssertionError(msg)

    def assert_awaited_once(_mock_self):
        """
        Assert that the mock was awaited exactly once.
        """
        self = _mock_self
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def assert_awaited_with(_mock_self, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        self = _mock_self
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs, action='await')
            return msg

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # chain the matcher's exception, if it produced one, as the cause
            cause = expected if isinstance(expected, Exception) else None
            raise AssertionError(_error_message()) from cause

    def assert_awaited_once_with(_mock_self, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        self = _mock_self
        if not self.await_count == 1:
            msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)
        return self.assert_awaited_with(*args, **kwargs)

    def assert_any_await(_mock_self, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        self = _mock_self
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        actual = [self._call_matcher(c) for c in self.await_args_list]
        if cause or expected not in _AnyComparer(actual):
            expected_string = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(
                '%s await not found' % expected_string
            ) from cause

    def assert_has_awaits(_mock_self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        self = _mock_self
        expected = [self._call_matcher(c) for c in calls]
        # first exception produced while matching the expected calls, if any
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
        if not any_order:
            if expected not in all_awaits:
                if cause is None:
                    problem = 'Awaits not found.'
                else:
                    problem = ('Error processing expected awaits.\n'
                               'Errors: {}').format(
                                   [e if isinstance(e, Exception) else None
                                    for e in expected])
                raise AssertionError(
                    f'{problem}\n'
                    f'Expected: {_CallList(calls)}\n'
                    f'Actual: {self.await_args_list}'
                ) from cause
            return

        all_awaits = list(all_awaits)

        not_found = []
        for kall in expected:
            try:
                # remove each match so duplicates must be matched separately
                all_awaits.remove(kall)
            except ValueError:
                not_found.append(kall)
        if not_found:
            raise AssertionError(
                '%r not all found in await list' % (tuple(not_found),)
            ) from cause

    def assert_not_awaited(_mock_self):
        """
        Assert that the mock was never awaited.
        """
        self = _mock_self
        if self.await_count != 0:
            msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
                   f" Awaited {self.await_count} times.")
            raise AssertionError(msg)

    def reset_mock(self, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`

        Additionally clears the await count, last await arguments and the
        await history.
        """
        super().reset_mock(*args, **kwargs)
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()
class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async function functions.

    Based on Martin Richard's asynctest project.
    """
class _ANY:
    """Helper whose instances compare equal (and never unequal) to any object."""

    def __eq__(self, other):
        return True

    def __ne__(self, other):
        return False

    def __repr__(self):
        return '<ANY>'


# Shared instance exported as `ANY`.
ANY = _ANY()
2588def _format_call_signature(name, args, kwargs):
2589 message = '%s(%%s)' % name
2590 formatted_args = ''
2591 args_string = ', '.join([repr(arg) for arg in args])
2592 kwargs_string = ', '.join([
2593 '%s=%r' % (key, value) for key, value in kwargs.items()
2594 ])
2595 if args_string:
2596 formatted_args = args_string
2597 if kwargs_string:
2598 if formatted_args:
2599 formatted_args += ', '
2600 formatted_args += kwargs_string
2602 return message % formatted_args
class _Call(tuple):
    """
    A tuple for holding the results of a call to a mock, either in the form
    `(args, kwargs)` or `(name, args, kwargs)`.

    If args or kwargs are empty then a call tuple will compare equal to
    a tuple without those values. This makes comparisons less verbose::

        _Call(('name', (), {})) == ('name',)
        _Call(('name', (1,), {})) == ('name', (1,))
        _Call(((), {'a': 'b'})) == ({'a': 'b'},)

    The `_Call` object provides a useful shortcut for comparing with call::

        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)

    If the _Call has no name then it will match any name.
    """
    def __new__(cls, value=(), name='', parent=None, two=False,
                from_kall=True):
        # Normalise `value` (a sequence of length 0-3) into name/args/kwargs.
        # A str is taken as the name, a tuple as args, anything else as kwargs.
        args = ()
        kwargs = {}
        _len = len(value)
        if _len == 3:
            name, args, kwargs = value
        elif _len == 2:
            first, second = value
            if isinstance(first, str):
                name = first
                if isinstance(second, tuple):
                    args = second
                else:
                    kwargs = second
            else:
                args, kwargs = first, second
        elif _len == 1:
            value, = value
            if isinstance(value, str):
                name = value
            elif isinstance(value, tuple):
                args = value
            else:
                kwargs = value

        # `two=True` stores the nameless two-tuple form (args, kwargs)
        if two:
            return tuple.__new__(cls, (args, kwargs))

        return tuple.__new__(cls, (name, args, kwargs))


    def __init__(self, value=(), name=None, parent=None, two=False,
                 from_kall=True):
        # `name` defaults to None here (not '' as in __new__): __call__ and
        # __getattr__ test `_mock_name is None` to detect the root object.
        self._mock_name = name
        self._mock_parent = parent
        self._mock_from_kall = from_kall


    def __eq__(self, other):
        # Compare against another _Call or any sized object of length 0-3,
        # using the same str/tuple/other interpretation as __new__.
        try:
            len_other = len(other)
        except TypeError:
            return NotImplemented

        self_name = ''
        if len(self) == 2:
            self_args, self_kwargs = self
        else:
            self_name, self_args, self_kwargs = self

        # when both sides have a parent call, the parents must match too
        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
                and self._mock_parent != other._mock_parent):
            return False

        other_name = ''
        if len_other == 0:
            other_args, other_kwargs = (), {}
        elif len_other == 3:
            other_name, other_args, other_kwargs = other
        elif len_other == 1:
            value, = other
            if isinstance(value, tuple):
                other_args = value
                other_kwargs = {}
            elif isinstance(value, str):
                other_name = value
                other_args, other_kwargs = (), {}
            else:
                other_args = ()
                other_kwargs = value
        elif len_other == 2:
            # could be (name, args) or (name, kwargs) or (args, kwargs)
            first, second = other
            if isinstance(first, str):
                other_name = first
                if isinstance(second, tuple):
                    other_args, other_kwargs = second, {}
                else:
                    other_args, other_kwargs = (), second
            else:
                other_args, other_kwargs = first, second
        else:
            return False

        # an empty name on this side matches any name on the other side
        if self_name and other_name != self_name:
            return False

        # this order is important for ANY to work!
        return (other_args, other_kwargs) == (self_args, self_kwargs)


    __ne__ = object.__ne__


    def __call__(self, *args, **kwargs):
        # Calling a call object produces a new call recording the arguments;
        # named calls keep a link to `self` as parent.
        if self._mock_name is None:
            return _Call(('', args, kwargs), name='()')

        name = self._mock_name + '()'
        return _Call((self._mock_name, args, kwargs), name=name, parent=self)


    def __getattr__(self, attr):
        # Attribute access extends the dotted name, e.g. `call.foo.bar`.
        if self._mock_name is None:
            return _Call(name=attr, from_kall=False)
        name = '%s.%s' % (self._mock_name, attr)
        return _Call(name=name, parent=self, from_kall=False)


    def __getattribute__(self, attr):
        # Hide attributes defined on `tuple` so lookups of those names fall
        # through to __getattr__ above.
        if attr in tuple.__dict__:
            raise AttributeError
        return tuple.__getattribute__(self, attr)


    def _get_call_arguments(self):
        # Return (args, kwargs) for both the two- and three-element forms.
        if len(self) == 2:
            args, kwargs = self
        else:
            name, args, kwargs = self

        return args, kwargs

    @property
    def args(self):
        return self._get_call_arguments()[0]

    @property
    def kwargs(self):
        return self._get_call_arguments()[1]

    def __repr__(self):
        # Objects not created by a call (attribute access, the root `call`)
        # show only their name.
        if not self._mock_from_kall:
            name = self._mock_name or 'call'
            if name.startswith('()'):
                name = 'call%s' % name
            return name

        if len(self) == 2:
            name = 'call'
            args, kwargs = self
        else:
            name, args, kwargs = self
            if not name:
                name = 'call'
            elif not name.startswith('()'):
                name = 'call.%s' % name
            else:
                name = 'call%s' % name
        return _format_call_signature(name, args, kwargs)


    def call_list(self):
        """For a call object that represents multiple calls, `call_list`
        returns a list of all the intermediate calls as well as the
        final call."""
        vals = []
        thing = self
        # walk up the parent chain, keeping only entries created by a call
        while thing is not None:
            if thing._mock_from_kall:
                vals.append(thing)
            thing = thing._mock_parent
        return _CallList(reversed(vals))


# Root call object: it has no name and was not created by calling anything.
call = _Call(from_kall=False)
def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock.

    Raises `InvalidSpecError` when `spec` is itself a mock, and
    `RuntimeError` when `instance` is true for an async function spec."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)

    # (attribute name, spec for it) pairs; `_missing` means "look the
    # attribute up on `spec` later".
    entries = [(entry, _missing) for entry in dir(spec)]
    if is_type and instance and HAS_DATACLASSES and is_dataclass(spec):
        # dataclass fields are added explicitly, using the declared field
        # type as their spec
        dataclass_fields = fields(spec)
        entries.extend((f.name, f.type) for f in dataclass_fields)
        _kwargs = {'spec': [f.name for f in dataclass_fields]}
    else:
        _kwargs = {'spec': spec}

    if spec_set:
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    _name = kwargs.pop('name', _name)
    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    _kwargs.update(kwargs)

    # Pick the mock class that matches what `spec` is.
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        if is_async_func:
            mock = _set_async_signature(mock, spec)
        else:
            mock = _set_signature(mock, spec)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # Pop wraps from kwargs because it must not be passed to configure_mock.
    wrapped = kwargs.pop('wraps', None)
    if is_type and not instance and 'return_value' not in kwargs:
        # a class spec: calling the mock returns an autospecced instance
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock,
                                            wraps=wrapped)

    for entry, original in entries:
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        if original is _missing:
            try:
                original = getattr(spec, entry)
            except AttributeError:
                continue

        child_kwargs = {'spec': original}
        # Wrap child attributes also.
        if wrapped and hasattr(wrapped, entry):
            child_kwargs.update(wraps=original)
        # NOTE(review): with spec_set the dict is rebuilt, so a `wraps`
        # entry added just above is dropped - confirm this is intended.
        if spec_set:
            child_kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # non-function attribute: store a `_SpecState` record instead of
            # building a child mock here
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            child_kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent, **child_kwargs)
            mock._mock_children[entry] = new
            new.return_value = child_klass()
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)
    # kwargs are passed with respect to the parent mock so, they are not used
    # for creating return_value of the parent mock. So, this condition
    # should be true only for the parent mock if kwargs are given.
    if _is_instance_mock(mock) and kwargs:
        mock.configure_mock(**kwargs)

    return mock
def _must_skip(spec, entry, is_type):
    """
    Return whether we should skip the first argument on spec's `entry`
    attribute.
    """
    owner = spec
    if not isinstance(owner, type):
        if entry in getattr(owner, '__dict__', {}):
            # instance attribute - shouldn't skip
            return False
        owner = owner.__class__

    for klass in owner.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is DEFAULT:
            # not defined on this class; try the next one in the MRO
            continue
        if isinstance(found, (staticmethod, classmethod)):
            return False
        # Normal method => skip if looked up on type
        # (if looked up on instance, self is already skipped)
        # Any other kind of attribute never skips.
        return is_type if isinstance(found, FunctionTypes) else False

    # function is a dynamically provided attribute
    return is_type
2982class _SpecState(object):
2984 def __init__(self, spec, spec_set=False, parent=None,
2985 name=None, ids=None, instance=False):
2986 self.spec = spec
2987 self.ids = ids
2988 self.spec_set = spec_set
2989 self.parent = parent
2990 self.instance = instance
2991 self.name = name
# Types treated as "functions" by `create_autospec` and `_must_skip`.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)


# Attribute-name lists used as specs by `mock_open`; filled in lazily on its
# first call.
file_spec = None
open_spec = None
3006def _to_stream(read_data):
3007 if isinstance(read_data, bytes):
3008 return io.BytesIO(read_data)
3009 else:
3010 return io.StringIO(read_data)
def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string (or bytes) for the `read`, `readline` and
    `readlines` of the file handle to return. This is an empty string by
    default.

    Returns the configured `mock`; calling it rewinds the data and returns
    the file handle mock.
    """
    _read_data = _to_stream(read_data)
    # _state[0]: the current data stream, _state[1]: the readline generator
    # installed as `handle.readline.side_effect`
    _state = [_read_data, None]

    # The side effects below refer to `handle`, which is created further
    # down; they are only called after it exists.  Each one returns the
    # explicitly configured `return_value` when it is not None.
    def _readlines_side_effect(*args, **kwargs):
        if handle.readlines.return_value is not None:
            return handle.readlines.return_value
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        if handle.read.return_value is not None:
            return handle.read.return_value
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        # generator: yields the lines first, then whatever further
        # `readline` calls on the stream return, forever
        yield from _iter_side_effect()
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            while True:
                yield handle.readline.return_value
        for line in _state[0]:
            yield line

    def _next_side_effect():
        if handle.readline.return_value is not None:
            return handle.readline.return_value
        return next(_state[0])

    def _exit_side_effect(exctype, excinst, exctb):
        handle.close()

    # Build the spec lists once and cache them in the module globals.
    global file_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    global open_spec
    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))
    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None means "not overridden by the user" to the side effects above
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect
    handle.__exit__.side_effect = _exit_side_effect

    def reset_data(*args, **kwargs):
        # Runs on every call of `mock`: start again from a fresh stream.
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock
class PropertyMock(Mock):
    """
    A mock meant to be installed on a class as a property or other
    descriptor. It implements `__get__` and `__set__`, so you can choose
    the value produced when the attribute is read.

    Reading a `PropertyMock` through an object calls the mock with no
    arguments. Assigning to it calls the mock with the assigned value.
    """

    def _get_child_mock(self, **kwargs):
        """Create child mocks as `MagicMock` instances, forwarding `kwargs`."""
        return MagicMock(**kwargs)

    def __get__(self, obj, obj_type=None):
        """Descriptor read: call the mock without arguments, return the result."""
        return self()

    def __set__(self, obj, val):
        """Descriptor write: call the mock with `val`; the result is discarded."""
        self(val)
# Marker default for `timeout` parameters, so that an omitted argument can be
# told apart from an explicit `timeout=None`.
_timeout_unset = sentinel.TIMEOUT_UNSET
class ThreadingMixin(Base):
    """Mixin that lets callers block until the mock has been called.

    Each call sets `threading.Event` objects; the `wait_until_*` methods
    wait on those events.
    """

    # Timeout used when the constructor is not given one.
    DEFAULT_TIMEOUT = None

    def _get_child_mock(self, **kw):
        """Create a child mock that inherits the wait timeout of its parent.

        The timeout is taken from `kw["parent"]` when that is a
        `ThreadingMixin`, otherwise from `kw["_new_parent"]` when that is one.
        """
        for key in ("parent", "_new_parent"):
            owner = kw.get(key)
            if isinstance(owner, ThreadingMixin):
                kw["timeout"] = owner._mock_wait_timeout
                break
        return super()._get_child_mock(**kw)

    def __init__(self, *args, timeout=_timeout_unset, **kwargs):
        """Initialise the mock and its synchronisation state.

        `timeout` is the wait timeout for this mock; when omitted,
        `DEFAULT_TIMEOUT` is used. All other arguments go to the parent
        initialiser.
        """
        super().__init__(*args, **kwargs)
        wait_timeout = self.DEFAULT_TIMEOUT if timeout is _timeout_unset else timeout
        # Stored directly in the instance __dict__.
        self.__dict__.update(
            _mock_event=threading.Event(),  # Event for any call
            _mock_calls_events=[],  # Events for each of the calls
            _mock_calls_events_lock=threading.Lock(),
            _mock_wait_timeout=wait_timeout,
        )

    def reset_mock(self, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # Start over with a fresh "any call" event and no per-call events.
        self.__dict__.update(
            _mock_event=threading.Event(),
            _mock_calls_events=[],
        )

    def __get_event(self, expected_args, expected_kwargs):
        """Return the event tied to a call signature, creating it if needed."""
        wanted = (expected_args, expected_kwargs)
        with self._mock_calls_events_lock:
            for args, kwargs, event in self._mock_calls_events:
                if (args, kwargs) == wanted:
                    return event
            fresh_event = threading.Event()
            self._mock_calls_events.append(
                (expected_args, expected_kwargs, fresh_event))
            return fresh_event

    def _mock_call(self, *args, **kwargs):
        """Perform the call, then signal the per-call and the any-call event."""
        result = super()._mock_call(*args, **kwargs)
        # Reached only when the call above returned without raising.
        self.__get_event(args, kwargs).set()
        self._mock_event.set()
        return result

    def wait_until_called(self, *, timeout=_timeout_unset):
        """Wait until the mock object is called.

        `timeout` - time to wait for in seconds, waits forever otherwise.
        Defaults to the constructor provided timeout.
        Use None to block indefinitely.

        Raises `AssertionError` if the timeout expires first.
        """
        if timeout is _timeout_unset:
            timeout = self._mock_wait_timeout
        if self._mock_event.wait(timeout=timeout):
            return
        msg = (f"{self._mock_name or 'mock'} was not called before"
               f" timeout({timeout}).")
        raise AssertionError(msg)

    def wait_until_any_call_with(self, *args, **kwargs):
        """Wait until the mock object is called with given args.

        Waits for the timeout in seconds provided in the constructor.
        Raises `AssertionError` if the timeout expires first.
        """
        event = self.__get_event(args, kwargs)
        if event.wait(timeout=self._mock_wait_timeout):
            return
        expected_string = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(f'{expected_string} call not found')
class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
    """
    A mock that lets you wait for calls that happen in a different thread.

    The constructor accepts a `timeout` argument giving the timeout, in
    seconds, used by the `wait` methods of the mock.

    The default timeout of all instances can be changed through the
    `ThreadingMock.DEFAULT_TIMEOUT` attribute.

    If no timeout is set, waiting blocks indefinitely.
    """
def seal(mock):
    """Turn off automatic creation of child mocks.

    Once sealed, reading an attribute that was not already defined on the
    mock no longer generates a new mock.

    Sealing is recursive: it covers the mock passed in, every mock produced
    by accessing one of its attributes, and every assigned mock that has no
    name or spec.
    """
    mock._mock_sealed = True
    for attr_name in dir(mock):
        try:
            child = getattr(mock, attr_name)
        except AttributeError:
            continue
        # Checked in order, stopping at the first failure:
        #   1. the attribute is itself a mock;
        #   2. its `_mock_children` entry for this name is not a `_SpecState`
        #      NOTE(review): the lookup is on the child's own children,
        #      keyed by the parent's attribute name - confirm intended;
        #   3. the child's `_mock_new_parent` is the mock being sealed.
        if (isinstance(child, NonCallableMock)
                and not isinstance(child._mock_children.get(attr_name), _SpecState)
                and child._mock_new_parent is mock):
            seal(child)
class _AsyncIterator:
    """
    Wraps an iterator in an asynchronous iterator.

    The wrapped iterator is advanced synchronously inside `__anext__`, and
    its exhaustion is reported as `StopAsyncIteration`.
    """
    def __init__(self, iterator):
        """Keep `iterator` and attach a mocked `__code__` to the instance."""
        self.iterator = iterator
        # Instance-level `__code__` whose flags are CO_ITERABLE_COROUTINE.
        # NOTE(review): presumably read by coroutine-detection helpers -
        # confirm against the callers.
        code_mock = NonCallableMock(spec_set=CodeType)
        code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
        self.__dict__['__code__'] = code_mock

    def __aiter__(self):
        """Return `self`, so the wrapper can be used directly in `async for`."""
        return self

    async def __anext__(self):
        """Return the next item, or raise `StopAsyncIteration` when exhausted."""
        try:
            return next(self.iterator)
        except StopIteration:
            pass
        # Raised outside the handler, so the new exception does not carry
        # the StopIteration as its context.
        raise StopAsyncIteration