Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 25%

1697 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:17 +0000

1# mock.py 

2# Test tools for mocking and patching. 

3# Maintained by Michael Foord 

4# Backport for other versions of Python available from 

5# https://pypi.org/project/mock 

6 

7__all__ = ( 

8 'Mock', 

9 'MagicMock', 

10 'patch', 

11 'sentinel', 

12 'DEFAULT', 

13 'ANY', 

14 'call', 

15 'create_autospec', 

16 'AsyncMock', 

17 'ThreadingMock', 

18 'FILTER_DIR', 

19 'NonCallableMock', 

20 'NonCallableMagicMock', 

21 'mock_open', 

22 'PropertyMock', 

23 'seal', 

24) 

25 

26 

27import asyncio 

28import contextlib 

29import io 

30import inspect 

31import pprint 

32import sys 

33import threading 

34import builtins 

35from types import CodeType, ModuleType, MethodType 

36from unittest.util import safe_repr 

37from functools import wraps, partial 

38from threading import RLock 

39 

40 

41from mock import IS_PYPY 

42from .backports import iscoroutinefunction 

43 

44 

45class InvalidSpecError(Exception): 

46 """Indicates that an invalid value was used as a mock spec.""" 

47 

48 

49_builtins = {name for name in dir(builtins) if not name.startswith('_')} 

50 

51FILTER_DIR = True 

52 

53# Workaround for issue #12370 

54# Without this, the __class__ properties wouldn't be set correctly 

55_safe_super = super 

56 

57def _is_async_obj(obj): 

58 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 

59 return False 

60 if hasattr(obj, '__func__'): 

61 obj = getattr(obj, '__func__') 

62 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 

63 

64 

65def _is_async_func(func): 

66 if getattr(func, '__code__', None): 

67 return iscoroutinefunction(func) 

68 else: 

69 return False 

70 

71 

72def _is_instance_mock(obj): 

73 # can't use isinstance on Mock objects because they override __class__ 

74 # The base class for all mocks is NonCallableMock 

75 return issubclass(type(obj), NonCallableMock) 

76 

77 

78def _is_exception(obj): 

79 return ( 

80 isinstance(obj, BaseException) or 

81 isinstance(obj, type) and issubclass(obj, BaseException) 

82 ) 

83 

84 

85def _extract_mock(obj): 

86 # Autospecced functions will return a FunctionType with "mock" attribute 

87 # which is the actual mock object that needs to be used. 

88 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 

89 return obj.mock 

90 else: 

91 return obj 

92 

93 

94def _get_signature_object(func, as_instance, eat_self): 

95 """ 

96 Given an arbitrary, possibly callable object, try to create a suitable 

97 signature object. 

98 Return a (reduced func, signature) tuple, or None. 

99 """ 

100 if isinstance(func, type) and not as_instance: 

101 # If it's a type and should be modelled as a type, use __init__. 

102 func = func.__init__ 

103 # Skip the `self` argument in __init__ 

104 eat_self = True 

105 elif isinstance(func, (classmethod, staticmethod)): 

106 if isinstance(func, classmethod): 

107 # Skip the `cls` argument of a class method 

108 eat_self = True 

109 # Use the original decorated method to extract the correct function signature 

110 func = func.__func__ 

111 elif not isinstance(func, FunctionTypes): 

112 # If we really want to model an instance of the passed type, 

113 # __call__ should be looked up, not __init__. 

114 try: 

115 func = func.__call__ 

116 except AttributeError: 

117 return None 

118 if eat_self: 

119 sig_func = partial(func, None) 

120 else: 

121 sig_func = func 

122 try: 

123 return func, inspect.signature(sig_func) 

124 except ValueError: 

125 # Certain callable types are not supported by inspect.signature() 

126 return None 

127 

128 

129def _check_signature(func, mock, skipfirst, instance=False): 

130 sig = _get_signature_object(func, instance, skipfirst) 

131 if sig is None: 

132 return 

133 func, sig = sig 

134 def checksig(_mock_self, *args, **kwargs): 

135 sig.bind(*args, **kwargs) 

136 _copy_func_details(func, checksig) 

137 type(mock)._mock_check_sig = checksig 

138 type(mock).__signature__ = sig 

139 

140 

141def _copy_func_details(func, funcopy): 

142 # we explicitly don't copy func.__dict__ into this copy as it would 

143 # expose original attributes that should be mocked 

144 for attribute in ( 

145 '__name__', '__doc__', '__text_signature__', 

146 '__module__', '__defaults__', '__kwdefaults__', 

147 ): 

148 try: 

149 setattr(funcopy, attribute, getattr(func, attribute)) 

150 except AttributeError: 

151 pass 

152 

153 

154def _callable(obj): 

155 if isinstance(obj, type): 

156 return True 

157 if isinstance(obj, (staticmethod, classmethod, MethodType)): 

158 return _callable(obj.__func__) 

159 if getattr(obj, '__call__', None) is not None: 

160 return True 

161 return False 

162 

163 

164def _is_list(obj): 

165 # checks for list or tuples 

166 # XXXX badly named! 

167 return type(obj) in (list, tuple) 

168 

169 

170def _instance_callable(obj): 

171 """Given an object, return True if the object is callable. 

172 For classes, return True if instances would be callable.""" 

173 if not isinstance(obj, type): 

174 # already an instance 

175 return getattr(obj, '__call__', None) is not None 

176 

177 # *could* be broken by a class overriding __mro__ or __dict__ via 

178 # a metaclass 

179 for base in (obj,) + obj.__mro__: 

180 if base.__dict__.get('__call__') is not None: 

181 return True 

182 return False 

183 

184 

185def _set_signature(mock, original, instance=False): 

186 # creates a function with signature (*args, **kwargs) that delegates to a 

187 # mock. It still does signature checking by calling a lambda with the same 

188 # signature as the original. 

189 

190 skipfirst = isinstance(original, type) 

191 result = _get_signature_object(original, instance, skipfirst) 

192 if result is None: 

193 return mock 

194 func, sig = result 

195 def checksig(*args, **kwargs): 

196 sig.bind(*args, **kwargs) 

197 _copy_func_details(func, checksig) 

198 

199 name = original.__name__ 

200 if not name.isidentifier(): 

201 name = 'funcopy' 

202 context = {'_checksig_': checksig, 'mock': mock} 

203 src = """def %s(*args, **kwargs): 

204 _checksig_(*args, **kwargs) 

205 return mock(*args, **kwargs)""" % name 

206 exec (src, context) 

207 funcopy = context[name] 

208 _setup_func(funcopy, mock, sig) 

209 return funcopy 

210 

211def _set_async_signature(mock, original, instance=False, is_async_mock=False): 

212 # creates an async function with signature (*args, **kwargs) that delegates to a 

213 # mock. It still does signature checking by calling a lambda with the same 

214 # signature as the original. 

215 

216 skipfirst = isinstance(original, type) 

217 func, sig = _get_signature_object(original, instance, skipfirst) 

218 def checksig(*args, **kwargs): 

219 sig.bind(*args, **kwargs) 

220 _copy_func_details(func, checksig) 

221 

222 name = original.__name__ 

223 context = {'_checksig_': checksig, 'mock': mock} 

224 src = """async def %s(*args, **kwargs): 

225 _checksig_(*args, **kwargs) 

226 return await mock(*args, **kwargs)""" % name 

227 exec (src, context) 

228 funcopy = context[name] 

229 _setup_func(funcopy, mock, sig) 

230 _setup_async_mock(funcopy) 

231 return funcopy 

232 

233 

234def _setup_func(funcopy, mock, sig): 

235 funcopy.mock = mock 

236 

237 def assert_called_with(*args, **kwargs): 

238 return mock.assert_called_with(*args, **kwargs) 

239 def assert_called(*args, **kwargs): 

240 return mock.assert_called(*args, **kwargs) 

241 def assert_not_called(*args, **kwargs): 

242 return mock.assert_not_called(*args, **kwargs) 

243 def assert_called_once(*args, **kwargs): 

244 return mock.assert_called_once(*args, **kwargs) 

245 def assert_called_once_with(*args, **kwargs): 

246 return mock.assert_called_once_with(*args, **kwargs) 

247 def assert_has_calls(*args, **kwargs): 

248 return mock.assert_has_calls(*args, **kwargs) 

249 def assert_any_call(*args, **kwargs): 

250 return mock.assert_any_call(*args, **kwargs) 

251 def reset_mock(): 

252 funcopy.method_calls = _CallList() 

253 funcopy.mock_calls = _CallList() 

254 mock.reset_mock() 

255 ret = funcopy.return_value 

256 if _is_instance_mock(ret) and not ret is mock: 

257 ret.reset_mock() 

258 

259 funcopy.called = False 

260 funcopy.call_count = 0 

261 funcopy.call_args = None 

262 funcopy.call_args_list = _CallList() 

263 funcopy.method_calls = _CallList() 

264 funcopy.mock_calls = _CallList() 

265 

266 funcopy.return_value = mock.return_value 

267 funcopy.side_effect = mock.side_effect 

268 funcopy._mock_children = mock._mock_children 

269 

270 funcopy.assert_called_with = assert_called_with 

271 funcopy.assert_called_once_with = assert_called_once_with 

272 funcopy.assert_has_calls = assert_has_calls 

273 funcopy.assert_any_call = assert_any_call 

274 funcopy.reset_mock = reset_mock 

275 funcopy.assert_called = assert_called 

276 funcopy.assert_not_called = assert_not_called 

277 funcopy.assert_called_once = assert_called_once 

278 funcopy.__signature__ = sig 

279 

280 mock._mock_delegate = funcopy 

281 

282 

283def _setup_async_mock(mock): 

284 mock._is_coroutine = asyncio.coroutines._is_coroutine 

285 mock.await_count = 0 

286 mock.await_args = None 

287 mock.await_args_list = _CallList() 

288 

289 # Mock is not configured yet so the attributes are set 

290 # to a function and then the corresponding mock helper function 

291 # is called when the helper is accessed similar to _setup_func. 

292 def wrapper(attr, *args, **kwargs): 

293 return getattr(mock.mock, attr)(*args, **kwargs) 

294 

295 for attribute in ('assert_awaited', 

296 'assert_awaited_once', 

297 'assert_awaited_with', 

298 'assert_awaited_once_with', 

299 'assert_any_await', 

300 'assert_has_awaits', 

301 'assert_not_awaited'): 

302 

303 # setattr(mock, attribute, wrapper) causes late binding 

304 # hence attribute will always be the last value in the loop 

305 # Use partial(wrapper, attribute) to ensure the attribute is bound 

306 # correctly. 

307 setattr(mock, attribute, partial(wrapper, attribute)) 

308 

309 

310def _is_magic(name): 

311 return '__%s__' % name[2:-2] == name 

312 

313 

314class _SentinelObject(object): 

315 "A unique, named, sentinel object." 

316 def __init__(self, name): 

317 self.name = name 

318 

319 def __repr__(self): 

320 return 'sentinel.%s' % self.name 

321 

322 def __reduce__(self): 

323 return 'sentinel.%s' % self.name 

324 

325 

326class _Sentinel(object): 

327 """Access attributes to return a named object, usable as a sentinel.""" 

328 def __init__(self): 

329 self._sentinels = {} 

330 

331 def __getattr__(self, name): 

332 if name == '__bases__': 

333 # Without this help(unittest.mock) raises an exception 

334 raise AttributeError 

335 return self._sentinels.setdefault(name, _SentinelObject(name)) 

336 

337 def __reduce__(self): 

338 return 'sentinel' 

339 

340 

341sentinel = _Sentinel() 

342 

343DEFAULT = sentinel.DEFAULT 

344_missing = sentinel.MISSING 

345_deleted = sentinel.DELETED 

346 

347 

348_allowed_names = { 

349 'return_value', '_mock_return_value', 'side_effect', 

350 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 

351 '_mock_name', '_mock_new_name' 

352} 

353 

354 

355def _delegating_property(name): 

356 _allowed_names.add(name) 

357 _the_name = '_mock_' + name 

358 def _get(self, name=name, _the_name=_the_name): 

359 sig = self._mock_delegate 

360 if sig is None: 

361 return getattr(self, _the_name) 

362 return getattr(sig, name) 

363 def _set(self, value, name=name, _the_name=_the_name): 

364 sig = self._mock_delegate 

365 if sig is None: 

366 self.__dict__[_the_name] = value 

367 else: 

368 setattr(sig, name, value) 

369 

370 return property(_get, _set) 

371 

372 

373 

374class _CallList(list): 

375 

376 def __contains__(self, value): 

377 if not isinstance(value, list): 

378 return list.__contains__(self, value) 

379 len_value = len(value) 

380 len_self = len(self) 

381 if len_value > len_self: 

382 return False 

383 

384 for i in range(0, len_self - len_value + 1): 

385 sub_list = self[i:i+len_value] 

386 if sub_list == value: 

387 return True 

388 return False 

389 

390 def __repr__(self): 

391 return pprint.pformat(list(self)) 

392 

393 

394def _check_and_set_parent(parent, value, name, new_name): 

395 value = _extract_mock(value) 

396 

397 if not _is_instance_mock(value): 

398 return False 

399 if ((value._mock_name or value._mock_new_name) or 

400 (value._mock_parent is not None) or 

401 (value._mock_new_parent is not None)): 

402 return False 

403 

404 _parent = parent 

405 while _parent is not None: 

406 # setting a mock (value) as a child or return value of itself 

407 # should not modify the mock 

408 if _parent is value: 

409 return False 

410 _parent = _parent._mock_new_parent 

411 

412 if new_name: 

413 value._mock_new_parent = parent 

414 value._mock_new_name = new_name 

415 if name: 

416 value._mock_parent = parent 

417 value._mock_name = name 

418 return True 

419 

420# Internal class to identify if we wrapped an iterator object or not. 

421class _MockIter(object): 

422 def __init__(self, obj): 

423 self.obj = iter(obj) 

424 def __next__(self): 

425 return next(self.obj) 

426 

427class Base(object): 

428 _mock_return_value = DEFAULT 

429 _mock_side_effect = None 

430 def __init__(self, *args, **kwargs): 

431 pass 

432 

433 

434 

435class NonCallableMock(Base): 

436 """A non-callable version of `Mock`""" 

437 

438 # Store a mutex as a class attribute in order to protect concurrent access 

439 # to mock attributes. Using a class attribute allows all NonCallableMock 

440 # instances to share the mutex for simplicity. 

441 # 

442 # See https://github.com/python/cpython/issues/98624 for why this is 

443 # necessary. 

444 _lock = RLock() 

445 

446 def __new__( 

447 cls, spec=None, wraps=None, name=None, spec_set=None, 

448 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

449 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

450 ): 

451 # every instance has its own class 

452 # so we can create magic methods on the 

453 # class without stomping on other mocks 

454 bases = (cls,) 

455 if not issubclass(cls, AsyncMockMixin): 

456 # Check if spec is an async object or function 

457 spec_arg = spec_set or spec 

458 if spec_arg is not None and _is_async_obj(spec_arg): 

459 bases = (AsyncMockMixin, cls) 

460 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 

461 instance = _safe_super(NonCallableMock, cls).__new__(new) 

462 return instance 

463 

464 

465 def __init__( 

466 self, spec=None, wraps=None, name=None, spec_set=None, 

467 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

468 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

469 ): 

470 if _new_parent is None: 

471 _new_parent = parent 

472 

473 __dict__ = self.__dict__ 

474 __dict__['_mock_parent'] = parent 

475 __dict__['_mock_name'] = name 

476 __dict__['_mock_new_name'] = _new_name 

477 __dict__['_mock_new_parent'] = _new_parent 

478 __dict__['_mock_sealed'] = False 

479 

480 if spec_set is not None: 

481 spec = spec_set 

482 spec_set = True 

483 if _eat_self is None: 

484 _eat_self = parent is not None 

485 

486 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 

487 

488 __dict__['_mock_children'] = {} 

489 __dict__['_mock_wraps'] = wraps 

490 __dict__['_mock_delegate'] = None 

491 

492 __dict__['_mock_called'] = False 

493 __dict__['_mock_call_args'] = None 

494 __dict__['_mock_call_count'] = 0 

495 __dict__['_mock_call_args_list'] = _CallList() 

496 __dict__['_mock_mock_calls'] = _CallList() 

497 

498 __dict__['method_calls'] = _CallList() 

499 __dict__['_mock_unsafe'] = unsafe 

500 

501 if kwargs: 

502 self.configure_mock(**kwargs) 

503 

504 _safe_super(NonCallableMock, self).__init__( 

505 spec, wraps, name, spec_set, parent, 

506 _spec_state 

507 ) 

508 

509 

510 def attach_mock(self, mock, attribute): 

511 """ 

512 Attach a mock as an attribute of this one, replacing its name and 

513 parent. Calls to the attached mock will be recorded in the 

514 `method_calls` and `mock_calls` attributes of this one.""" 

515 inner_mock = _extract_mock(mock) 

516 

517 inner_mock._mock_parent = None 

518 inner_mock._mock_new_parent = None 

519 inner_mock._mock_name = '' 

520 inner_mock._mock_new_name = None 

521 

522 setattr(self, attribute, mock) 

523 

524 

525 def mock_add_spec(self, spec, spec_set=False): 

526 """Add a spec to a mock. `spec` can either be an object or a 

527 list of strings. Only attributes on the `spec` can be fetched as 

528 attributes from the mock. 

529 

530 If `spec_set` is True then only attributes on the spec can be set.""" 

531 self._mock_add_spec(spec, spec_set) 

532 

533 

534 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 

535 _eat_self=False): 

536 if _is_instance_mock(spec): 

537 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 

538 

539 _spec_class = None 

540 _spec_signature = None 

541 _spec_asyncs = [] 

542 

543 if spec is not None and not _is_list(spec): 

544 if isinstance(spec, type): 

545 _spec_class = spec 

546 else: 

547 _spec_class = type(spec) 

548 res = _get_signature_object(spec, 

549 _spec_as_instance, _eat_self) 

550 _spec_signature = res and res[1] 

551 

552 spec_list = dir(spec) 

553 

554 for attr in spec_list: 

555 static_attr = inspect.getattr_static(spec, attr, None) 

556 unwrapped_attr = static_attr 

557 try: 

558 unwrapped_attr = inspect.unwrap(unwrapped_attr) 

559 except ValueError: 

560 pass 

561 if iscoroutinefunction(unwrapped_attr): 

562 _spec_asyncs.append(attr) 

563 

564 spec = spec_list 

565 

566 __dict__ = self.__dict__ 

567 __dict__['_spec_class'] = _spec_class 

568 __dict__['_spec_set'] = spec_set 

569 __dict__['_spec_signature'] = _spec_signature 

570 __dict__['_mock_methods'] = spec 

571 __dict__['_spec_asyncs'] = _spec_asyncs 

572 

573 def __get_return_value(self): 

574 ret = self._mock_return_value 

575 if self._mock_delegate is not None: 

576 ret = self._mock_delegate.return_value 

577 

578 if ret is DEFAULT: 

579 ret = self._get_child_mock( 

580 _new_parent=self, _new_name='()' 

581 ) 

582 self.return_value = ret 

583 return ret 

584 

585 

586 def __set_return_value(self, value): 

587 if self._mock_delegate is not None: 

588 self._mock_delegate.return_value = value 

589 else: 

590 self._mock_return_value = value 

591 _check_and_set_parent(self, value, None, '()') 

592 

593 __return_value_doc = "The value to be returned when the mock is called." 

594 return_value = property(__get_return_value, __set_return_value, 

595 __return_value_doc) 

596 

597 

598 @property 

599 def __class__(self): 

600 if self._spec_class is None: 

601 return type(self) 

602 return self._spec_class 

603 

604 called = _delegating_property('called') 

605 call_count = _delegating_property('call_count') 

606 call_args = _delegating_property('call_args') 

607 call_args_list = _delegating_property('call_args_list') 

608 mock_calls = _delegating_property('mock_calls') 

609 

610 

611 def __get_side_effect(self): 

612 delegated = self._mock_delegate 

613 if delegated is None: 

614 return self._mock_side_effect 

615 sf = delegated.side_effect 

616 if (sf is not None and not callable(sf) 

617 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 

618 sf = _MockIter(sf) 

619 delegated.side_effect = sf 

620 return sf 

621 

622 def __set_side_effect(self, value): 

623 value = _try_iter(value) 

624 delegated = self._mock_delegate 

625 if delegated is None: 

626 self._mock_side_effect = value 

627 else: 

628 delegated.side_effect = value 

629 

630 side_effect = property(__get_side_effect, __set_side_effect) 

631 

632 

633 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 

634 "Restore the mock object to its initial state." 

635 if visited is None: 

636 visited = [] 

637 if id(self) in visited: 

638 return 

639 visited.append(id(self)) 

640 

641 self.called = False 

642 self.call_args = None 

643 self.call_count = 0 

644 self.mock_calls = _CallList() 

645 self.call_args_list = _CallList() 

646 self.method_calls = _CallList() 

647 

648 if return_value: 

649 self._mock_return_value = DEFAULT 

650 if side_effect: 

651 self._mock_side_effect = None 

652 

653 for child in self._mock_children.values(): 

654 if isinstance(child, _SpecState) or child is _deleted: 

655 continue 

656 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 

657 

658 ret = self._mock_return_value 

659 if _is_instance_mock(ret) and ret is not self: 

660 ret.reset_mock(visited) 

661 

662 

663 def configure_mock(self, **kwargs): 

664 """Set attributes on the mock through keyword arguments. 

665 

666 Attributes plus return values and side effects can be set on child 

667 mocks using standard dot notation and unpacking a dictionary in the 

668 method call: 

669 

670 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 

671 >>> mock.configure_mock(**attrs)""" 

672 for arg, val in sorted(kwargs.items(), 

673 # we sort on the number of dots so that 

674 # attributes are set before we set attributes on 

675 # attributes 

676 key=lambda entry: entry[0].count('.')): 

677 args = arg.split('.') 

678 final = args.pop() 

679 obj = self 

680 for entry in args: 

681 obj = getattr(obj, entry) 

682 setattr(obj, final, val) 

683 

684 

685 def __getattr__(self, name): 

686 if name in {'_mock_methods', '_mock_unsafe'}: 

687 raise AttributeError(name) 

688 elif self._mock_methods is not None: 

689 if name not in self._mock_methods or name in _all_magics: 

690 raise AttributeError("Mock object has no attribute %r" % name) 

691 elif _is_magic(name): 

692 raise AttributeError(name) 

693 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): 

694 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST: 

695 raise AttributeError( 

696 f"{name!r} is not a valid assertion. Use a spec " 

697 f"for the mock if {name!r} is meant to be an attribute.") 

698 

699 with NonCallableMock._lock: 

700 result = self._mock_children.get(name) 

701 if result is _deleted: 

702 raise AttributeError(name) 

703 elif result is None: 

704 wraps = None 

705 if self._mock_wraps is not None: 

706 # XXXX should we get the attribute without triggering code 

707 # execution? 

708 wraps = getattr(self._mock_wraps, name) 

709 

710 result = self._get_child_mock( 

711 parent=self, name=name, wraps=wraps, _new_name=name, 

712 _new_parent=self 

713 ) 

714 self._mock_children[name] = result 

715 

716 elif isinstance(result, _SpecState): 

717 try: 

718 result = create_autospec( 

719 result.spec, result.spec_set, result.instance, 

720 result.parent, result.name 

721 ) 

722 except InvalidSpecError: 

723 target_name = self.__dict__['_mock_name'] or self 

724 raise InvalidSpecError( 

725 f'Cannot autospec attr {name!r} from target ' 

726 f'{target_name!r} as it has already been mocked out. ' 

727 f'[target={self!r}, attr={result.spec!r}]') 

728 self._mock_children[name] = result 

729 

730 return result 

731 

732 

733 def _extract_mock_name(self): 

734 _name_list = [self._mock_new_name] 

735 _parent = self._mock_new_parent 

736 last = self 

737 

738 dot = '.' 

739 if _name_list == ['()']: 

740 dot = '' 

741 

742 while _parent is not None: 

743 last = _parent 

744 

745 _name_list.append(_parent._mock_new_name + dot) 

746 dot = '.' 

747 if _parent._mock_new_name == '()': 

748 dot = '' 

749 

750 _parent = _parent._mock_new_parent 

751 

752 _name_list = list(reversed(_name_list)) 

753 _first = last._mock_name or 'mock' 

754 if len(_name_list) > 1: 

755 if _name_list[1] not in ('()', '().'): 

756 _first += '.' 

757 _name_list[0] = _first 

758 return ''.join(_name_list) 

759 

760 def __repr__(self): 

761 name = self._extract_mock_name() 

762 

763 name_string = '' 

764 if name not in ('mock', 'mock.'): 

765 name_string = ' name=%r' % name 

766 

767 spec_string = '' 

768 if self._spec_class is not None: 

769 spec_string = ' spec=%r' 

770 if self._spec_set: 

771 spec_string = ' spec_set=%r' 

772 spec_string = spec_string % self._spec_class.__name__ 

773 return "<%s%s%s id='%s'>" % ( 

774 type(self).__name__, 

775 name_string, 

776 spec_string, 

777 id(self) 

778 ) 

779 

780 

781 def __dir__(self): 

782 """Filter the output of `dir(mock)` to only useful members.""" 

783 if not FILTER_DIR: 

784 return object.__dir__(self) 

785 

786 extras = self._mock_methods or [] 

787 from_type = dir(type(self)) 

788 from_dict = list(self.__dict__) 

789 from_child_mocks = [ 

790 m_name for m_name, m_value in self._mock_children.items() 

791 if m_value is not _deleted] 

792 

793 from_type = [e for e in from_type if not e.startswith('_')] 

794 from_dict = [e for e in from_dict if not e.startswith('_') or 

795 _is_magic(e)] 

796 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 

797 

798 

799 def __setattr__(self, name, value): 

800 if name in _allowed_names: 

801 # property setters go through here 

802 return object.__setattr__(self, name, value) 

803 elif (self._spec_set and self._mock_methods is not None and 

804 name not in self._mock_methods and 

805 name not in self.__dict__): 

806 raise AttributeError("Mock object has no attribute '%s'" % name) 

807 elif name in _unsupported_magics: 

808 msg = 'Attempting to set unsupported magic method %r.' % name 

809 raise AttributeError(msg) 

810 elif name in _all_magics: 

811 if self._mock_methods is not None and name not in self._mock_methods: 

812 raise AttributeError("Mock object has no attribute '%s'" % name) 

813 

814 if not _is_instance_mock(value): 

815 setattr(type(self), name, _get_method(name, value)) 

816 original = value 

817 value = lambda *args, **kw: original(self, *args, **kw) 

818 else: 

819 # only set _new_name and not name so that mock_calls is tracked 

820 # but not method calls 

821 _check_and_set_parent(self, value, None, name) 

822 setattr(type(self), name, value) 

823 self._mock_children[name] = value 

824 elif name == '__class__': 

825 self._spec_class = value 

826 return 

827 else: 

828 if _check_and_set_parent(self, value, name, name): 

829 self._mock_children[name] = value 

830 

831 if self._mock_sealed and not hasattr(self, name): 

832 mock_name = f'{self._extract_mock_name()}.{name}' 

833 raise AttributeError(f'Cannot set {mock_name}') 

834 

835 return object.__setattr__(self, name, value) 

836 

837 

838 def __delattr__(self, name): 

839 if name in _all_magics and name in type(self).__dict__: 

840 delattr(type(self), name) 

841 if name not in self.__dict__: 

842 # for magic methods that are still MagicProxy objects and 

843 # not set on the instance itself 

844 return 

845 

846 obj = self._mock_children.get(name, _missing) 

847 if name in self.__dict__: 

848 _safe_super(NonCallableMock, self).__delattr__(name) 

849 elif obj is _deleted: 

850 raise AttributeError(name) 

851 if obj is not _missing: 

852 del self._mock_children[name] 

853 self._mock_children[name] = _deleted 

854 

855 

856 def _format_mock_call_signature(self, args, kwargs): 

857 name = self._mock_name or 'mock' 

858 return _format_call_signature(name, args, kwargs) 

859 

860 

861 def _format_mock_failure_message(self, args, kwargs, action='call'): 

862 message = 'expected %s not found.\nExpected: %s\nActual: %s' 

863 expected_string = self._format_mock_call_signature(args, kwargs) 

864 call_args = self.call_args 

865 actual_string = self._format_mock_call_signature(*call_args) 

866 return message % (action, expected_string, actual_string) 

867 

868 

869 def _get_call_signature_from_name(self, name): 

870 """ 

871 * If call objects are asserted against a method/function like obj.meth1 

872 then there could be no name for the call object to lookup. Hence just 

873 return the spec_signature of the method/function being asserted against. 

874 * If the name is not empty then remove () and split by '.' to get 

875 list of names to iterate through the children until a potential 

876 match is found. A child mock is created only during attribute access 

877 so if we get a _SpecState then no attributes of the spec were accessed 

878 and can be safely exited. 

879 """ 

880 if not name: 

881 return self._spec_signature 

882 

883 sig = None 

884 names = name.replace('()', '').split('.') 

885 children = self._mock_children 

886 

887 for name in names: 

888 child = children.get(name) 

889 if child is None or isinstance(child, _SpecState): 

890 break 

891 else: 

892 # If an autospecced object is attached using attach_mock the 

893 # child would be a function with mock object as attribute from 

894 # which signature has to be derived. 

895 child = _extract_mock(child) 

896 children = child._mock_children 

897 sig = child._spec_signature 

898 

899 return sig 

900 

901 

902 def _call_matcher(self, _call): 

903 """ 

904 Given a call (or simply an (args, kwargs) tuple), return a 

905 comparison key suitable for matching with other calls. 

906 This is a best effort method which relies on the spec's signature, 

907 if available, or falls back on the arguments themselves. 

908 """ 

909 

910 if isinstance(_call, tuple) and len(_call) > 2: 

911 sig = self._get_call_signature_from_name(_call[0]) 

912 else: 

913 sig = self._spec_signature 

914 

915 if sig is not None: 

916 if len(_call) == 2: 

917 name = '' 

918 args, kwargs = _call 

919 else: 

920 name, args, kwargs = _call 

921 try: 

922 bound_call = sig.bind(*args, **kwargs) 

923 return call(name, bound_call.args, bound_call.kwargs) 

924 except TypeError as e: 

925 return e.with_traceback(None) 

926 else: 

927 return _call 

928 

929 def assert_not_called(_mock_self): 

930 """assert that the mock was never called. 

931 """ 

932 self = _mock_self 

933 if self.call_count != 0: 

934 msg = ("Expected '%s' to not have been called. Called %s times.%s" 

935 % (self._mock_name or 'mock', 

936 self.call_count, 

937 self._calls_repr())) 

938 raise AssertionError(msg) 

939 

940 def assert_called(_mock_self): 

941 """assert that the mock was called at least once 

942 """ 

943 self = _mock_self 

944 if self.call_count == 0: 

945 msg = ("Expected '%s' to have been called." % 

946 (self._mock_name or 'mock')) 

947 raise AssertionError(msg) 

948 

949 def assert_called_once(_mock_self): 

950 """assert that the mock was called only once. 

951 """ 

952 self = _mock_self 

953 if not self.call_count == 1: 

954 msg = ("Expected '%s' to have been called once. Called %s times.%s" 

955 % (self._mock_name or 'mock', 

956 self.call_count, 

957 self._calls_repr())) 

958 raise AssertionError(msg) 

959 

960 def assert_called_with(_mock_self, *args, **kwargs): 

961 """assert that the last call was made with the specified arguments. 

962 

963 Raises an AssertionError if the args and keyword args passed in are 

964 different to the last call to the mock.""" 

965 self = _mock_self 

966 if self.call_args is None: 

967 expected = self._format_mock_call_signature(args, kwargs) 

968 actual = 'not called.' 

969 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 

970 % (expected, actual)) 

971 raise AssertionError(error_message) 

972 

973 def _error_message(): 

974 msg = self._format_mock_failure_message(args, kwargs) 

975 return msg 

976 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

977 actual = self._call_matcher(self.call_args) 

978 if actual != expected: 

979 cause = expected if isinstance(expected, Exception) else None 

980 raise AssertionError(_error_message()) from cause 

981 

982 

983 def assert_called_once_with(_mock_self, *args, **kwargs): 

984 """assert that the mock was called exactly once and that that call was 

985 with the specified arguments.""" 

986 self = _mock_self 

987 if not self.call_count == 1: 

988 msg = ("Expected '%s' to be called once. Called %s times.%s" 

989 % (self._mock_name or 'mock', 

990 self.call_count, 

991 self._calls_repr())) 

992 raise AssertionError(msg) 

993 return self.assert_called_with(*args, **kwargs) 

994 

995 

996 def assert_has_calls(self, calls, any_order=False): 

997 """assert the mock has been called with the specified calls. 

998 The `mock_calls` list is checked for the calls. 

999 

1000 If `any_order` is False (the default) then the calls must be 

1001 sequential. There can be extra calls before or after the 

1002 specified calls. 

1003 

1004 If `any_order` is True then the calls can be in any order, but 

1005 they must all appear in `mock_calls`.""" 

1006 expected = [self._call_matcher(c) for c in calls] 

1007 cause = next((e for e in expected if isinstance(e, Exception)), None) 

1008 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 

1009 if not any_order: 

1010 if expected not in all_calls: 

1011 if cause is None: 

1012 problem = 'Calls not found.' 

1013 else: 

1014 problem = ('Error processing expected calls.\n' 

1015 'Errors: {}').format( 

1016 [e if isinstance(e, Exception) else None 

1017 for e in expected]) 

1018 raise AssertionError( 

1019 f'{problem}\n' 

1020 f'Expected: {_CallList(calls)}' 

1021 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 

1022 ) from cause 

1023 return 

1024 

1025 all_calls = list(all_calls) 

1026 

1027 not_found = [] 

1028 for kall in expected: 

1029 try: 

1030 all_calls.remove(kall) 

1031 except ValueError: 

1032 not_found.append(kall) 

1033 if not_found: 

1034 raise AssertionError( 

1035 '%r does not contain all of %r in its call list, ' 

1036 'found %r instead' % (self._mock_name or 'mock', 

1037 tuple(not_found), all_calls) 

1038 ) from cause 

1039 

1040 

1041 def assert_any_call(self, *args, **kwargs): 

1042 """assert the mock has been called with the specified arguments. 

1043 

1044 The assert passes if the mock has *ever* been called, unlike 

1045 `assert_called_with` and `assert_called_once_with` that only pass if 

1046 the call is the most recent one.""" 

1047 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

1048 cause = expected if isinstance(expected, Exception) else None 

1049 actual = [self._call_matcher(c) for c in self.call_args_list] 

1050 if cause or expected not in _AnyComparer(actual): 

1051 expected_string = self._format_mock_call_signature(args, kwargs) 

1052 raise AssertionError( 

1053 '%s call not found' % expected_string 

1054 ) from cause 

1055 

1056 

1057 def _get_child_mock(self, **kw): 

1058 """Create the child mocks for attributes and return value. 

1059 By default child mocks will be the same type as the parent. 

1060 Subclasses of Mock may want to override this to customize the way 

1061 child mocks are made. 

1062 

1063 For non-callable mocks the callable variant will be used (rather than 

1064 any custom subclass).""" 

1065 if self._mock_sealed: 

1066 attribute = f".{kw['name']}" if "name" in kw else "()" 

1067 mock_name = self._extract_mock_name() + attribute 

1068 raise AttributeError(mock_name) 

1069 

1070 _new_name = kw.get("_new_name") 

1071 if _new_name in self.__dict__['_spec_asyncs']: 

1072 return AsyncMock(**kw) 

1073 

1074 _type = type(self) 

1075 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 

1076 # Any asynchronous magic becomes an AsyncMock 

1077 klass = AsyncMock 

1078 elif issubclass(_type, AsyncMockMixin): 

1079 if (_new_name in _all_sync_magics or 

1080 self._mock_methods and _new_name in self._mock_methods): 

1081 # Any synchronous method on AsyncMock becomes a MagicMock 

1082 klass = MagicMock 

1083 else: 

1084 klass = AsyncMock 

1085 elif not issubclass(_type, CallableMixin): 

1086 if issubclass(_type, NonCallableMagicMock): 

1087 klass = MagicMock 

1088 elif issubclass(_type, NonCallableMock): 

1089 klass = Mock 

1090 else: 

1091 klass = _type.__mro__[1] 

1092 return klass(**kw) 

1093 

1094 

1095 def _calls_repr(self, prefix="Calls"): 

1096 """Renders self.mock_calls as a string. 

1097 

1098 Example: "\nCalls: [call(1), call(2)]." 

1099 

1100 If self.mock_calls is empty, an empty string is returned. The 

1101 output will be truncated if very long. 

1102 """ 

1103 if not self.mock_calls: 

1104 return "" 

1105 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 

1106 

1107 

1108try: 

1109 removeprefix = str.removeprefix 

1110except AttributeError: 

1111 # Py 3.8 and earlier: 

1112 def removeprefix(name, prefix): 

1113 return name[len(prefix):] 

1114 

1115# Denylist for forbidden attribute names in safe mode 

1116_ATTRIB_DENY_LIST = frozenset({ 

1117 removeprefix(name, "assert_") 

1118 for name in dir(NonCallableMock) 

1119 if name.startswith("assert_") 

1120}) 

1121 

1122 

1123class _AnyComparer(list): 

1124 """A list which checks if it contains a call which may have an 

1125 argument of ANY, flipping the components of item and self from 

1126 their traditional locations so that ANY is guaranteed to be on 

1127 the left.""" 

1128 def __contains__(self, item): 

1129 for _call in self: 

1130 assert len(item) == len(_call) 

1131 if all([ 

1132 expected == actual 

1133 for expected, actual in zip(item, _call) 

1134 ]): 

1135 return True 

1136 return False 

1137 

1138 

1139def _try_iter(obj): 

1140 if obj is None: 

1141 return obj 

1142 if _is_exception(obj): 

1143 return obj 

1144 if _callable(obj): 

1145 return obj 

1146 try: 

1147 return iter(obj) 

1148 except TypeError: 

1149 # XXXX backwards compatibility 

1150 # but this will blow up on first call - so maybe we should fail early? 

1151 return obj 

1152 

1153 

1154class CallableMixin(Base): 

1155 

1156 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 

1157 wraps=None, name=None, spec_set=None, parent=None, 

1158 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 

1159 self.__dict__['_mock_return_value'] = return_value 

1160 _safe_super(CallableMixin, self).__init__( 

1161 spec, wraps, name, spec_set, parent, 

1162 _spec_state, _new_name, _new_parent, **kwargs 

1163 ) 

1164 

1165 self.side_effect = side_effect 

1166 

1167 

1168 def _mock_check_sig(self, *args, **kwargs): 

1169 # stub method that can be replaced with one with a specific signature 

1170 pass 

1171 

1172 

1173 def __call__(_mock_self, *args, **kwargs): 

1174 # can't use self in-case a function / method we are mocking uses self 

1175 # in the signature 

1176 _mock_self._mock_check_sig(*args, **kwargs) 

1177 _mock_self._increment_mock_call(*args, **kwargs) 

1178 return _mock_self._mock_call(*args, **kwargs) 

1179 

1180 

1181 def _mock_call(_mock_self, *args, **kwargs): 

1182 return _mock_self._execute_mock_call(*args, **kwargs) 

1183 

1184 def _increment_mock_call(_mock_self, *args, **kwargs): 

1185 self = _mock_self 

1186 self.called = True 

1187 self.call_count += 1 

1188 

1189 # handle call_args 

1190 # needs to be set here so assertions on call arguments pass before 

1191 # execution in the case of awaited calls 

1192 _call = _Call((args, kwargs), two=True) 

1193 self.call_args = _call 

1194 self.call_args_list.append(_call) 

1195 

1196 # initial stuff for method_calls: 

1197 do_method_calls = self._mock_parent is not None 

1198 method_call_name = self._mock_name 

1199 

1200 # initial stuff for mock_calls: 

1201 mock_call_name = self._mock_new_name 

1202 is_a_call = mock_call_name == '()' 

1203 self.mock_calls.append(_Call(('', args, kwargs))) 

1204 

1205 # follow up the chain of mocks: 

1206 _new_parent = self._mock_new_parent 

1207 while _new_parent is not None: 

1208 

1209 # handle method_calls: 

1210 if do_method_calls: 

1211 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 

1212 do_method_calls = _new_parent._mock_parent is not None 

1213 if do_method_calls: 

1214 method_call_name = _new_parent._mock_name + '.' + method_call_name 

1215 

1216 # handle mock_calls: 

1217 this_mock_call = _Call((mock_call_name, args, kwargs)) 

1218 _new_parent.mock_calls.append(this_mock_call) 

1219 

1220 if _new_parent._mock_new_name: 

1221 if is_a_call: 

1222 dot = '' 

1223 else: 

1224 dot = '.' 

1225 is_a_call = _new_parent._mock_new_name == '()' 

1226 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 

1227 

1228 # follow the parental chain: 

1229 _new_parent = _new_parent._mock_new_parent 

1230 

1231 def _execute_mock_call(_mock_self, *args, **kwargs): 

1232 self = _mock_self 

1233 # separate from _increment_mock_call so that awaited functions are 

1234 # executed separately from their call, also AsyncMock overrides this method 

1235 

1236 effect = self.side_effect 

1237 if effect is not None: 

1238 if _is_exception(effect): 

1239 raise effect 

1240 elif not _callable(effect): 

1241 result = next(effect) 

1242 if _is_exception(result): 

1243 raise result 

1244 else: 

1245 result = effect(*args, **kwargs) 

1246 

1247 if result is not DEFAULT: 

1248 return result 

1249 

1250 if self._mock_return_value is not DEFAULT: 

1251 return self.return_value 

1252 

1253 if self._mock_wraps is not None: 

1254 return self._mock_wraps(*args, **kwargs) 

1255 

1256 return self.return_value 

1257 

1258 

1259 

1260class Mock(CallableMixin, NonCallableMock): 

1261 """ 

1262 Create a new `Mock` object. `Mock` takes several optional arguments 

1263 that specify the behaviour of the Mock object: 

1264 

1265 * `spec`: This can be either a list of strings or an existing object (a 

1266 class or instance) that acts as the specification for the mock object. If 

1267 you pass in an object then a list of strings is formed by calling dir on 

1268 the object (excluding unsupported magic attributes and methods). Accessing 

1269 any attribute not in this list will raise an `AttributeError`. 

1270 

1271 If `spec` is an object (rather than a list of strings) then 

1272 `mock.__class__` returns the class of the spec object. This allows mocks 

1273 to pass `isinstance` tests. 

1274 

1275 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 

1276 or get an attribute on the mock that isn't on the object passed as 

1277 `spec_set` will raise an `AttributeError`. 

1278 

1279 * `side_effect`: A function to be called whenever the Mock is called. See 

1280 the `side_effect` attribute. Useful for raising exceptions or 

1281 dynamically changing return values. The function is called with the same 

1282 arguments as the mock, and unless it returns `DEFAULT`, the return 

1283 value of this function is used as the return value. 

1284 

1285 If `side_effect` is an iterable then each call to the mock will return 

1286 the next value from the iterable. If any of the members of the iterable 

1287 are exceptions they will be raised instead of returned. 

1288 

1289 * `return_value`: The value returned when the mock is called. By default 

1290 this is a new Mock (created on first access). See the 

1291 `return_value` attribute. 

1292 

1293 * `unsafe`: By default, accessing any attribute whose name starts with 

1294 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError. 

1295 Additionally, an AttributeError is raised when accessing 

1296 attributes that match the name of an assertion method without the prefix 

1297 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`. 

1298 Passing `unsafe=True` will allow access to these attributes. 

1299 

1300 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 

1301 calling the Mock will pass the call through to the wrapped object 

1302 (returning the real result). Attribute access on the mock will return a 

1303 Mock object that wraps the corresponding attribute of the wrapped object 

1304 (so attempting to access an attribute that doesn't exist will raise an 

1305 `AttributeError`). 

1306 

1307 If the mock has an explicit `return_value` set then calls are not passed 

1308 to the wrapped object and the `return_value` is returned instead. 

1309 

1310 * `name`: If the mock has a name then it will be used in the repr of the 

1311 mock. This can be useful for debugging. The name is propagated to child 

1312 mocks. 

1313 

1314 Mocks can also be called with arbitrary keyword arguments. These will be 

1315 used to set attributes on the mock after it is created. 

1316 """ 

1317 

1318 

1319def _dot_lookup(thing, comp, import_path): 

1320 try: 

1321 return getattr(thing, comp) 

1322 except AttributeError: 

1323 __import__(import_path) 

1324 return getattr(thing, comp) 

1325 

1326 

1327def _importer(target): 

1328 components = target.split('.') 

1329 import_path = components.pop(0) 

1330 thing = __import__(import_path) 

1331 

1332 for comp in components: 

1333 import_path += ".%s" % comp 

1334 thing = _dot_lookup(thing, comp, import_path) 

1335 return thing 

1336 

1337 

1338# _check_spec_arg_typos takes kwargs from commands like patch and checks that 

1339# they don't contain common misspellings of arguments related to autospeccing. 

1340def _check_spec_arg_typos(kwargs_to_check): 

1341 typos = ("autospect", "auto_spec", "set_spec") 

1342 for typo in typos: 

1343 if typo in kwargs_to_check: 

1344 raise RuntimeError( 

1345 f"{typo!r} might be a typo; use unsafe=True if this is intended" 

1346 ) 

1347 

1348 

1349class _patch(object): 

1350 

1351 attribute_name = None 

1352 _active_patches = [] 

1353 

1354 def __init__( 

1355 self, getter, attribute, new, spec, create, 

1356 spec_set, autospec, new_callable, kwargs, *, unsafe=False 

1357 ): 

1358 if new_callable is not None: 

1359 if new is not DEFAULT: 

1360 raise ValueError( 

1361 "Cannot use 'new' and 'new_callable' together" 

1362 ) 

1363 if autospec is not None: 

1364 raise ValueError( 

1365 "Cannot use 'autospec' and 'new_callable' together" 

1366 ) 

1367 if not unsafe: 

1368 _check_spec_arg_typos(kwargs) 

1369 if _is_instance_mock(spec): 

1370 raise InvalidSpecError( 

1371 f'Cannot spec attr {attribute!r} as the spec ' 

1372 f'has already been mocked out. [spec={spec!r}]') 

1373 if _is_instance_mock(spec_set): 

1374 raise InvalidSpecError( 

1375 f'Cannot spec attr {attribute!r} as the spec_set ' 

1376 f'target has already been mocked out. [spec_set={spec_set!r}]') 

1377 

1378 self.getter = getter 

1379 self.attribute = attribute 

1380 self.new = new 

1381 self.new_callable = new_callable 

1382 self.spec = spec 

1383 self.create = create 

1384 self.has_local = False 

1385 self.spec_set = spec_set 

1386 self.autospec = autospec 

1387 self.kwargs = kwargs 

1388 self.additional_patchers = [] 

1389 

1390 

1391 def copy(self): 

1392 patcher = _patch( 

1393 self.getter, self.attribute, self.new, self.spec, 

1394 self.create, self.spec_set, 

1395 self.autospec, self.new_callable, self.kwargs 

1396 ) 

1397 patcher.attribute_name = self.attribute_name 

1398 patcher.additional_patchers = [ 

1399 p.copy() for p in self.additional_patchers 

1400 ] 

1401 return patcher 

1402 

1403 

1404 def __call__(self, func): 

1405 if isinstance(func, type): 

1406 return self.decorate_class(func) 

1407 if inspect.iscoroutinefunction(func): 

1408 return self.decorate_async_callable(func) 

1409 return self.decorate_callable(func) 

1410 

1411 

1412 def decorate_class(self, klass): 

1413 for attr in dir(klass): 

1414 if not attr.startswith(patch.TEST_PREFIX): 

1415 continue 

1416 

1417 attr_value = getattr(klass, attr) 

1418 if not hasattr(attr_value, "__call__"): 

1419 continue 

1420 

1421 patcher = self.copy() 

1422 setattr(klass, attr, patcher(attr_value)) 

1423 return klass 

1424 

1425 

1426 @contextlib.contextmanager 

1427 def decoration_helper(self, patched, args, keywargs): 

1428 extra_args = [] 

1429 with contextlib.ExitStack() as exit_stack: 

1430 for patching in patched.patchings: 

1431 arg = exit_stack.enter_context(patching) 

1432 if patching.attribute_name is not None: 

1433 keywargs.update(arg) 

1434 elif patching.new is DEFAULT: 

1435 extra_args.append(arg) 

1436 

1437 args += tuple(extra_args) 

1438 yield (args, keywargs) 

1439 

1440 

1441 def decorate_callable(self, func): 

1442 # NB. Keep the method in sync with decorate_async_callable() 

1443 if hasattr(func, 'patchings'): 

1444 func.patchings.append(self) 

1445 return func 

1446 

1447 @wraps(func) 

1448 def patched(*args, **keywargs): 

1449 with self.decoration_helper(patched, 

1450 args, 

1451 keywargs) as (newargs, newkeywargs): 

1452 return func(*newargs, **newkeywargs) 

1453 

1454 patched.patchings = [self] 

1455 return patched 

1456 

1457 

1458 def decorate_async_callable(self, func): 

1459 # NB. Keep the method in sync with decorate_callable() 

1460 if hasattr(func, 'patchings'): 

1461 func.patchings.append(self) 

1462 return func 

1463 

1464 @wraps(func) 

1465 async def patched(*args, **keywargs): 

1466 with self.decoration_helper(patched, 

1467 args, 

1468 keywargs) as (newargs, newkeywargs): 

1469 return await func(*newargs, **newkeywargs) 

1470 

1471 patched.patchings = [self] 

1472 return patched 

1473 

1474 

1475 def get_original(self): 

1476 target = self.getter() 

1477 name = self.attribute 

1478 

1479 original = DEFAULT 

1480 local = False 

1481 

1482 try: 

1483 original = target.__dict__[name] 

1484 except (AttributeError, KeyError): 

1485 original = getattr(target, name, DEFAULT) 

1486 else: 

1487 local = True 

1488 

1489 if name in _builtins and isinstance(target, ModuleType): 

1490 self.create = True 

1491 

1492 if not self.create and original is DEFAULT: 

1493 raise AttributeError( 

1494 "%s does not have the attribute %r" % (target, name) 

1495 ) 

1496 return original, local 

1497 

1498 

1499 def __enter__(self): 

1500 """Perform the patch.""" 

1501 new, spec, spec_set = self.new, self.spec, self.spec_set 

1502 autospec, kwargs = self.autospec, self.kwargs 

1503 new_callable = self.new_callable 

1504 self.target = self.getter() 

1505 

1506 # normalise False to None 

1507 if spec is False: 

1508 spec = None 

1509 if spec_set is False: 

1510 spec_set = None 

1511 if autospec is False: 

1512 autospec = None 

1513 

1514 if spec is not None and autospec is not None: 

1515 raise TypeError("Can't specify spec and autospec") 

1516 if ((spec is not None or autospec is not None) and 

1517 spec_set not in (True, None)): 

1518 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 

1519 

1520 original, local = self.get_original() 

1521 

1522 if new is DEFAULT and autospec is None: 

1523 inherit = False 

1524 if spec is True: 

1525 # set spec to the object we are replacing 

1526 spec = original 

1527 if spec_set is True: 

1528 spec_set = original 

1529 spec = None 

1530 elif spec is not None: 

1531 if spec_set is True: 

1532 spec_set = spec 

1533 spec = None 

1534 elif spec_set is True: 

1535 spec_set = original 

1536 

1537 if spec is not None or spec_set is not None: 

1538 if original is DEFAULT: 

1539 raise TypeError("Can't use 'spec' with create=True") 

1540 if isinstance(original, type): 

1541 # If we're patching out a class and there is a spec 

1542 inherit = True 

1543 if spec is None and _is_async_obj(original): 

1544 Klass = AsyncMock 

1545 else: 

1546 Klass = MagicMock 

1547 _kwargs = {} 

1548 if new_callable is not None: 

1549 Klass = new_callable 

1550 elif spec is not None or spec_set is not None: 

1551 this_spec = spec 

1552 if spec_set is not None: 

1553 this_spec = spec_set 

1554 if _is_list(this_spec): 

1555 not_callable = '__call__' not in this_spec 

1556 else: 

1557 not_callable = not callable(this_spec) 

1558 if _is_async_obj(this_spec): 

1559 Klass = AsyncMock 

1560 elif not_callable: 

1561 Klass = NonCallableMagicMock 

1562 

1563 if spec is not None: 

1564 _kwargs['spec'] = spec 

1565 if spec_set is not None: 

1566 _kwargs['spec_set'] = spec_set 

1567 

1568 # add a name to mocks 

1569 if (isinstance(Klass, type) and 

1570 issubclass(Klass, NonCallableMock) and self.attribute): 

1571 _kwargs['name'] = self.attribute 

1572 

1573 _kwargs.update(kwargs) 

1574 new = Klass(**_kwargs) 

1575 

1576 if inherit and _is_instance_mock(new): 

1577 # we can only tell if the instance should be callable if the 

1578 # spec is not a list 

1579 this_spec = spec 

1580 if spec_set is not None: 

1581 this_spec = spec_set 

1582 if (not _is_list(this_spec) and not 

1583 _instance_callable(this_spec)): 

1584 Klass = NonCallableMagicMock 

1585 

1586 _kwargs.pop('name') 

1587 new.return_value = Klass(_new_parent=new, _new_name='()', 

1588 **_kwargs) 

1589 elif autospec is not None: 

1590 # spec is ignored, new *must* be default, spec_set is treated 

1591 # as a boolean. Should we check spec is not None and that spec_set 

1592 # is a bool? 

1593 if new is not DEFAULT: 

1594 raise TypeError( 

1595 "autospec creates the mock for you. Can't specify " 

1596 "autospec and new." 

1597 ) 

1598 if original is DEFAULT: 

1599 raise TypeError("Can't use 'autospec' with create=True") 

1600 spec_set = bool(spec_set) 

1601 if autospec is True: 

1602 autospec = original 

1603 

1604 if _is_instance_mock(self.target): 

1605 raise InvalidSpecError( 

1606 f'Cannot autospec attr {self.attribute!r} as the patch ' 

1607 f'target has already been mocked out. ' 

1608 f'[target={self.target!r}, attr={autospec!r}]') 

1609 if _is_instance_mock(autospec): 

1610 target_name = getattr(self.target, '__name__', self.target) 

1611 raise InvalidSpecError( 

1612 f'Cannot autospec attr {self.attribute!r} from target ' 

1613 f'{target_name!r} as it has already been mocked out. ' 

1614 f'[target={self.target!r}, attr={autospec!r}]') 

1615 

1616 new = create_autospec(autospec, spec_set=spec_set, 

1617 _name=self.attribute, **kwargs) 

1618 elif kwargs: 

1619 # can't set keyword args when we aren't creating the mock 

1620 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 

1621 raise TypeError("Can't pass kwargs to a mock we aren't creating") 

1622 

1623 new_attr = new 

1624 

1625 self.temp_original = original 

1626 self.is_local = local 

1627 self._exit_stack = contextlib.ExitStack() 

1628 try: 

1629 setattr(self.target, self.attribute, new_attr) 

1630 if self.attribute_name is not None: 

1631 extra_args = {} 

1632 if self.new is DEFAULT: 

1633 extra_args[self.attribute_name] = new 

1634 for patching in self.additional_patchers: 

1635 arg = self._exit_stack.enter_context(patching) 

1636 if patching.new is DEFAULT: 

1637 extra_args.update(arg) 

1638 return extra_args 

1639 

1640 return new 

1641 except: 

1642 if not self.__exit__(*sys.exc_info()): 

1643 raise 

1644 

1645 def __exit__(self, *exc_info): 

1646 """Undo the patch.""" 

1647 if self.is_local and self.temp_original is not DEFAULT: 

1648 setattr(self.target, self.attribute, self.temp_original) 

1649 else: 

1650 delattr(self.target, self.attribute) 

1651 if not self.create and (not hasattr(self.target, self.attribute) or 

1652 self.attribute in ('__doc__', '__module__', 

1653 '__defaults__', '__annotations__', 

1654 '__kwdefaults__')): 

1655 # needed for proxy objects like django settings 

1656 setattr(self.target, self.attribute, self.temp_original) 

1657 

1658 del self.temp_original 

1659 del self.is_local 

1660 del self.target 

1661 exit_stack = self._exit_stack 

1662 del self._exit_stack 

1663 return exit_stack.__exit__(*exc_info) 

1664 

1665 

1666 def start(self): 

1667 """Activate a patch, returning any created mock.""" 

1668 result = self.__enter__() 

1669 self._active_patches.append(self) 

1670 return result 

1671 

1672 

1673 def stop(self): 

1674 """Stop an active patch.""" 

1675 try: 

1676 self._active_patches.remove(self) 

1677 except ValueError: 

1678 # If the patch hasn't been started this will fail 

1679 return None 

1680 

1681 return self.__exit__(None, None, None) 

1682 

1683 

1684 

1685def _get_target(target): 

1686 try: 

1687 target, attribute = target.rsplit('.', 1) 

1688 except (TypeError, ValueError, AttributeError): 

1689 raise TypeError( 

1690 f"Need a valid target to patch. You supplied: {target!r}") 

1691 getter = lambda: _importer(target) 

1692 return getter, attribute 

1693 

1694 

1695def _patch_object( 

1696 target, attribute, new=DEFAULT, spec=None, 

1697 create=False, spec_set=None, autospec=None, 

1698 new_callable=None, *, unsafe=False, **kwargs 

1699 ): 

1700 """ 

1701 patch the named member (`attribute`) on an object (`target`) with a mock 

1702 object. 

1703 

1704 `patch.object` can be used as a decorator, class decorator or a context 

1705 manager. Arguments `new`, `spec`, `create`, `spec_set`, 

1706 `autospec` and `new_callable` have the same meaning as for `patch`. Like 

1707 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 

1708 the mock object it creates. 

1709 

1710 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 

1711 for choosing which methods to wrap. 

1712 """ 

1713 if type(target) is str: 

1714 raise TypeError( 

1715 f"{target!r} must be the actual object to be patched, not a str" 

1716 ) 

1717 getter = lambda: target 

1718 return _patch( 

1719 getter, attribute, new, spec, create, 

1720 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1721 ) 

1722 

1723 

1724def _patch_multiple(target, spec=None, create=False, spec_set=None, 

1725 autospec=None, new_callable=None, **kwargs): 

1726 """Perform multiple patches in a single call. It takes the object to be 

1727 patched (either as an object or a string to fetch the object by importing) 

1728 and keyword arguments for the patches:: 

1729 

1730 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 

1731 ... 

1732 

1733 Use `DEFAULT` as the value if you want `patch.multiple` to create 

1734 mocks for you. In this case the created mocks are passed into a decorated 

1735 function by keyword, and a dictionary is returned when `patch.multiple` is 

1736 used as a context manager. 

1737 

1738 `patch.multiple` can be used as a decorator, class decorator or a context 

1739 manager. The arguments `spec`, `spec_set`, `create`, 

1740 `autospec` and `new_callable` have the same meaning as for `patch`. These 

1741 arguments will be applied to *all* patches done by `patch.multiple`. 

1742 

1743 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 

1744 for choosing which methods to wrap. 

1745 """ 

1746 if type(target) is str: 

1747 getter = lambda: _importer(target) 

1748 else: 

1749 getter = lambda: target 

1750 

1751 if not kwargs: 

1752 raise ValueError( 

1753 'Must supply at least one keyword argument with patch.multiple' 

1754 ) 

1755 # need to wrap in a list for python 3, where items is a view 

1756 items = list(kwargs.items()) 

1757 attribute, new = items[0] 

1758 patcher = _patch( 

1759 getter, attribute, new, spec, create, spec_set, 

1760 autospec, new_callable, {} 

1761 ) 

1762 patcher.attribute_name = attribute 

1763 for attribute, new in items[1:]: 

1764 this_patcher = _patch( 

1765 getter, attribute, new, spec, create, spec_set, 

1766 autospec, new_callable, {} 

1767 ) 

1768 this_patcher.attribute_name = attribute 

1769 patcher.additional_patchers.append(this_patcher) 

1770 return patcher 

1771 

1772 

1773def patch( 

1774 target, new=DEFAULT, spec=None, create=False, 

1775 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 

1776 ): 

1777 """ 

1778 `patch` acts as a function decorator, class decorator or a context 

1779 manager. Inside the body of the function or with statement, the `target` 

1780 is patched with a `new` object. When the function/with statement exits 

1781 the patch is undone. 

1782 

1783 If `new` is omitted, then the target is replaced with an 

1784 `AsyncMock if the patched object is an async function or a 

1785 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 

1786 omitted, the created mock is passed in as an extra argument to the 

1787 decorated function. If `patch` is used as a context manager the created 

1788 mock is returned by the context manager. 

1789 

1790 `target` should be a string in the form `'package.module.ClassName'`. The 

1791 `target` is imported and the specified object replaced with the `new` 

1792 object, so the `target` must be importable from the environment you are 

1793 calling `patch` from. The target is imported when the decorated function 

1794 is executed, not at decoration time. 

1795 

1796 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 

1797 if patch is creating one for you. 

1798 

1799 In addition you can pass `spec=True` or `spec_set=True`, which causes 

1800 patch to pass in the object being mocked as the spec/spec_set object. 

1801 

1802 `new_callable` allows you to specify a different class, or callable object, 

1803 that will be called to create the `new` object. By default `AsyncMock` is 

1804 used for async functions and `MagicMock` for the rest. 

1805 

1806 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 

1807 then the mock will be created with a spec from the object being replaced. 

1808 All attributes of the mock will also have the spec of the corresponding 

1809 attribute of the object being replaced. Methods and functions being 

1810 mocked will have their arguments checked and will raise a `TypeError` if 

1811 they are called with the wrong signature. For mocks replacing a class, 

1812 their return value (the 'instance') will have the same spec as the class. 

1813 

1814 Instead of `autospec=True` you can pass `autospec=some_object` to use an 

1815 arbitrary object as the spec instead of the one being replaced. 

1816 

1817 By default `patch` will fail to replace attributes that don't exist. If 

1818 you pass in `create=True`, and the attribute doesn't exist, patch will 

1819 create the attribute for you when the patched function is called, and 

1820 delete it again afterwards. This is useful for writing tests against 

1821 attributes that your production code creates at runtime. It is off by 

1822 default because it can be dangerous. With it switched on you can write 

1823 passing tests against APIs that don't actually exist! 

1824 

1825 Patch can be used as a `TestCase` class decorator. It works by 

1826 decorating each test method in the class. This reduces the boilerplate 

1827 code when your test methods share a common patchings set. `patch` finds 

1828 tests by looking for method names that start with `patch.TEST_PREFIX`. 

1829 By default this is `test`, which matches the way `unittest` finds tests. 

1830 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 

1831 

1832 Patch can be used as a context manager, with the with statement. Here the 

1833 patching applies to the indented block after the with statement. If you 

1834 use "as" then the patched object will be bound to the name after the 

1835 "as"; very useful if `patch` is creating a mock object for you. 

1836 

1837 Patch will raise a `RuntimeError` if passed some common misspellings of 

1838 the arguments autospec and spec_set. Pass the argument `unsafe` with the 

1839 value True to disable that check. 

1840 

1841 `patch` takes arbitrary keyword arguments. These will be passed to 

1842 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 

1843 otherwise or to `new_callable` if specified. 

1844 

1845 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 

1846 available for alternate use-cases. 

1847 """ 

1848 getter, attribute = _get_target(target) 

1849 return _patch( 

1850 getter, attribute, new, spec, create, 

1851 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1852 ) 

1853 

1854 

1855class _patch_dict(object): 

1856 """ 

1857 Patch a dictionary, or dictionary like object, and restore the dictionary 

1858 to its original state after the test. 

1859 

1860 `in_dict` can be a dictionary or a mapping like container. If it is a 

1861 mapping then it must at least support getting, setting and deleting items 

1862 plus iterating over keys. 

1863 

1864 `in_dict` can also be a string specifying the name of the dictionary, which 

1865 will then be fetched by importing it. 

1866 

1867 `values` can be a dictionary of values to set in the dictionary. `values` 

1868 can also be an iterable of `(key, value)` pairs. 

1869 

1870 If `clear` is True then the dictionary will be cleared before the new 

1871 values are set. 

1872 

1873 `patch.dict` can also be called with arbitrary keyword arguments to set 

1874 values in the dictionary:: 

1875 

1876 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 

1877 ... 

1878 

1879 `patch.dict` can be used as a context manager, decorator or class 

1880 decorator. When used as a class decorator `patch.dict` honours 

1881 `patch.TEST_PREFIX` for choosing which methods to wrap. 

1882 """ 

1883 

1884 def __init__(self, in_dict, values=(), clear=False, **kwargs): 

1885 self.in_dict = in_dict 

1886 # support any argument supported by dict(...) constructor 

1887 self.values = dict(values) 

1888 self.values.update(kwargs) 

1889 self.clear = clear 

1890 self._original = None 

1891 

1892 

1893 def __call__(self, f): 

1894 if isinstance(f, type): 

1895 return self.decorate_class(f) 

1896 if inspect.iscoroutinefunction(f): 

1897 return self.decorate_async_callable(f) 

1898 return self.decorate_callable(f) 

1899 

1900 

1901 def decorate_callable(self, f): 

1902 @wraps(f) 

1903 def _inner(*args, **kw): 

1904 self._patch_dict() 

1905 try: 

1906 return f(*args, **kw) 

1907 finally: 

1908 self._unpatch_dict() 

1909 

1910 return _inner 

1911 

1912 

1913 def decorate_async_callable(self, f): 

1914 @wraps(f) 

1915 async def _inner(*args, **kw): 

1916 self._patch_dict() 

1917 try: 

1918 return await f(*args, **kw) 

1919 finally: 

1920 self._unpatch_dict() 

1921 

1922 return _inner 

1923 

1924 

1925 def decorate_class(self, klass): 

1926 for attr in dir(klass): 

1927 attr_value = getattr(klass, attr) 

1928 if (attr.startswith(patch.TEST_PREFIX) and 

1929 hasattr(attr_value, "__call__")): 

1930 decorator = _patch_dict(self.in_dict, self.values, self.clear) 

1931 decorated = decorator(attr_value) 

1932 setattr(klass, attr, decorated) 

1933 return klass 

1934 

1935 

1936 def __enter__(self): 

1937 """Patch the dict.""" 

1938 self._patch_dict() 

1939 return self.in_dict 

1940 

1941 

1942 def _patch_dict(self): 

1943 values = self.values 

1944 if isinstance(self.in_dict, str): 

1945 self.in_dict = _importer(self.in_dict) 

1946 in_dict = self.in_dict 

1947 clear = self.clear 

1948 

1949 try: 

1950 original = in_dict.copy() 

1951 except AttributeError: 

1952 # dict like object with no copy method 

1953 # must support iteration over keys 

1954 original = {} 

1955 for key in in_dict: 

1956 original[key] = in_dict[key] 

1957 self._original = original 

1958 

1959 if clear: 

1960 _clear_dict(in_dict) 

1961 

1962 try: 

1963 in_dict.update(values) 

1964 except AttributeError: 

1965 # dict like object with no update method 

1966 for key in values: 

1967 in_dict[key] = values[key] 

1968 

1969 

1970 def _unpatch_dict(self): 

1971 in_dict = self.in_dict 

1972 original = self._original 

1973 

1974 _clear_dict(in_dict) 

1975 

1976 try: 

1977 in_dict.update(original) 

1978 except AttributeError: 

1979 for key in original: 

1980 in_dict[key] = original[key] 

1981 

1982 

1983 def __exit__(self, *args): 

1984 """Unpatch the dict.""" 

1985 if self._original is not None: 

1986 self._unpatch_dict() 

1987 return False 

1988 

1989 

1990 def start(self): 

1991 """Activate a patch, returning any created mock.""" 

1992 result = self.__enter__() 

1993 _patch._active_patches.append(self) 

1994 return result 

1995 

1996 

1997 def stop(self): 

1998 """Stop an active patch.""" 

1999 try: 

2000 _patch._active_patches.remove(self) 

2001 except ValueError: 

2002 # If the patch hasn't been started this will fail 

2003 return None 

2004 

2005 return self.__exit__(None, None, None) 

2006 

2007 

2008def _clear_dict(in_dict): 

2009 try: 

2010 in_dict.clear() 

2011 except AttributeError: 

2012 keys = list(in_dict) 

2013 for key in keys: 

2014 del in_dict[key] 

2015 

2016 

2017def _patch_stopall(): 

2018 """Stop all active patches. LIFO to unroll nested patches.""" 

2019 for patch in reversed(_patch._active_patches): 

2020 patch.stop() 

2021 

2022 

2023patch.object = _patch_object 

2024patch.dict = _patch_dict 

2025patch.multiple = _patch_multiple 

2026patch.stopall = _patch_stopall 

2027patch.TEST_PREFIX = 'test' 

2028 

2029magic_methods = ( 

2030 "lt le gt ge eq ne " 

2031 "getitem setitem delitem " 

2032 "len contains iter " 

2033 "hash str sizeof " 

2034 "enter exit " 

2035 # we added divmod and rdivmod here instead of numerics 

2036 # because there is no idivmod 

2037 "divmod rdivmod neg pos abs invert " 

2038 "complex int float index " 

2039 "round trunc floor ceil " 

2040 "bool next " 

2041 "fspath " 

2042 "aiter " 

2043) 

2044 

2045if IS_PYPY: 

2046 # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html 

2047 magic_methods = magic_methods.replace('sizeof ', '') 

2048 

2049numerics = ( 

2050 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" 

2051) 

2052inplace = ' '.join('i%s' % n for n in numerics.split()) 

2053right = ' '.join('r%s' % n for n in numerics.split()) 

2054 

2055# not including __prepare__, __instancecheck__, __subclasscheck__ 

2056# (as they are metaclass methods) 

2057# __del__ is not supported at all as it causes problems if it exists 

2058 

2059_non_defaults = { 

2060 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 

2061 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 

2062 '__getstate__', '__setstate__', '__getformat__', 

2063 '__repr__', '__dir__', '__subclasses__', '__format__', 

2064 '__getnewargs_ex__', 

2065} 

2066 

2067 

2068def _get_method(name, func): 

2069 "Turns a callable object (like a mock) into a real function" 

2070 def method(self, *args, **kw): 

2071 return func(self, *args, **kw) 

2072 method.__name__ = name 

2073 return method 

2074 

2075 

2076_magics = { 

2077 '__%s__' % method for method in 

2078 ' '.join([magic_methods, numerics, inplace, right]).split() 

2079} 

2080 

2081# Magic methods used for async `with` statements 

2082_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 

2083# Magic methods that are only used with async calls but are synchronous functions themselves 

2084_sync_async_magics = {"__aiter__"} 

2085_async_magics = _async_method_magics | _sync_async_magics 

2086 

2087_all_sync_magics = _magics | _non_defaults 

2088_all_magics = _all_sync_magics | _async_magics 

2089 

2090_unsupported_magics = { 

2091 '__getattr__', '__setattr__', 

2092 '__init__', '__new__', '__prepare__', 

2093 '__instancecheck__', '__subclasscheck__', 

2094 '__del__' 

2095} 

2096 

2097_calculate_return_value = { 

2098 '__hash__': lambda self: object.__hash__(self), 

2099 '__str__': lambda self: object.__str__(self), 

2100 '__sizeof__': lambda self: object.__sizeof__(self), 

2101 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 

2102} 

2103 

2104_return_values = { 

2105 '__lt__': NotImplemented, 

2106 '__gt__': NotImplemented, 

2107 '__le__': NotImplemented, 

2108 '__ge__': NotImplemented, 

2109 '__int__': 1, 

2110 '__contains__': False, 

2111 '__len__': 0, 

2112 '__exit__': False, 

2113 '__complex__': 1j, 

2114 '__float__': 1.0, 

2115 '__bool__': True, 

2116 '__index__': 1, 

2117 '__aexit__': False, 

2118} 

2119 

2120 

2121def _get_eq(self): 

2122 def __eq__(other): 

2123 ret_val = self.__eq__._mock_return_value 

2124 if ret_val is not DEFAULT: 

2125 return ret_val 

2126 if self is other: 

2127 return True 

2128 return NotImplemented 

2129 return __eq__ 

2130 

2131def _get_ne(self): 

2132 def __ne__(other): 

2133 if self.__ne__._mock_return_value is not DEFAULT: 

2134 return DEFAULT 

2135 if self is other: 

2136 return False 

2137 return NotImplemented 

2138 return __ne__ 

2139 

2140def _get_iter(self): 

2141 def __iter__(): 

2142 ret_val = self.__iter__._mock_return_value 

2143 if ret_val is DEFAULT: 

2144 return iter([]) 

2145 # if ret_val was already an iterator, then calling iter on it should 

2146 # return the iterator unchanged 

2147 return iter(ret_val) 

2148 return __iter__ 

2149 

2150def _get_async_iter(self): 

2151 def __aiter__(): 

2152 ret_val = self.__aiter__._mock_return_value 

2153 if ret_val is DEFAULT: 

2154 return _AsyncIterator(iter([])) 

2155 return _AsyncIterator(iter(ret_val)) 

2156 return __aiter__ 

2157 

2158_side_effect_methods = { 

2159 '__eq__': _get_eq, 

2160 '__ne__': _get_ne, 

2161 '__iter__': _get_iter, 

2162 '__aiter__': _get_async_iter 

2163} 

2164 

2165 

2166 

2167def _set_return_value(mock, method, name): 

2168 fixed = _return_values.get(name, DEFAULT) 

2169 if fixed is not DEFAULT: 

2170 method.return_value = fixed 

2171 return 

2172 

2173 return_calculator = _calculate_return_value.get(name) 

2174 if return_calculator is not None: 

2175 return_value = return_calculator(mock) 

2176 method.return_value = return_value 

2177 return 

2178 

2179 side_effector = _side_effect_methods.get(name) 

2180 if side_effector is not None: 

2181 method.side_effect = side_effector(mock) 

2182 

2183 

2184 

2185class MagicMixin(Base): 

2186 def __init__(self, *args, **kw): 

2187 self._mock_set_magics() # make magic work for kwargs in init 

2188 _safe_super(MagicMixin, self).__init__(*args, **kw) 

2189 self._mock_set_magics() # fix magic broken by upper level init 

2190 

2191 

2192 def _mock_set_magics(self): 

2193 orig_magics = _magics | _async_method_magics 

2194 these_magics = orig_magics 

2195 

2196 if getattr(self, "_mock_methods", None) is not None: 

2197 these_magics = orig_magics.intersection(self._mock_methods) 

2198 

2199 remove_magics = set() 

2200 remove_magics = orig_magics - these_magics 

2201 

2202 for entry in remove_magics: 

2203 if entry in type(self).__dict__: 

2204 # remove unneeded magic methods 

2205 delattr(self, entry) 

2206 

2207 # don't overwrite existing attributes if called a second time 

2208 these_magics = these_magics - set(type(self).__dict__) 

2209 

2210 _type = type(self) 

2211 for entry in these_magics: 

2212 setattr(_type, entry, MagicProxy(entry, self)) 

2213 

2214 

2215 

2216class NonCallableMagicMock(MagicMixin, NonCallableMock): 

2217 """A version of `MagicMock` that isn't callable.""" 

2218 def mock_add_spec(self, spec, spec_set=False): 

2219 """Add a spec to a mock. `spec` can either be an object or a 

2220 list of strings. Only attributes on the `spec` can be fetched as 

2221 attributes from the mock. 

2222 

2223 If `spec_set` is True then only attributes on the spec can be set.""" 

2224 self._mock_add_spec(spec, spec_set) 

2225 self._mock_set_magics() 

2226 

2227 

2228class AsyncMagicMixin(MagicMixin): 

2229 pass 

2230 

2231 

2232class MagicMock(MagicMixin, Mock): 

2233 """ 

2234 MagicMock is a subclass of Mock with default implementations 

2235 of most of the magic methods. You can use MagicMock without having to 

2236 configure the magic methods yourself. 

2237 

2238 If you use the `spec` or `spec_set` arguments then *only* magic 

2239 methods that exist in the spec will be created. 

2240 

2241 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 

2242 """ 

2243 def mock_add_spec(self, spec, spec_set=False): 

2244 """Add a spec to a mock. `spec` can either be an object or a 

2245 list of strings. Only attributes on the `spec` can be fetched as 

2246 attributes from the mock. 

2247 

2248 If `spec_set` is True then only attributes on the spec can be set.""" 

2249 self._mock_add_spec(spec, spec_set) 

2250 self._mock_set_magics() 

2251 

2252 

2253 

2254class MagicProxy(Base): 

2255 def __init__(self, name, parent): 

2256 self.name = name 

2257 self.parent = parent 

2258 

2259 def create_mock(self): 

2260 entry = self.name 

2261 parent = self.parent 

2262 m = parent._get_child_mock(name=entry, _new_name=entry, 

2263 _new_parent=parent) 

2264 setattr(parent, entry, m) 

2265 _set_return_value(parent, m, entry) 

2266 return m 

2267 

2268 def __get__(self, obj, _type=None): 

2269 return self.create_mock() 

2270 

2271 

2272_CODE_ATTRS = dir(CodeType) 

2273_CODE_SIG = inspect.signature(partial(CodeType.__init__, None)) 

2274 

2275 

2276class AsyncMockMixin(Base): 

2277 await_count = _delegating_property('await_count') 

2278 await_args = _delegating_property('await_args') 

2279 await_args_list = _delegating_property('await_args_list') 

2280 

2281 def __init__(self, *args, **kwargs): 

2282 super().__init__(*args, **kwargs) 

2283 # iscoroutinefunction() checks _is_coroutine property to say if an 

2284 # object is a coroutine. Without this check it looks to see if it is a 

2285 # function/method, which in this case it is not (since it is an 

2286 # AsyncMock). 

2287 # It is set through __dict__ because when spec_set is True, this 

2288 # attribute is likely undefined. 

2289 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 

2290 self.__dict__['_mock_await_count'] = 0 

2291 self.__dict__['_mock_await_args'] = None 

2292 self.__dict__['_mock_await_args_list'] = _CallList() 

2293 code_mock = NonCallableMock(spec_set=_CODE_ATTRS) 

2294 code_mock.__dict__["_spec_class"] = CodeType 

2295 code_mock.__dict__["_spec_signature"] = _CODE_SIG 

2296 code_mock.co_flags = ( 

2297 inspect.CO_COROUTINE 

2298 + inspect.CO_VARARGS 

2299 + inspect.CO_VARKEYWORDS 

2300 ) 

2301 code_mock.co_argcount = 0 

2302 code_mock.co_varnames = ('args', 'kwargs') 

2303 try: 

2304 code_mock.co_posonlyargcount = 0 

2305 except AttributeError: 

2306 # Python 3.7 and earlier. 

2307 pass 

2308 code_mock.co_kwonlyargcount = 0 

2309 self.__dict__['__code__'] = code_mock 

2310 self.__dict__['__name__'] = 'AsyncMock' 

2311 self.__dict__['__defaults__'] = tuple() 

2312 self.__dict__['__kwdefaults__'] = {} 

2313 self.__dict__['__annotations__'] = None 

2314 

2315 async def _execute_mock_call(_mock_self, *args, **kwargs): 

2316 self = _mock_self 

2317 # This is nearly just like super(), except for special handling 

2318 # of coroutines 

2319 

2320 _call = _Call((args, kwargs), two=True) 

2321 self.await_count += 1 

2322 self.await_args = _call 

2323 self.await_args_list.append(_call) 

2324 

2325 effect = self.side_effect 

2326 if effect is not None: 

2327 if _is_exception(effect): 

2328 raise effect 

2329 elif not _callable(effect): 

2330 try: 

2331 result = next(effect) 

2332 except StopIteration: 

2333 # It is impossible to propagate a StopIteration 

2334 # through coroutines because of PEP 479 

2335 raise StopAsyncIteration 

2336 if _is_exception(result): 

2337 raise result 

2338 elif iscoroutinefunction(effect): 

2339 result = await effect(*args, **kwargs) 

2340 else: 

2341 result = effect(*args, **kwargs) 

2342 

2343 if result is not DEFAULT: 

2344 return result 

2345 

2346 if self._mock_return_value is not DEFAULT: 

2347 return self.return_value 

2348 

2349 if self._mock_wraps is not None: 

2350 if iscoroutinefunction(self._mock_wraps): 

2351 return await self._mock_wraps(*args, **kwargs) 

2352 return self._mock_wraps(*args, **kwargs) 

2353 

2354 return self.return_value 

2355 

2356 def assert_awaited(_mock_self): 

2357 """ 

2358 Assert that the mock was awaited at least once. 

2359 """ 

2360 self = _mock_self 

2361 if self.await_count == 0: 

2362 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 

2363 raise AssertionError(msg) 

2364 

2365 def assert_awaited_once(_mock_self): 

2366 """ 

2367 Assert that the mock was awaited exactly once. 

2368 """ 

2369 self = _mock_self 

2370 if not self.await_count == 1: 

2371 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2372 f" Awaited {self.await_count} times.") 

2373 raise AssertionError(msg) 

2374 

2375 def assert_awaited_with(_mock_self, *args, **kwargs): 

2376 """ 

2377 Assert that the last await was with the specified arguments. 

2378 """ 

2379 self = _mock_self 

2380 if self.await_args is None: 

2381 expected = self._format_mock_call_signature(args, kwargs) 

2382 raise AssertionError(f'Expected await: {expected}\nNot awaited') 

2383 

2384 def _error_message(): 

2385 msg = self._format_mock_failure_message(args, kwargs, action='await') 

2386 return msg 

2387 

2388 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2389 actual = self._call_matcher(self.await_args) 

2390 if actual != expected: 

2391 cause = expected if isinstance(expected, Exception) else None 

2392 raise AssertionError(_error_message()) from cause 

2393 

2394 def assert_awaited_once_with(_mock_self, *args, **kwargs): 

2395 """ 

2396 Assert that the mock was awaited exactly once and with the specified 

2397 arguments. 

2398 """ 

2399 self = _mock_self 

2400 if not self.await_count == 1: 

2401 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2402 f" Awaited {self.await_count} times.") 

2403 raise AssertionError(msg) 

2404 return self.assert_awaited_with(*args, **kwargs) 

2405 

2406 def assert_any_await(_mock_self, *args, **kwargs): 

2407 """ 

2408 Assert the mock has ever been awaited with the specified arguments. 

2409 """ 

2410 self = _mock_self 

2411 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2412 cause = expected if isinstance(expected, Exception) else None 

2413 actual = [self._call_matcher(c) for c in self.await_args_list] 

2414 if cause or expected not in _AnyComparer(actual): 

2415 expected_string = self._format_mock_call_signature(args, kwargs) 

2416 raise AssertionError( 

2417 '%s await not found' % expected_string 

2418 ) from cause 

2419 

2420 def assert_has_awaits(_mock_self, calls, any_order=False): 

2421 """ 

2422 Assert the mock has been awaited with the specified calls. 

2423 The :attr:`await_args_list` list is checked for the awaits. 

2424 

2425 If `any_order` is False (the default) then the awaits must be 

2426 sequential. There can be extra calls before or after the 

2427 specified awaits. 

2428 

2429 If `any_order` is True then the awaits can be in any order, but 

2430 they must all appear in :attr:`await_args_list`. 

2431 """ 

2432 self = _mock_self 

2433 expected = [self._call_matcher(c) for c in calls] 

2434 cause = cause = next((e for e in expected if isinstance(e, Exception)), None) 

2435 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 

2436 if not any_order: 

2437 if expected not in all_awaits: 

2438 if cause is None: 

2439 problem = 'Awaits not found.' 

2440 else: 

2441 problem = ('Error processing expected awaits.\n' 

2442 'Errors: {}').format( 

2443 [e if isinstance(e, Exception) else None 

2444 for e in expected]) 

2445 raise AssertionError( 

2446 f'{problem}\n' 

2447 f'Expected: {_CallList(calls)}\n' 

2448 f'Actual: {self.await_args_list}' 

2449 ) from cause 

2450 return 

2451 

2452 all_awaits = list(all_awaits) 

2453 

2454 not_found = [] 

2455 for kall in expected: 

2456 try: 

2457 all_awaits.remove(kall) 

2458 except ValueError: 

2459 not_found.append(kall) 

2460 if not_found: 

2461 raise AssertionError( 

2462 '%r not all found in await list' % (tuple(not_found),) 

2463 ) from cause 

2464 

2465 def assert_not_awaited(_mock_self): 

2466 """ 

2467 Assert that the mock was never awaited. 

2468 """ 

2469 self = _mock_self 

2470 if self.await_count != 0: 

2471 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 

2472 f" Awaited {self.await_count} times.") 

2473 raise AssertionError(msg) 

2474 

2475 def reset_mock(self, *args, **kwargs): 

2476 """ 

2477 See :func:`.Mock.reset_mock()` 

2478 """ 

2479 super().reset_mock(*args, **kwargs) 

2480 self.await_count = 0 

2481 self.await_args = None 

2482 self.await_args_list = _CallList() 

2483 

2484 

2485class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 

2486 """ 

2487 Enhance :class:`Mock` with features allowing to mock 

2488 an async function. 

2489 

2490 The :class:`AsyncMock` object will behave so the object is 

2491 recognized as an async function, and the result of a call is an awaitable: 

2492 

2493 >>> mock = AsyncMock() 

2494 >>> iscoroutinefunction(mock) 

2495 True 

2496 >>> inspect.isawaitable(mock()) 

2497 True 

2498 

2499 

2500 The result of ``mock()`` is an async function which will have the outcome 

2501 of ``side_effect`` or ``return_value``: 

2502 

2503 - if ``side_effect`` is a function, the async function will return the 

2504 result of that function, 

2505 - if ``side_effect`` is an exception, the async function will raise the 

2506 exception, 

2507 - if ``side_effect`` is an iterable, the async function will return the 

2508 next value of the iterable, however, if the sequence of result is 

2509 exhausted, ``StopIteration`` is raised immediately, 

2510 - if ``side_effect`` is not defined, the async function will return the 

2511 value defined by ``return_value``, hence, by default, the async function 

2512 returns a new :class:`AsyncMock` object. 

2513 

2514 If the outcome of ``side_effect`` or ``return_value`` is an async function, 

2515 the mock async function obtained when the mock object is called will be this 

2516 async function itself (and not an async function returning an async 

2517 function). 

2518 

2519 The test author can also specify a wrapped object with ``wraps``. In this 

2520 case, the :class:`Mock` object behavior is the same as with an 

2521 :class:`.Mock` object: the wrapped object may have methods 

2522 defined as async function functions. 

2523 

2524 Based on Martin Richard's asynctest project. 

2525 """ 

2526 

2527 

2528class _ANY(object): 

2529 "A helper object that compares equal to everything." 

2530 

2531 def __eq__(self, other): 

2532 return True 

2533 

2534 def __ne__(self, other): 

2535 return False 

2536 

2537 def __repr__(self): 

2538 return '<ANY>' 

2539 

2540ANY = _ANY() 

2541 

2542 

2543 

2544def _format_call_signature(name, args, kwargs): 

2545 message = '%s(%%s)' % name 

2546 formatted_args = '' 

2547 args_string = ', '.join([repr(arg) for arg in args]) 

2548 kwargs_string = ', '.join([ 

2549 '%s=%r' % (key, value) for key, value in kwargs.items() 

2550 ]) 

2551 if args_string: 

2552 formatted_args = args_string 

2553 if kwargs_string: 

2554 if formatted_args: 

2555 formatted_args += ', ' 

2556 formatted_args += kwargs_string 

2557 

2558 return message % formatted_args 

2559 

2560 

2561 

2562class _Call(tuple): 

2563 """ 

2564 A tuple for holding the results of a call to a mock, either in the form 

2565 `(args, kwargs)` or `(name, args, kwargs)`. 

2566 

2567 If args or kwargs are empty then a call tuple will compare equal to 

2568 a tuple without those values. This makes comparisons less verbose:: 

2569 

2570 _Call(('name', (), {})) == ('name',) 

2571 _Call(('name', (1,), {})) == ('name', (1,)) 

2572 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 

2573 

2574 The `_Call` object provides a useful shortcut for comparing with call:: 

2575 

2576 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 

2577 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 

2578 

2579 If the _Call has no name then it will match any name. 

2580 """ 

2581 def __new__(cls, value=(), name='', parent=None, two=False, 

2582 from_kall=True): 

2583 args = () 

2584 kwargs = {} 

2585 _len = len(value) 

2586 if _len == 3: 

2587 name, args, kwargs = value 

2588 elif _len == 2: 

2589 first, second = value 

2590 if isinstance(first, str): 

2591 name = first 

2592 if isinstance(second, tuple): 

2593 args = second 

2594 else: 

2595 kwargs = second 

2596 else: 

2597 args, kwargs = first, second 

2598 elif _len == 1: 

2599 value, = value 

2600 if isinstance(value, str): 

2601 name = value 

2602 elif isinstance(value, tuple): 

2603 args = value 

2604 else: 

2605 kwargs = value 

2606 

2607 if two: 

2608 return tuple.__new__(cls, (args, kwargs)) 

2609 

2610 return tuple.__new__(cls, (name, args, kwargs)) 

2611 

2612 

2613 def __init__(self, value=(), name=None, parent=None, two=False, 

2614 from_kall=True): 

2615 self._mock_name = name 

2616 self._mock_parent = parent 

2617 self._mock_from_kall = from_kall 

2618 

2619 

2620 def __eq__(self, other): 

2621 try: 

2622 len_other = len(other) 

2623 except TypeError: 

2624 return NotImplemented 

2625 

2626 self_name = '' 

2627 if len(self) == 2: 

2628 self_args, self_kwargs = self 

2629 else: 

2630 self_name, self_args, self_kwargs = self 

2631 

2632 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 

2633 and self._mock_parent != other._mock_parent): 

2634 return False 

2635 

2636 other_name = '' 

2637 if len_other == 0: 

2638 other_args, other_kwargs = (), {} 

2639 elif len_other == 3: 

2640 other_name, other_args, other_kwargs = other 

2641 elif len_other == 1: 

2642 value, = other 

2643 if isinstance(value, tuple): 

2644 other_args = value 

2645 other_kwargs = {} 

2646 elif isinstance(value, str): 

2647 other_name = value 

2648 other_args, other_kwargs = (), {} 

2649 else: 

2650 other_args = () 

2651 other_kwargs = value 

2652 elif len_other == 2: 

2653 # could be (name, args) or (name, kwargs) or (args, kwargs) 

2654 first, second = other 

2655 if isinstance(first, str): 

2656 other_name = first 

2657 if isinstance(second, tuple): 

2658 other_args, other_kwargs = second, {} 

2659 else: 

2660 other_args, other_kwargs = (), second 

2661 else: 

2662 other_args, other_kwargs = first, second 

2663 else: 

2664 return False 

2665 

2666 if self_name and other_name != self_name: 

2667 return False 

2668 

2669 # this order is important for ANY to work! 

2670 return (other_args, other_kwargs) == (self_args, self_kwargs) 

2671 

2672 

2673 __ne__ = object.__ne__ 

2674 

2675 

2676 def __call__(self, *args, **kwargs): 

2677 if self._mock_name is None: 

2678 return _Call(('', args, kwargs), name='()') 

2679 

2680 name = self._mock_name + '()' 

2681 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 

2682 

2683 

2684 def __getattr__(self, attr): 

2685 if self._mock_name is None: 

2686 return _Call(name=attr, from_kall=False) 

2687 name = '%s.%s' % (self._mock_name, attr) 

2688 return _Call(name=name, parent=self, from_kall=False) 

2689 

2690 

2691 def __getattribute__(self, attr): 

2692 if attr in tuple.__dict__: 

2693 raise AttributeError 

2694 return tuple.__getattribute__(self, attr) 

2695 

2696 

2697 def _get_call_arguments(self): 

2698 if len(self) == 2: 

2699 args, kwargs = self 

2700 else: 

2701 name, args, kwargs = self 

2702 

2703 return args, kwargs 

2704 

2705 @property 

2706 def args(self): 

2707 return self._get_call_arguments()[0] 

2708 

2709 @property 

2710 def kwargs(self): 

2711 return self._get_call_arguments()[1] 

2712 

2713 def __repr__(self): 

2714 if not self._mock_from_kall: 

2715 name = self._mock_name or 'call' 

2716 if name.startswith('()'): 

2717 name = 'call%s' % name 

2718 return name 

2719 

2720 if len(self) == 2: 

2721 name = 'call' 

2722 args, kwargs = self 

2723 else: 

2724 name, args, kwargs = self 

2725 if not name: 

2726 name = 'call' 

2727 elif not name.startswith('()'): 

2728 name = 'call.%s' % name 

2729 else: 

2730 name = 'call%s' % name 

2731 return _format_call_signature(name, args, kwargs) 

2732 

2733 

2734 def call_list(self): 

2735 """For a call object that represents multiple calls, `call_list` 

2736 returns a list of all the intermediate calls as well as the 

2737 final call.""" 

2738 vals = [] 

2739 thing = self 

2740 while thing is not None: 

2741 if thing._mock_from_kall: 

2742 vals.append(thing) 

2743 thing = thing._mock_parent 

2744 return _CallList(reversed(vals)) 

2745 

2746 

2747call = _Call(from_kall=False) 

2748 

2749 

2750def create_autospec(spec, spec_set=False, instance=False, _parent=None, 

2751 _name=None, *, unsafe=False, **kwargs): 

2752 """Create a mock object using another object as a spec. Attributes on the 

2753 mock will use the corresponding attribute on the `spec` object as their 

2754 spec. 

2755 

2756 Functions or methods being mocked will have their arguments checked 

2757 to check that they are called with the correct signature. 

2758 

2759 If `spec_set` is True then attempting to set attributes that don't exist 

2760 on the spec object will raise an `AttributeError`. 

2761 

2762 If a class is used as a spec then the return value of the mock (the 

2763 instance of the class) will have the same spec. You can use a class as the 

2764 spec for an instance object by passing `instance=True`. The returned mock 

2765 will only be callable if instances of the mock are callable. 

2766 

2767 `create_autospec` will raise a `RuntimeError` if passed some common 

2768 misspellings of the arguments autospec and spec_set. Pass the argument 

2769 `unsafe` with the value True to disable that check. 

2770 

2771 `create_autospec` also takes arbitrary keyword arguments that are passed to 

2772 the constructor of the created mock.""" 

2773 if _is_list(spec): 

2774 # can't pass a list instance to the mock constructor as it will be 

2775 # interpreted as a list of strings 

2776 spec = type(spec) 

2777 

2778 is_type = isinstance(spec, type) 

2779 if _is_instance_mock(spec): 

2780 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 

2781 f'[object={spec!r}]') 

2782 is_async_func = _is_async_func(spec) 

2783 _kwargs = {'spec': spec} 

2784 if spec_set: 

2785 _kwargs = {'spec_set': spec} 

2786 elif spec is None: 

2787 # None we mock with a normal mock without a spec 

2788 _kwargs = {} 

2789 if _kwargs and instance: 

2790 _kwargs['_spec_as_instance'] = True 

2791 if not unsafe: 

2792 _check_spec_arg_typos(kwargs) 

2793 

2794 _kwargs.update(kwargs) 

2795 

2796 Klass = MagicMock 

2797 if inspect.isdatadescriptor(spec): 

2798 # descriptors don't have a spec 

2799 # because we don't know what type they return 

2800 _kwargs = {} 

2801 elif is_async_func: 

2802 if instance: 

2803 raise RuntimeError("Instance can not be True when create_autospec " 

2804 "is mocking an async function") 

2805 Klass = AsyncMock 

2806 elif not _callable(spec): 

2807 Klass = NonCallableMagicMock 

2808 elif is_type and instance and not _instance_callable(spec): 

2809 Klass = NonCallableMagicMock 

2810 

2811 _name = _kwargs.pop('name', _name) 

2812 

2813 _new_name = _name 

2814 if _parent is None: 

2815 # for a top level object no _new_name should be set 

2816 _new_name = '' 

2817 

2818 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 

2819 name=_name, **_kwargs) 

2820 

2821 if isinstance(spec, FunctionTypes): 

2822 # should only happen at the top level because we don't 

2823 # recurse for functions 

2824 if is_async_func: 

2825 mock = _set_async_signature(mock, spec) 

2826 else: 

2827 mock = _set_signature(mock, spec) 

2828 else: 

2829 _check_signature(spec, mock, is_type, instance) 

2830 

2831 if _parent is not None and not instance: 

2832 _parent._mock_children[_name] = mock 

2833 

2834 if is_type and not instance and 'return_value' not in kwargs: 

2835 mock.return_value = create_autospec(spec, spec_set, instance=True, 

2836 _name='()', _parent=mock) 

2837 

2838 for entry in dir(spec): 

2839 if _is_magic(entry): 

2840 # MagicMock already does the useful magic methods for us 

2841 continue 

2842 

2843 # XXXX do we need a better way of getting attributes without 

2844 # triggering code execution (?) Probably not - we need the actual 

2845 # object to mock it so we would rather trigger a property than mock 

2846 # the property descriptor. Likewise we want to mock out dynamically 

2847 # provided attributes. 

2848 # XXXX what about attributes that raise exceptions other than 

2849 # AttributeError on being fetched? 

2850 # we could be resilient against it, or catch and propagate the 

2851 # exception when the attribute is fetched from the mock 

2852 try: 

2853 original = getattr(spec, entry) 

2854 except AttributeError: 

2855 continue 

2856 

2857 kwargs = {'spec': original} 

2858 if spec_set: 

2859 kwargs = {'spec_set': original} 

2860 

2861 if not isinstance(original, FunctionTypes): 

2862 new = _SpecState(original, spec_set, mock, entry, instance) 

2863 mock._mock_children[entry] = new 

2864 else: 

2865 parent = mock 

2866 if isinstance(spec, FunctionTypes): 

2867 parent = mock.mock 

2868 

2869 skipfirst = _must_skip(spec, entry, is_type) 

2870 kwargs['_eat_self'] = skipfirst 

2871 if iscoroutinefunction(original): 

2872 child_klass = AsyncMock 

2873 else: 

2874 child_klass = MagicMock 

2875 new = child_klass(parent=parent, name=entry, _new_name=entry, 

2876 _new_parent=parent, 

2877 **kwargs) 

2878 mock._mock_children[entry] = new 

2879 new.return_value = child_klass() 

2880 _check_signature(original, new, skipfirst=skipfirst) 

2881 

2882 # so functions created with _set_signature become instance attributes, 

2883 # *plus* their underlying mock exists in _mock_children of the parent 

2884 # mock. Adding to _mock_children may be unnecessary where we are also 

2885 # setting as an instance attribute? 

2886 if isinstance(new, FunctionTypes): 

2887 setattr(mock, entry, new) 

2888 

2889 return mock 

2890 

2891 

2892def _must_skip(spec, entry, is_type): 

2893 """ 

2894 Return whether we should skip the first argument on spec's `entry` 

2895 attribute. 

2896 """ 

2897 if not isinstance(spec, type): 

2898 if entry in getattr(spec, '__dict__', {}): 

2899 # instance attribute - shouldn't skip 

2900 return False 

2901 spec = spec.__class__ 

2902 

2903 for klass in spec.__mro__: 

2904 result = klass.__dict__.get(entry, DEFAULT) 

2905 if result is DEFAULT: 

2906 continue 

2907 if isinstance(result, (staticmethod, classmethod)): 

2908 return False 

2909 elif isinstance(result, FunctionTypes): 

2910 # Normal method => skip if looked up on type 

2911 # (if looked up on instance, self is already skipped) 

2912 return is_type 

2913 else: 

2914 return False 

2915 

2916 # function is a dynamically provided attribute 

2917 return is_type 

2918 

2919 

2920class _SpecState(object): 

2921 

2922 def __init__(self, spec, spec_set=False, parent=None, 

2923 name=None, ids=None, instance=False): 

2924 self.spec = spec 

2925 self.ids = ids 

2926 self.spec_set = spec_set 

2927 self.parent = parent 

2928 self.instance = instance 

2929 self.name = name 

2930 

2931 

2932FunctionTypes = ( 

2933 # python function 

2934 type(create_autospec), 

2935 # instance method 

2936 type(ANY.__eq__), 

2937) 

2938 

2939 

2940file_spec = None 

2941open_spec = None 

2942 

2943 

2944def _to_stream(read_data): 

2945 if isinstance(read_data, bytes): 

2946 return io.BytesIO(read_data) 

2947 else: 

2948 return io.StringIO(read_data) 

2949 

2950 

2951def mock_open(mock=None, read_data=''): 

2952 """ 

2953 A helper function to create a mock to replace the use of `open`. It works 

2954 for `open` called directly or used as a context manager. 

2955 

2956 The `mock` argument is the mock object to configure. If `None` (the 

2957 default) then a `MagicMock` will be created for you, with the API limited 

2958 to methods or attributes available on standard file handles. 

2959 

2960 `read_data` is a string for the `read`, `readline` and `readlines` of the 

2961 file handle to return. This is an empty string by default. 

2962 """ 

2963 _read_data = _to_stream(read_data) 

2964 _state = [_read_data, None] 

2965 

2966 def _readlines_side_effect(*args, **kwargs): 

2967 if handle.readlines.return_value is not None: 

2968 return handle.readlines.return_value 

2969 return _state[0].readlines(*args, **kwargs) 

2970 

2971 def _read_side_effect(*args, **kwargs): 

2972 if handle.read.return_value is not None: 

2973 return handle.read.return_value 

2974 return _state[0].read(*args, **kwargs) 

2975 

2976 def _readline_side_effect(*args, **kwargs): 

2977 yield from _iter_side_effect() 

2978 while True: 

2979 yield _state[0].readline(*args, **kwargs) 

2980 

2981 def _iter_side_effect(): 

2982 if handle.readline.return_value is not None: 

2983 while True: 

2984 yield handle.readline.return_value 

2985 for line in _state[0]: 

2986 yield line 

2987 

2988 def _next_side_effect(): 

2989 if handle.readline.return_value is not None: 

2990 return handle.readline.return_value 

2991 return next(_state[0]) 

2992 

2993 def _exit_side_effect(exctype, excinst, exctb): 

2994 handle.close() 

2995 

2996 global file_spec 

2997 if file_spec is None: 

2998 import _io 

2999 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 

3000 

3001 global open_spec 

3002 if open_spec is None: 

3003 import _io 

3004 open_spec = list(set(dir(_io.open))) 

3005 if mock is None: 

3006 mock = MagicMock(name='open', spec=open_spec) 

3007 

3008 handle = MagicMock(spec=file_spec) 

3009 handle.__enter__.return_value = handle 

3010 

3011 handle.write.return_value = None 

3012 handle.read.return_value = None 

3013 handle.readline.return_value = None 

3014 handle.readlines.return_value = None 

3015 

3016 handle.read.side_effect = _read_side_effect 

3017 _state[1] = _readline_side_effect() 

3018 handle.readline.side_effect = _state[1] 

3019 handle.readlines.side_effect = _readlines_side_effect 

3020 handle.__iter__.side_effect = _iter_side_effect 

3021 handle.__next__.side_effect = _next_side_effect 

3022 handle.__exit__.side_effect = _exit_side_effect 

3023 

3024 def reset_data(*args, **kwargs): 

3025 _state[0] = _to_stream(read_data) 

3026 if handle.readline.side_effect == _state[1]: 

3027 # Only reset the side effect if the user hasn't overridden it. 

3028 _state[1] = _readline_side_effect() 

3029 handle.readline.side_effect = _state[1] 

3030 return DEFAULT 

3031 

3032 mock.side_effect = reset_data 

3033 mock.return_value = handle 

3034 return mock 

3035 

3036 

3037class PropertyMock(Mock): 

3038 """ 

3039 A mock intended to be used as a property, or other descriptor, on a class. 

3040 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 

3041 a return value when it is fetched. 

3042 

3043 Fetching a `PropertyMock` instance from an object calls the mock, with 

3044 no args. Setting it calls the mock with the value being set. 

3045 """ 

3046 def _get_child_mock(self, **kwargs): 

3047 return MagicMock(**kwargs) 

3048 

3049 def __get__(self, obj, obj_type=None): 

3050 return self() 

3051 def __set__(self, obj, val): 

3052 self(val) 

3053 

3054 

3055_timeout_unset = sentinel.TIMEOUT_UNSET 

3056 

3057class ThreadingMixin(Base): 

3058 

3059 DEFAULT_TIMEOUT = None 

3060 

3061 def _get_child_mock(self, **kw): 

3062 if isinstance(kw.get("parent"), ThreadingMixin): 

3063 kw["timeout"] = kw["parent"]._mock_wait_timeout 

3064 elif isinstance(kw.get("_new_parent"), ThreadingMixin): 

3065 kw["timeout"] = kw["_new_parent"]._mock_wait_timeout 

3066 return super()._get_child_mock(**kw) 

3067 

3068 def __init__(self, *args, timeout=_timeout_unset, **kwargs): 

3069 super().__init__(*args, **kwargs) 

3070 if timeout is _timeout_unset: 

3071 timeout = self.DEFAULT_TIMEOUT 

3072 self.__dict__["_mock_event"] = threading.Event() # Event for any call 

3073 self.__dict__["_mock_calls_events"] = [] # Events for each of the calls 

3074 self.__dict__["_mock_calls_events_lock"] = threading.Lock() 

3075 self.__dict__["_mock_wait_timeout"] = timeout 

3076 

3077 def reset_mock(self, *args, **kwargs): 

3078 """ 

3079 See :func:`.Mock.reset_mock()` 

3080 """ 

3081 super().reset_mock(*args, **kwargs) 

3082 self.__dict__["_mock_event"] = threading.Event() 

3083 self.__dict__["_mock_calls_events"] = [] 

3084 

3085 def __get_event(self, expected_args, expected_kwargs): 

3086 with self._mock_calls_events_lock: 

3087 for args, kwargs, event in self._mock_calls_events: 

3088 if (args, kwargs) == (expected_args, expected_kwargs): 

3089 return event 

3090 new_event = threading.Event() 

3091 self._mock_calls_events.append((expected_args, expected_kwargs, new_event)) 

3092 return new_event 

3093 

3094 def _mock_call(self, *args, **kwargs): 

3095 ret_value = super()._mock_call(*args, **kwargs) 

3096 

3097 call_event = self.__get_event(args, kwargs) 

3098 call_event.set() 

3099 

3100 self._mock_event.set() 

3101 

3102 return ret_value 

3103 

3104 def wait_until_called(self, *, timeout=_timeout_unset): 

3105 """Wait until the mock object is called. 

3106 

3107 `timeout` - time to wait for in seconds, waits forever otherwise. 

3108 Defaults to the constructor provided timeout. 

3109 Use None to block undefinetively. 

3110 """ 

3111 if timeout is _timeout_unset: 

3112 timeout = self._mock_wait_timeout 

3113 if not self._mock_event.wait(timeout=timeout): 

3114 msg = (f"{self._mock_name or 'mock'} was not called before" 

3115 f" timeout({timeout}).") 

3116 raise AssertionError(msg) 

3117 

3118 def wait_until_any_call_with(self, *args, **kwargs): 

3119 """Wait until the mock object is called with given args. 

3120 

3121 Waits for the timeout in seconds provided in the constructor. 

3122 """ 

3123 event = self.__get_event(args, kwargs) 

3124 if not event.wait(timeout=self._mock_wait_timeout): 

3125 expected_string = self._format_mock_call_signature(args, kwargs) 

3126 raise AssertionError(f'{expected_string} call not found') 

3127 

3128 

3129class ThreadingMock(ThreadingMixin, MagicMixin, Mock): 

3130 """ 

3131 A mock that can be used to wait until on calls happening 

3132 in a different thread. 

3133 

3134 The constructor can take a `timeout` argument which 

3135 controls the timeout in seconds for all `wait` calls of the mock. 

3136 

3137 You can change the default timeout of all instances via the 

3138 `ThreadingMock.DEFAULT_TIMEOUT` attribute. 

3139 

3140 If no timeout is set, it will block undefinetively. 

3141 """ 

3142 pass 

3143 

3144 

3145def seal(mock): 

3146 """Disable the automatic generation of child mocks. 

3147 

3148 Given an input Mock, seals it to ensure no further mocks will be generated 

3149 when accessing an attribute that was not already defined. 

3150 

3151 The operation recursively seals the mock passed in, meaning that 

3152 the mock itself, any mocks generated by accessing one of its attributes, 

3153 and all assigned mocks without a name or spec will be sealed. 

3154 """ 

3155 mock._mock_sealed = True 

3156 for attr in dir(mock): 

3157 try: 

3158 m = getattr(mock, attr) 

3159 except AttributeError: 

3160 continue 

3161 if not isinstance(m, NonCallableMock): 

3162 continue 

3163 if isinstance(m._mock_children.get(attr), _SpecState): 

3164 continue 

3165 if m._mock_new_parent is mock: 

3166 seal(m) 

3167 

3168 

3169class _AsyncIterator: 

3170 """ 

3171 Wraps an iterator in an asynchronous iterator. 

3172 """ 

3173 def __init__(self, iterator): 

3174 self.iterator = iterator 

3175 code_mock = NonCallableMock(spec_set=CodeType) 

3176 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 

3177 self.__dict__['__code__'] = code_mock 

3178 

3179 async def __anext__(self): 

3180 try: 

3181 return next(self.iterator) 

3182 except StopIteration: 

3183 pass 

3184 raise StopAsyncIteration