Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/mock/mock.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1734 statements  

1# mock.py 

2# Test tools for mocking and patching. 

3# Maintained by Michael Foord 

4# Backport for other versions of Python available from 

5# https://pypi.org/project/mock 

6 

# Names exported by ``from mock import *``.
__all__ = (
    'Mock',
    'MagicMock',
    'patch',
    'sentinel',
    'DEFAULT',
    'ANY',
    'call',
    'create_autospec',
    'AsyncMock',
    'ThreadingMock',
    'FILTER_DIR',
    'NonCallableMock',
    'NonCallableMagicMock',
    'mock_open',
    'PropertyMock',
    'seal',
)

25 

26 

27import asyncio 

28import contextlib 

29import io 

30import inspect 

31import pprint 

32import sys 

33import threading 

34import builtins 

35 

# dataclasses is treated as optional: HAS_DATACLASSES records whether the
# import succeeded. NOTE(review): presumably checked before `fields` /
# `is_dataclass` are used; the consumers are not visible in this part of
# the file.
try:
    from dataclasses import fields, is_dataclass
    HAS_DATACLASSES = True
except ImportError:
    HAS_DATACLASSES = False

41 

42from types import CodeType, ModuleType, MethodType 

43from unittest.util import safe_repr 

44from functools import wraps, partial 

45from threading import RLock 

46 

47 

48from mock import IS_PYPY 

49from .backports import iscoroutinefunction 

50 

51 

class InvalidSpecError(Exception):
    """Indicates that an invalid value was used as a mock spec.

    Raised when a Mock object is itself passed as a spec, and when an
    attribute cannot be autospecced because it has already been mocked out.
    """

54 

55 

# Names in the builtins module that do not start with an underscore.
_builtins = {name for name in dir(builtins) if not name.startswith('_')}

# When true, dir() on a mock returns a filtered member list; when false,
# NonCallableMock.__dir__ falls back to object.__dir__.
FILTER_DIR = True

# Workaround for issue #12370
# Without this, the __class__ properties wouldn't be set correctly
_safe_super = super

63 

def _is_async_obj(obj):
    """Report whether *obj* is a coroutine function or an awaitable.

    Mock instances only qualify when they are ``AsyncMock`` instances.
    Objects exposing ``__func__`` are inspected through that attribute.
    """
    is_plain_mock = _is_instance_mock(obj) and not isinstance(obj, AsyncMock)
    if is_plain_mock:
        return False
    target = obj.__func__ if hasattr(obj, '__func__') else obj
    return iscoroutinefunction(target) or inspect.isawaitable(target)

70 

71 

def _is_async_func(func):
    """Report whether *func* is a coroutine function.

    Objects without a truthy ``__code__`` attribute are never considered
    async and yield False without consulting ``iscoroutinefunction``.
    """
    if not getattr(func, '__code__', None):
        return False
    return iscoroutinefunction(func)

77 

78 

def _is_instance_mock(obj):
    """Report whether *obj* is a mock, judged by its real type."""
    # isinstance() cannot be trusted here because mocks override __class__;
    # NonCallableMock is the base class of every mock type.
    real_type = type(obj)
    return issubclass(real_type, NonCallableMock)

83 

84 

def _is_exception(obj):
    """Report whether *obj* is an exception instance or an exception class."""
    if isinstance(obj, BaseException):
        return True
    return isinstance(obj, type) and issubclass(obj, BaseException)

90 

91 

def _extract_mock(obj):
    """Return the mock behind an autospecced function, else *obj* itself."""
    # Autospecced functions are FunctionTypes instances that carry the real
    # mock object in their "mock" attribute.
    is_autospecced_function = (
        isinstance(obj, FunctionTypes) and hasattr(obj, 'mock')
    )
    return obj.mock if is_autospecced_function else obj

99 

100 

def _get_signature_object(func, as_instance, eat_self):
    """
    Build a signature object for an arbitrary, possibly callable object.

    Returns a ``(reduced func, signature)`` tuple, or None when the object
    has no ``__call__`` or ``inspect.signature()`` raises ValueError for it.
    """
    if isinstance(func, type) and not as_instance:
        # A type modelled as a type is called through __init__, whose
        # `self` argument must not appear in the signature.
        func, eat_self = func.__init__, True
    elif isinstance(func, (classmethod, staticmethod)):
        # `cls` is supplied implicitly for class methods, so drop it too.
        eat_self = True if isinstance(func, classmethod) else eat_self
        # The real signature lives on the decorated function.
        func = func.__func__
    elif not isinstance(func, FunctionTypes):
        # Modelling an instance of the passed type: look up __call__,
        # not __init__.
        try:
            func = func.__call__
        except AttributeError:
            return None

    # Pre-binding a dummy first argument removes self/cls from the result.
    sig_func = partial(func, None) if eat_self else func
    try:
        signature = inspect.signature(sig_func)
    except ValueError:
        # Certain callable types are not supported by inspect.signature()
        return None
    return func, signature

134 

135 

def _check_signature(func, mock, skipfirst, instance=False):
    """Install a signature check derived from *func* on the type of *mock*.

    Does nothing when no signature can be obtained for *func*. Otherwise
    the mock's type receives ``_mock_check_sig`` (a function that binds its
    arguments against the signature) and ``__signature__``.
    """
    found = _get_signature_object(func, instance, skipfirst)
    if found is None:
        return
    func, sig = found

    def checksig(_mock_self, *args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)
    mock_type = type(mock)
    mock_type._mock_check_sig = checksig
    mock_type.__signature__ = sig

146 

147 

def _copy_func_details(func, funcopy):
    """Copy identifying metadata from *func* onto *funcopy*.

    Attributes missing on *func* (or not settable on *funcopy*) are
    skipped silently.
    """
    # func.__dict__ is deliberately not copied: it would expose original
    # attributes that should be mocked.
    copied_attributes = (
        '__name__', '__doc__', '__text_signature__',
        '__module__', '__defaults__', '__kwdefaults__',
    )
    for attr_name in copied_attributes:
        try:
            value = getattr(func, attr_name)
            setattr(funcopy, attr_name, value)
        except AttributeError:
            continue

159 

160 

def _callable(obj):
    """Report whether *obj* is callable, looking through method wrappers.

    Types are always callable; staticmethod/classmethod/bound-method
    objects are judged by the function they wrap.
    """
    if isinstance(obj, type):
        return True
    wrapper_types = (staticmethod, classmethod, MethodType)
    if isinstance(obj, wrapper_types):
        return _callable(obj.__func__)
    return getattr(obj, '__call__', None) is not None

169 

170 

def _is_list(obj):
    """Report whether *obj* is exactly a list or a tuple (no subclasses)."""
    # XXXX badly named: tuples are accepted as well as lists.
    accepted_types = (list, tuple)
    return type(obj) in accepted_types

175 

176 

def _instance_callable(obj):
    """Given an object, return True if the object is callable.
    For classes, return True if instances would be callable."""
    if not isinstance(obj, type):
        # Already an instance: just look for __call__ on it.
        return getattr(obj, '__call__', None) is not None

    # *could* be broken by a class overriding __mro__ or __dict__ via
    # a metaclass
    candidates = (obj,) + obj.__mro__
    return any(
        klass.__dict__.get('__call__') is not None for klass in candidates
    )

190 

191 

def _set_signature(mock, original, instance=False):
    """Return a function that delegates to *mock* with *original*'s signature.

    The generated function accepts ``(*args, **kwargs)``, validates them
    against the signature of *original* and then calls *mock*. When no
    signature can be determined, *mock* is returned unchanged.
    """
    skipfirst = isinstance(original, type)
    found = _get_signature_object(original, instance, skipfirst)
    if found is None:
        return mock
    func, sig = found

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)

    # The name is interpolated into generated source, so it must be a
    # valid identifier; fall back to a fixed one otherwise.
    name = original.__name__
    name = name if name.isidentifier() else 'funcopy'

    context = {'_checksig_': checksig, 'mock': mock}
    src = """def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return mock(*args, **kwargs)""" % name
    exec(src, context)

    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    return funcopy

217 

def _set_async_signature(mock, original, instance=False, is_async_mock=False):
    """Return an async function that delegates to *mock*.

    The generated coroutine function accepts ``(*args, **kwargs)``,
    validates them against the signature of *original* and then awaits
    *mock*.

    As in ``_set_signature``, *mock* is returned unchanged when no signature
    can be determined for *original*, and a fixed fallback name is used
    when ``original.__name__`` is not a valid identifier. Previously the
    first case raised TypeError while unpacking None, and the second
    produced invalid generated source.

    ``is_async_mock`` is accepted for interface compatibility and is not
    used by this function.
    """
    skipfirst = isinstance(original, type)
    result = _get_signature_object(original, instance, skipfirst)
    if result is None:
        return mock
    func, sig = result

    def checksig(*args, **kwargs):
        sig.bind(*args, **kwargs)

    _copy_func_details(func, checksig)

    # The name is interpolated into generated source below.
    name = original.__name__
    if not name.isidentifier():
        name = 'funcopy'

    context = {'_checksig_': checksig, 'mock': mock}
    src = """async def %s(*args, **kwargs):
    _checksig_(*args, **kwargs)
    return await mock(*args, **kwargs)""" % name
    exec(src, context)

    funcopy = context[name]
    _setup_func(funcopy, mock, sig)
    _setup_async_mock(funcopy)
    return funcopy

239 

240 

def _setup_func(funcopy, mock, sig):
    """Give the function *funcopy* the call-tracking surface of *mock*.

    Assertion helpers on *funcopy* forward to *mock*, call-tracking
    attributes start out empty, and *mock* is finally told to delegate its
    state to *funcopy* via ``_mock_delegate``.
    """
    funcopy.mock = mock

    # Thin forwarders to the mock's own assertion methods.
    def assert_called(*args, **kwargs):
        return mock.assert_called(*args, **kwargs)

    def assert_not_called(*args, **kwargs):
        return mock.assert_not_called(*args, **kwargs)

    def assert_called_once(*args, **kwargs):
        return mock.assert_called_once(*args, **kwargs)

    def assert_called_with(*args, **kwargs):
        return mock.assert_called_with(*args, **kwargs)

    def assert_called_once_with(*args, **kwargs):
        return mock.assert_called_once_with(*args, **kwargs)

    def assert_has_calls(*args, **kwargs):
        return mock.assert_has_calls(*args, **kwargs)

    def assert_any_call(*args, **kwargs):
        return mock.assert_any_call(*args, **kwargs)

    def reset_mock():
        funcopy.method_calls = _CallList()
        funcopy.mock_calls = _CallList()
        mock.reset_mock()
        returned = funcopy.return_value
        if _is_instance_mock(returned) and returned is not mock:
            returned.reset_mock()

    # Fresh call-tracking state.
    funcopy.called = False
    funcopy.call_count = 0
    funcopy.call_args = None
    funcopy.call_args_list = _CallList()
    funcopy.method_calls = _CallList()
    funcopy.mock_calls = _CallList()

    # Configuration is copied from the mock before delegation is enabled.
    funcopy.return_value = mock.return_value
    funcopy.side_effect = mock.side_effect
    funcopy._mock_children = mock._mock_children

    funcopy.assert_called_with = assert_called_with
    funcopy.assert_called_once_with = assert_called_once_with
    funcopy.assert_has_calls = assert_has_calls
    funcopy.assert_any_call = assert_any_call
    funcopy.reset_mock = reset_mock
    funcopy.assert_called = assert_called
    funcopy.assert_not_called = assert_not_called
    funcopy.assert_called_once = assert_called_once
    funcopy.__signature__ = sig

    # Must stay last: from here on the mock's delegating properties read
    # from and write to funcopy.
    mock._mock_delegate = funcopy

288 

289 

def _setup_async_mock(mock):
    """Attach await-tracking state and assertion helpers to *mock*.

    *mock* is expected to expose the real mock as ``mock.mock``; each
    ``assert_*await*`` helper looks the method up there when it is called.
    """
    mock._is_coroutine = asyncio.coroutines._is_coroutine
    mock.await_count = 0
    mock.await_args = None
    mock.await_args_list = _CallList()

    # Mock is not configured yet so the attributes are set
    # to a function and then the corresponding mock helper function
    # is called when the helper is accessed similar to _setup_func.
    def wrapper(attr, *args, **kwargs):
        return getattr(mock.mock, attr)(*args, **kwargs)

    awaited_assertions = (
        'assert_awaited',
        'assert_awaited_once',
        'assert_awaited_with',
        'assert_awaited_once_with',
        'assert_any_await',
        'assert_has_awaits',
        'assert_not_awaited',
    )
    for helper_name in awaited_assertions:
        # partial() binds the current name immediately; installing the bare
        # wrapper would late-bind and every helper would see the last name.
        setattr(mock, helper_name, partial(wrapper, helper_name))

315 

316 

def _is_magic(name):
    """Report whether *name* has the double-underscore shape ``__xxx__``."""
    inner = name[2:-2]
    return f'__{inner}__' == name

319 

320 

class _SentinelObject(object):
    "A unique, named, sentinel object."

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return 'sentinel.{}'.format(self.name)

    def __reduce__(self):
        # A string result from __reduce__ names a module-level global.
        return 'sentinel.{}'.format(self.name)

331 

332 

class _Sentinel(object):
    """Access attributes to return a named object, usable as a sentinel."""

    def __init__(self):
        # Cache so the same name always yields the same sentinel object.
        self._sentinels = {}

    def __getattr__(self, name):
        if name == '__bases__':
            # Without this help(unittest.mock) raises an exception
            raise AttributeError
        candidate = _SentinelObject(name)
        return self._sentinels.setdefault(name, candidate)

    def __reduce__(self):
        return 'sentinel'

346 

347 

# Shared factory: attribute access creates (and caches) a named sentinel.
sentinel = _Sentinel()

# DEFAULT is the initial value of Base._mock_return_value.
DEFAULT = sentinel.DEFAULT
# Fallback for "no entry" lookups in _mock_children (see __delattr__).
_missing = sentinel.MISSING
# Marker stored in _mock_children for attributes that were deleted.
_deleted = sentinel.DELETED


# Attribute names that NonCallableMock.__setattr__ passes straight to
# object.__setattr__; _delegating_property() adds further names.
_allowed_names = {
    'return_value', '_mock_return_value', 'side_effect',
    '_mock_side_effect', '_mock_parent', '_mock_new_parent',
    '_mock_name', '_mock_new_name'
}

360 

361 

def _delegating_property(name):
    """Create a property that reads and writes *name* through the delegate.

    While the owner's ``_mock_delegate`` is None the value lives on the
    owner as ``'_mock_' + name``; otherwise attribute *name* of the
    delegate is used. *name* is also registered in ``_allowed_names``.
    """
    _allowed_names.add(name)
    _the_name = '_mock_' + name

    def _get(self, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            return getattr(delegate, name)
        return getattr(self, _the_name)

    def _set(self, value, name=name, _the_name=_the_name):
        delegate = self._mock_delegate
        if delegate is not None:
            setattr(delegate, name, value)
        else:
            # Stored through the instance dict, bypassing __setattr__.
            self.__dict__[_the_name] = value

    return property(_get, _set)

378 

379 

380 

class _CallList(list):
    """A list whose ``in`` operator also matches contiguous sub-lists."""

    def __contains__(self, value):
        # Non-list values use ordinary element membership.
        if not isinstance(value, list):
            return list.__contains__(self, value)

        width = len(value)
        last_start = len(self) - width
        if last_start < 0:
            return False
        # Slide a window of len(value) across self looking for a match.
        return any(
            self[start:start + width] == value
            for start in range(last_start + 1)
        )

    def __repr__(self):
        return pprint.pformat(list(self))

399 

400 

def _check_and_set_parent(parent, value, name, new_name):
    """Adopt *value* as a child of *parent* if it is an unattached mock.

    Returns False without modifying anything when *value* is not a mock,
    already has a name or parent, or is *parent* itself or one of its
    ancestors. Otherwise sets the parent/name fields selected by *name*
    and *new_name* and returns True.
    """
    child = _extract_mock(value)

    if not _is_instance_mock(child):
        return False
    if child._mock_name or child._mock_new_name:
        return False
    if child._mock_parent is not None or child._mock_new_parent is not None:
        return False

    # setting a mock (value) as a child or return value of itself
    # should not modify the mock
    ancestor = parent
    while ancestor is not None:
        if ancestor is child:
            return False
        ancestor = ancestor._mock_new_parent

    if new_name:
        child._mock_new_parent = parent
        child._mock_new_name = new_name
    if name:
        child._mock_parent = parent
        child._mock_name = name
    return True

426 

# Internal class to identify if we wrapped an iterator object or not.
class _MockIter(object):
    """Wrap ``iter(obj)`` so already-wrapped values can be recognised."""

    def __init__(self, obj):
        self.obj = iter(obj)

    def __next__(self):
        wrapped = self.obj
        return next(wrapped)

433 

class Base(object):
    """Root of the mock classes: class-level defaults plus a no-op init."""

    # Class-level defaults for the return value and side effect.
    _mock_return_value = DEFAULT
    _mock_side_effect = None

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

439 

440 

441 

442class NonCallableMock(Base): 

443 """A non-callable version of `Mock`""" 

444 

445 # Store a mutex as a class attribute in order to protect concurrent access 

446 # to mock attributes. Using a class attribute allows all NonCallableMock 

447 # instances to share the mutex for simplicity. 

448 # 

449 # See https://github.com/python/cpython/issues/98624 for why this is 

450 # necessary. 

451 _lock = RLock() 

452 

453 def __new__( 

454 cls, spec=None, wraps=None, name=None, spec_set=None, 

455 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

456 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

457 ): 

458 # every instance has its own class 

459 # so we can create magic methods on the 

460 # class without stomping on other mocks 

461 bases = (cls,) 

462 if not issubclass(cls, AsyncMockMixin): 

463 # Check if spec is an async object or function 

464 spec_arg = spec_set or spec 

465 if spec_arg is not None and _is_async_obj(spec_arg): 

466 bases = (AsyncMockMixin, cls) 

467 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 

468 instance = _safe_super(NonCallableMock, cls).__new__(new) 

469 return instance 

470 

471 

472 def __init__( 

473 self, spec=None, wraps=None, name=None, spec_set=None, 

474 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

475 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

476 ): 

477 if _new_parent is None: 

478 _new_parent = parent 

479 

480 __dict__ = self.__dict__ 

481 __dict__['_mock_parent'] = parent 

482 __dict__['_mock_name'] = name 

483 __dict__['_mock_new_name'] = _new_name 

484 __dict__['_mock_new_parent'] = _new_parent 

485 __dict__['_mock_sealed'] = False 

486 

487 if spec_set is not None: 

488 spec = spec_set 

489 spec_set = True 

490 if _eat_self is None: 

491 _eat_self = parent is not None 

492 

493 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 

494 

495 __dict__['_mock_children'] = {} 

496 __dict__['_mock_wraps'] = wraps 

497 __dict__['_mock_delegate'] = None 

498 

499 __dict__['_mock_called'] = False 

500 __dict__['_mock_call_args'] = None 

501 __dict__['_mock_call_count'] = 0 

502 __dict__['_mock_call_args_list'] = _CallList() 

503 __dict__['_mock_mock_calls'] = _CallList() 

504 

505 __dict__['method_calls'] = _CallList() 

506 __dict__['_mock_unsafe'] = unsafe 

507 

508 if kwargs: 

509 self.configure_mock(**kwargs) 

510 

511 _safe_super(NonCallableMock, self).__init__( 

512 spec, wraps, name, spec_set, parent, 

513 _spec_state 

514 ) 

515 

516 

517 def attach_mock(self, mock, attribute): 

518 """ 

519 Attach a mock as an attribute of this one, replacing its name and 

520 parent. Calls to the attached mock will be recorded in the 

521 `method_calls` and `mock_calls` attributes of this one.""" 

522 inner_mock = _extract_mock(mock) 

523 

524 inner_mock._mock_parent = None 

525 inner_mock._mock_new_parent = None 

526 inner_mock._mock_name = '' 

527 inner_mock._mock_new_name = None 

528 

529 setattr(self, attribute, mock) 

530 

531 

532 def mock_add_spec(self, spec, spec_set=False): 

533 """Add a spec to a mock. `spec` can either be an object or a 

534 list of strings. Only attributes on the `spec` can be fetched as 

535 attributes from the mock. 

536 

537 If `spec_set` is True then only attributes on the spec can be set.""" 

538 self._mock_add_spec(spec, spec_set) 

539 

540 

541 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 

542 _eat_self=False): 

543 if _is_instance_mock(spec): 

544 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 

545 

546 _spec_class = None 

547 _spec_signature = None 

548 _spec_asyncs = [] 

549 

550 if spec is not None and not _is_list(spec): 

551 if isinstance(spec, type): 

552 _spec_class = spec 

553 else: 

554 _spec_class = type(spec) 

555 res = _get_signature_object(spec, 

556 _spec_as_instance, _eat_self) 

557 _spec_signature = res and res[1] 

558 

559 spec_list = dir(spec) 

560 

561 for attr in spec_list: 

562 static_attr = inspect.getattr_static(spec, attr, None) 

563 unwrapped_attr = static_attr 

564 try: 

565 unwrapped_attr = inspect.unwrap(unwrapped_attr) 

566 except ValueError: 

567 pass 

568 if iscoroutinefunction(unwrapped_attr): 

569 _spec_asyncs.append(attr) 

570 

571 spec = spec_list 

572 

573 __dict__ = self.__dict__ 

574 __dict__['_spec_class'] = _spec_class 

575 __dict__['_spec_set'] = spec_set 

576 __dict__['_spec_signature'] = _spec_signature 

577 __dict__['_mock_methods'] = spec 

578 __dict__['_spec_asyncs'] = _spec_asyncs 

579 

580 def __get_return_value(self): 

581 ret = self._mock_return_value 

582 if self._mock_delegate is not None: 

583 ret = self._mock_delegate.return_value 

584 

585 if ret is DEFAULT and self._mock_wraps is None: 

586 ret = self._get_child_mock( 

587 _new_parent=self, _new_name='()' 

588 ) 

589 self.return_value = ret 

590 return ret 

591 

592 

593 def __set_return_value(self, value): 

594 if self._mock_delegate is not None: 

595 self._mock_delegate.return_value = value 

596 else: 

597 self._mock_return_value = value 

598 _check_and_set_parent(self, value, None, '()') 

599 

600 __return_value_doc = "The value to be returned when the mock is called." 

601 return_value = property(__get_return_value, __set_return_value, 

602 __return_value_doc) 

603 

604 

605 @property 

606 def __class__(self): 

607 if self._spec_class is None: 

608 return type(self) 

609 return self._spec_class 

610 

    # Call-tracking attributes. Each property stores its value on the mock
    # as '_mock_<name>' until a delegate (_mock_delegate) is installed,
    # after which reads and writes go to the delegate.
    called = _delegating_property('called')
    call_count = _delegating_property('call_count')
    call_args = _delegating_property('call_args')
    call_args_list = _delegating_property('call_args_list')
    mock_calls = _delegating_property('mock_calls')

616 

617 

618 def __get_side_effect(self): 

619 delegated = self._mock_delegate 

620 if delegated is None: 

621 return self._mock_side_effect 

622 sf = delegated.side_effect 

623 if (sf is not None and not callable(sf) 

624 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 

625 sf = _MockIter(sf) 

626 delegated.side_effect = sf 

627 return sf 

628 

629 def __set_side_effect(self, value): 

630 value = _try_iter(value) 

631 delegated = self._mock_delegate 

632 if delegated is None: 

633 self._mock_side_effect = value 

634 else: 

635 delegated.side_effect = value 

636 

637 side_effect = property(__get_side_effect, __set_side_effect) 

638 

639 

640 def reset_mock(self, visited=None, *, 

641 return_value: bool = False, 

642 side_effect: bool = False): 

643 "Restore the mock object to its initial state." 

644 if visited is None: 

645 visited = [] 

646 if id(self) in visited: 

647 return 

648 visited.append(id(self)) 

649 

650 self.called = False 

651 self.call_args = None 

652 self.call_count = 0 

653 self.mock_calls = _CallList() 

654 self.call_args_list = _CallList() 

655 self.method_calls = _CallList() 

656 

657 if return_value: 

658 self._mock_return_value = DEFAULT 

659 if side_effect: 

660 self._mock_side_effect = None 

661 

662 for child in self._mock_children.values(): 

663 if isinstance(child, _SpecState) or child is _deleted: 

664 continue 

665 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 

666 

667 ret = self._mock_return_value 

668 if _is_instance_mock(ret) and ret is not self: 

669 ret.reset_mock(visited) 

670 

671 

672 def configure_mock(self, **kwargs): 

673 """Set attributes on the mock through keyword arguments. 

674 

675 Attributes plus return values and side effects can be set on child 

676 mocks using standard dot notation and unpacking a dictionary in the 

677 method call: 

678 

679 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 

680 >>> mock.configure_mock(**attrs)""" 

681 for arg, val in sorted(kwargs.items(), 

682 # we sort on the number of dots so that 

683 # attributes are set before we set attributes on 

684 # attributes 

685 key=lambda entry: entry[0].count('.')): 

686 args = arg.split('.') 

687 final = args.pop() 

688 obj = self 

689 for entry in args: 

690 obj = getattr(obj, entry) 

691 setattr(obj, final, val) 

692 

693 

    def __getattr__(self, name):
        """Return the child for *name*, creating or autospeccing it on demand.

        Raises AttributeError for names excluded by the spec, unsupported
        magic names, misspelled assertion-like names (unless the mock is
        unsafe or the spec lists them) and deleted attributes.
        """
        if name in {'_mock_methods', '_mock_unsafe'}:
            # These are read below; if they are not set yet, fail instead
            # of recursing back into __getattr__.
            raise AttributeError(name)
        elif self._mock_methods is not None:
            if name not in self._mock_methods or name in _all_magics:
                raise AttributeError("Mock object has no attribute %r" % name)
        elif _is_magic(name):
            raise AttributeError(name)
        if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods):
            if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST:
                raise AttributeError(
                    f"{name!r} is not a valid assertion. Use a spec "
                    f"for the mock if {name!r} is meant to be an attribute.")

        # The class-wide lock guards the lookup-or-create sequence below.
        with NonCallableMock._lock:
            result = self._mock_children.get(name)
            if result is _deleted:
                raise AttributeError(name)
            elif result is None:
                wraps = None
                if self._mock_wraps is not None:
                    # XXXX should we get the attribute without triggering code
                    # execution?
                    wraps = getattr(self._mock_wraps, name)

                result = self._get_child_mock(
                    parent=self, name=name, wraps=wraps, _new_name=name,
                    _new_parent=self
                )
                self._mock_children[name] = result

            elif isinstance(result, _SpecState):
                # Placeholder entry: build the autospecced child now.
                try:
                    result = create_autospec(
                        result.spec, result.spec_set, result.instance,
                        result.parent, result.name
                    )
                except InvalidSpecError:
                    target_name = self.__dict__['_mock_name'] or self
                    raise InvalidSpecError(
                        f'Cannot autospec attr {name!r} from target '
                        f'{target_name!r} as it has already been mocked out. '
                        f'[target={self!r}, attr={result.spec!r}]')
                self._mock_children[name] = result

        return result

740 

741 

742 def _extract_mock_name(self): 

743 _name_list = [self._mock_new_name] 

744 _parent = self._mock_new_parent 

745 last = self 

746 

747 dot = '.' 

748 if _name_list == ['()']: 

749 dot = '' 

750 

751 while _parent is not None: 

752 last = _parent 

753 

754 _name_list.append(_parent._mock_new_name + dot) 

755 dot = '.' 

756 if _parent._mock_new_name == '()': 

757 dot = '' 

758 

759 _parent = _parent._mock_new_parent 

760 

761 _name_list = list(reversed(_name_list)) 

762 _first = last._mock_name or 'mock' 

763 if len(_name_list) > 1: 

764 if _name_list[1] not in ('()', '().'): 

765 _first += '.' 

766 _name_list[0] = _first 

767 return ''.join(_name_list) 

768 

769 def __repr__(self): 

770 name = self._extract_mock_name() 

771 

772 name_string = '' 

773 if name not in ('mock', 'mock.'): 

774 name_string = ' name=%r' % name 

775 

776 spec_string = '' 

777 if self._spec_class is not None: 

778 spec_string = ' spec=%r' 

779 if self._spec_set: 

780 spec_string = ' spec_set=%r' 

781 spec_string = spec_string % self._spec_class.__name__ 

782 return "<%s%s%s id='%s'>" % ( 

783 type(self).__name__, 

784 name_string, 

785 spec_string, 

786 id(self) 

787 ) 

788 

789 

790 def __dir__(self): 

791 """Filter the output of `dir(mock)` to only useful members.""" 

792 if not FILTER_DIR: 

793 return object.__dir__(self) 

794 

795 extras = self._mock_methods or [] 

796 from_type = dir(type(self)) 

797 from_dict = list(self.__dict__) 

798 from_child_mocks = [ 

799 m_name for m_name, m_value in self._mock_children.items() 

800 if m_value is not _deleted] 

801 

802 from_type = [e for e in from_type if not e.startswith('_')] 

803 from_dict = [e for e in from_dict if not e.startswith('_') or 

804 _is_magic(e)] 

805 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 

806 

807 

    def __setattr__(self, name, value):
        """Set *name* on the mock, enforcing spec, magic and seal rules.

        Raises AttributeError when spec_set forbids the name, the name is
        an unsupported magic method, a magic method is not in the spec, or
        the mock is sealed and does not already have the attribute.
        """
        if name in _allowed_names:
            # property setters go through here
            return object.__setattr__(self, name, value)
        elif (self._spec_set and self._mock_methods is not None and
            name not in self._mock_methods and
            name not in self.__dict__):
            raise AttributeError("Mock object has no attribute '%s'" % name)
        elif name in _unsupported_magics:
            msg = 'Attempting to set unsupported magic method %r.' % name
            raise AttributeError(msg)
        elif name in _all_magics:
            if self._mock_methods is not None and name not in self._mock_methods:
                raise AttributeError("Mock object has no attribute '%s'" % name)

            if not _is_instance_mock(value):
                # Magic methods are installed on the mock's own type; the
                # instance gets a wrapper that passes self explicitly.
                setattr(type(self), name, _get_method(name, value))
                original = value
                value = lambda *args, **kw: original(self, *args, **kw)
            else:
                # only set _new_name and not name so that mock_calls is tracked
                # but not method calls
                _check_and_set_parent(self, value, None, name)
                setattr(type(self), name, value)
                self._mock_children[name] = value
        elif name == '__class__':
            # Assigning __class__ only changes the reported spec class.
            self._spec_class = value
            return
        else:
            if _check_and_set_parent(self, value, name, name):
                self._mock_children[name] = value

        if self._mock_sealed and not hasattr(self, name):
            mock_name = f'{self._extract_mock_name()}.{name}'
            raise AttributeError(f'Cannot set {mock_name}')

        if isinstance(value, PropertyMock):
            # PropertyMock values are stored directly in the instance dict.
            self.__dict__[name] = value
            return
        return object.__setattr__(self, name, value)

848 

849 

850 def __delattr__(self, name): 

851 if name in _all_magics and name in type(self).__dict__: 

852 delattr(type(self), name) 

853 if name not in self.__dict__: 

854 # for magic methods that are still MagicProxy objects and 

855 # not set on the instance itself 

856 return 

857 

858 obj = self._mock_children.get(name, _missing) 

859 if name in self.__dict__: 

860 _safe_super(NonCallableMock, self).__delattr__(name) 

861 elif obj is _deleted: 

862 raise AttributeError(name) 

863 if obj is not _missing: 

864 del self._mock_children[name] 

865 self._mock_children[name] = _deleted 

866 

867 

868 def _format_mock_call_signature(self, args, kwargs): 

869 name = self._mock_name or 'mock' 

870 return _format_call_signature(name, args, kwargs) 

871 

872 

873 def _format_mock_failure_message(self, args, kwargs, action='call'): 

874 message = 'expected %s not found.\nExpected: %s\n Actual: %s' 

875 expected_string = self._format_mock_call_signature(args, kwargs) 

876 call_args = self.call_args 

877 actual_string = self._format_mock_call_signature(*call_args) 

878 return message % (action, expected_string, actual_string) 

879 

880 

881 def _get_call_signature_from_name(self, name): 

882 """ 

883 * If call objects are asserted against a method/function like obj.meth1 

884 then there could be no name for the call object to lookup. Hence just 

885 return the spec_signature of the method/function being asserted against. 

886 * If the name is not empty then remove () and split by '.' to get 

887 list of names to iterate through the children until a potential 

888 match is found. A child mock is created only during attribute access 

889 so if we get a _SpecState then no attributes of the spec were accessed 

890 and can be safely exited. 

891 """ 

892 if not name: 

893 return self._spec_signature 

894 

895 sig = None 

896 names = name.replace('()', '').split('.') 

897 children = self._mock_children 

898 

899 for name in names: 

900 child = children.get(name) 

901 if child is None or isinstance(child, _SpecState): 

902 break 

903 else: 

904 # If an autospecced object is attached using attach_mock the 

905 # child would be a function with mock object as attribute from 

906 # which signature has to be derived. 

907 child = _extract_mock(child) 

908 children = child._mock_children 

909 sig = child._spec_signature 

910 

911 return sig 

912 

913 

914 def _call_matcher(self, _call): 

915 """ 

916 Given a call (or simply an (args, kwargs) tuple), return a 

917 comparison key suitable for matching with other calls. 

918 This is a best effort method which relies on the spec's signature, 

919 if available, or falls back on the arguments themselves. 

920 """ 

921 

922 if isinstance(_call, tuple) and len(_call) > 2: 

923 sig = self._get_call_signature_from_name(_call[0]) 

924 else: 

925 sig = self._spec_signature 

926 

927 if sig is not None: 

928 if len(_call) == 2: 

929 name = '' 

930 args, kwargs = _call 

931 else: 

932 name, args, kwargs = _call 

933 try: 

934 bound_call = sig.bind(*args, **kwargs) 

935 return call(name, bound_call.args, bound_call.kwargs) 

936 except TypeError as e: 

937 return e.with_traceback(None) 

938 else: 

939 return _call 

940 

941 def assert_not_called(_mock_self): 

942 """assert that the mock was never called. 

943 """ 

944 self = _mock_self 

945 if self.call_count != 0: 

946 msg = ("Expected '%s' to not have been called. Called %s times.%s" 

947 % (self._mock_name or 'mock', 

948 self.call_count, 

949 self._calls_repr())) 

950 raise AssertionError(msg) 

951 

952 def assert_called(_mock_self): 

953 """assert that the mock was called at least once 

954 """ 

955 self = _mock_self 

956 if self.call_count == 0: 

957 msg = ("Expected '%s' to have been called." % 

958 (self._mock_name or 'mock')) 

959 raise AssertionError(msg) 

960 

961 def assert_called_once(_mock_self): 

962 """assert that the mock was called only once. 

963 """ 

964 self = _mock_self 

965 if not self.call_count == 1: 

966 msg = ("Expected '%s' to have been called once. Called %s times.%s" 

967 % (self._mock_name or 'mock', 

968 self.call_count, 

969 self._calls_repr())) 

970 raise AssertionError(msg) 

971 

def assert_called_with(_mock_self, *args, **kwargs):
    """Assert that the most recent call used the given arguments.

    Raises an AssertionError when the mock was never called, or when the
    positional/keyword arguments differ from those of the last call.
    """
    self = _mock_self
    last_call = self.call_args
    if last_call is None:
        # Never called: report the expected signature against "not called."
        expected = self._format_mock_call_signature(args, kwargs)
        actual = 'not called.'
        raise AssertionError(
            'expected call not found.\nExpected: %s\n Actual: %s'
            % (expected, actual)
        )

    expected = self._call_matcher(_Call((args, kwargs), two=True))
    actual = self._call_matcher(self.call_args)
    if actual != expected:
        # When the expected call could not be matched, _call_matcher hands
        # back an exception; chain it as the cause of the failure.
        cause = expected if isinstance(expected, Exception) else None
        failure = self._format_mock_failure_message(args, kwargs)
        raise AssertionError(failure) from cause

993 

994 

def assert_called_once_with(_mock_self, *args, **kwargs):
    """Assert the mock was called exactly once, with the given arguments.

    The call count is checked first; the argument comparison is delegated
    to ``assert_called_with``.
    """
    self = _mock_self
    if self.call_count == 1:
        return self.assert_called_with(*args, **kwargs)
    details = (self._mock_name or 'mock',
               self.call_count,
               self._calls_repr())
    raise AssertionError(
        "Expected '%s' to be called once. Called %s times.%s" % details
    )

1006 

1007 

def assert_has_calls(self, calls, any_order=False):
    """Assert that the mock has been called with the specified calls.

    The `mock_calls` list is what gets checked.

    With `any_order` false (the default) the calls must appear
    sequentially; extra calls before or after the specified ones are
    allowed.

    With `any_order` true the calls may appear in any order, but every
    one of them must be present in `mock_calls`.
    """
    expected = [self._call_matcher(c) for c in calls]
    # First expected call that failed to match (an exception), if any;
    # used as the __cause__ of the assertion failure.
    cause = next((e for e in expected if isinstance(e, Exception)), None)
    all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)

    if any_order:
        remaining = list(all_calls)
        missing = []
        for wanted in expected:
            # Remove each matched call so duplicates must be matched by
            # distinct recorded calls.
            try:
                remaining.remove(wanted)
            except ValueError:
                missing.append(wanted)
        if missing:
            raise AssertionError(
                '%r does not contain all of %r in its call list, '
                'found %r instead' % (self._mock_name or 'mock',
                                      tuple(missing), remaining)
            ) from cause
        return

    if expected in all_calls:
        return

    if cause is None:
        problem = 'Calls not found.'
    else:
        errors = [e if isinstance(e, Exception) else None for e in expected]
        problem = ('Error processing expected calls.\n'
                   'Errors: {}').format(errors)
    raise AssertionError(
        f'{problem}\n'
        f'Expected: {_CallList(calls)}\n'
        f' Actual: {safe_repr(self.mock_calls)}'
    ) from cause

1051 

1052 

def assert_any_call(self, *args, **kwargs):
    """Assert that the mock has been called with the specified arguments.

    Passes if the mock has *ever* been called this way, unlike
    `assert_called_with` and `assert_called_once_with`, which only pass
    when the matching call is the most recent one.
    """
    expected = self._call_matcher(_Call((args, kwargs), two=True))
    # An exception here means the expected call could not be matched.
    cause = expected if isinstance(expected, Exception) else None
    recorded = [self._call_matcher(c) for c in self.call_args_list]
    if not cause and expected in _AnyComparer(recorded):
        return
    expected_string = self._format_mock_call_signature(args, kwargs)
    raise AssertionError('%s call not found' % expected_string) from cause

1067 

1068 

1069 def _get_child_mock(self, **kw): 

1070 """Create the child mocks for attributes and return value. 

1071 By default child mocks will be the same type as the parent. 

1072 Subclasses of Mock may want to override this to customize the way 

1073 child mocks are made. 

1074 

1075 For non-callable mocks the callable variant will be used (rather than 

1076 any custom subclass).""" 

1077 if self._mock_sealed: 

1078 attribute = f".{kw['name']}" if "name" in kw else "()" 

1079 mock_name = self._extract_mock_name() + attribute 

1080 raise AttributeError(mock_name) 

1081 

1082 _new_name = kw.get("_new_name") 

1083 if _new_name in self.__dict__['_spec_asyncs']: 

1084 return AsyncMock(**kw) 

1085 

1086 _type = type(self) 

1087 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 

1088 # Any asynchronous magic becomes an AsyncMock 

1089 klass = AsyncMock 

1090 elif issubclass(_type, AsyncMockMixin): 

1091 if (_new_name in _all_sync_magics or 

1092 self._mock_methods and _new_name in self._mock_methods): 

1093 # Any synchronous method on AsyncMock becomes a MagicMock 

1094 klass = MagicMock 

1095 else: 

1096 klass = AsyncMock 

1097 elif not issubclass(_type, CallableMixin): 

1098 if issubclass(_type, NonCallableMagicMock): 

1099 klass = MagicMock 

1100 elif issubclass(_type, NonCallableMock): 

1101 klass = Mock 

1102 else: 

1103 klass = _type.__mro__[1] 

1104 return klass(**kw) 

1105 

1106 

1107 def _calls_repr(self): 

1108 """Renders self.mock_calls as a string. 

1109 

1110 Example: "\nCalls: [call(1), call(2)]." 

1111 

1112 If self.mock_calls is empty, an empty string is returned. The 

1113 output will be truncated if very long. 

1114 """ 

1115 if not self.mock_calls: 

1116 return "" 

1117 return f"\nCalls: {safe_repr(self.mock_calls)}." 

1118 

1119 

try:
    removeprefix = str.removeprefix
except AttributeError:
    # Py 3.8 and earlier:
    def removeprefix(name, prefix):
        """Return *name* without a leading *prefix*.

        Mirrors ``str.removeprefix``: when *name* does not start with
        *prefix* it is returned unchanged instead of being blindly sliced.
        """
        if name.startswith(prefix):
            return name[len(prefix):]
        return name

1126 

# Attribute names forbidden in safe mode: the name of every
# NonCallableMock attribute starting with "assert_", with that prefix removed.
_ATTRIB_DENY_LIST = frozenset(
    removeprefix(attr, "assert_")
    for attr in dir(NonCallableMock)
    if attr.startswith("assert_")
)

1133 

1134 

1135class _AnyComparer(list): 

1136 """A list which checks if it contains a call which may have an 

1137 argument of ANY, flipping the components of item and self from 

1138 their traditional locations so that ANY is guaranteed to be on 

1139 the left.""" 

1140 def __contains__(self, item): 

1141 for _call in self: 

1142 assert len(item) == len(_call) 

1143 if all([ 

1144 expected == actual 

1145 for expected, actual in zip(item, _call) 

1146 ]): 

1147 return True 

1148 return False 

1149 

1150 

def _try_iter(obj):
    """Return an iterator over *obj* where possible, otherwise *obj* itself.

    None, exceptions and callables are passed through untouched. Anything
    else is wrapped with ``iter()``; objects that are not iterable are
    returned as they are.
    """
    if obj is None or _is_exception(obj) or _callable(obj):
        return obj
    try:
        return iter(obj)
    except TypeError:
        # XXXX backwards compatibility
        # but this will blow up on first call - so maybe we should fail early?
        return obj

1164 

1165 

class CallableMixin(Base):
    """Mixin that makes a mock callable and records each call."""

    def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
                 wraps=None, name=None, spec_set=None, parent=None,
                 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
        # Stored straight into __dict__, bypassing attribute assignment.
        self.__dict__['_mock_return_value'] = return_value
        _safe_super(CallableMixin, self).__init__(
            spec, wraps, name, spec_set, parent,
            _spec_state, _new_name, _new_parent, **kwargs
        )

        self.side_effect = side_effect

    def _mock_check_sig(self, *args, **kwargs):
        # stub method that can be replaced with one with a specific signature
        pass

    def __call__(_mock_self, *args, **kwargs):
        # can't use self in-case a function / method we are mocking uses self
        # in the signature
        _mock_self._mock_check_sig(*args, **kwargs)
        _mock_self._increment_mock_call(*args, **kwargs)
        return _mock_self._mock_call(*args, **kwargs)

    def _mock_call(_mock_self, *args, **kwargs):
        return _mock_self._execute_mock_call(*args, **kwargs)

    def _increment_mock_call(_mock_self, *args, **kwargs):
        """Record one call on this mock and on every mock up the parent chain."""
        self = _mock_self
        self.called = True
        self.call_count += 1

        # handle call_args
        # needs to be set here so assertions on call arguments pass before
        # execution in the case of awaited calls
        recorded = _Call((args, kwargs), two=True)
        self.call_args = recorded
        self.call_args_list.append(recorded)

        # initial stuff for method_calls:
        track_methods = self._mock_parent is not None
        method_path = self._mock_name

        # initial stuff for mock_calls:
        call_path = self._mock_new_name
        is_a_call = call_path == '()'
        self.mock_calls.append(_Call(('', args, kwargs)))

        # follow up the chain of mocks:
        ancestor = self._mock_new_parent
        while ancestor is not None:

            # handle method_calls:
            if track_methods:
                ancestor.method_calls.append(_Call((method_path, args, kwargs)))
                track_methods = ancestor._mock_parent is not None
                if track_methods:
                    method_path = ancestor._mock_name + '.' + method_path

            # handle mock_calls:
            ancestor.mock_calls.append(_Call((call_path, args, kwargs)))

            if ancestor._mock_new_name:
                # No dot between a name and a directly following '()'.
                dot = '' if is_a_call else '.'
                is_a_call = ancestor._mock_new_name == '()'
                call_path = ancestor._mock_new_name + dot + call_path

            # follow the parental chain:
            ancestor = ancestor._mock_new_parent

    def _execute_mock_call(_mock_self, *args, **kwargs):
        """Produce the result of a call: side effect, return value or wrapped call."""
        self = _mock_self
        # separate from _increment_mock_call so that awaited functions are
        # executed separately from their call, also AsyncMock overrides this method

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect
            if _callable(effect):
                result = effect(*args, **kwargs)
            else:
                # Not callable: take the next value from the side effect.
                result = next(effect)
                if _is_exception(result):
                    raise result

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        delegate = self._mock_delegate
        if delegate and delegate.return_value is not DEFAULT:
            return self.return_value

        if self._mock_wraps is not None:
            return self._mock_wraps(*args, **kwargs)

        return self.return_value

1272 

1273 

1274 

class Mock(CallableMixin, NonCallableMock):
    """
    Create a new `Mock` object. `Mock` takes several optional arguments
    that specify the behaviour of the Mock object:

    * `spec`: This can be either a list of strings or an existing object (a
      class or instance) that acts as the specification for the mock object. If
      you pass in an object then a list of strings is formed by calling dir on
      the object (excluding unsupported magic attributes and methods). Accessing
      any attribute not in this list will raise an `AttributeError`.

      If `spec` is an object (rather than a list of strings) then
      `mock.__class__` returns the class of the spec object. This allows mocks
      to pass `isinstance` tests.

    * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
      or get an attribute on the mock that isn't on the object passed as
      `spec_set` will raise an `AttributeError`.

    * `side_effect`: A function to be called whenever the Mock is called. See
      the `side_effect` attribute. Useful for raising exceptions or
      dynamically changing return values. The function is called with the same
      arguments as the mock, and unless it returns `DEFAULT`, the return
      value of this function is used as the return value.

      If `side_effect` is an iterable then each call to the mock will return
      the next value from the iterable. If any of the members of the iterable
      are exceptions they will be raised instead of returned.

    * `return_value`: The value returned when the mock is called. By default
      this is a new Mock (created on first access). See the
      `return_value` attribute.

    * `unsafe`: By default, accessing any attribute whose name starts with
      *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError.
      Additionally, an AttributeError is raised when accessing
      attributes that match the name of an assertion method without the prefix
      `assert_`, e.g. accessing `called_once` instead of `assert_called_once`.
      Passing `unsafe=True` will allow access to these attributes.

    * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
      calling the Mock will pass the call through to the wrapped object
      (returning the real result). Attribute access on the mock will return a
      Mock object that wraps the corresponding attribute of the wrapped object
      (so attempting to access an attribute that doesn't exist will raise an
      `AttributeError`).

      If the mock has an explicit `return_value` set then calls are not passed
      to the wrapped object and the `return_value` is returned instead.

    * `name`: If the mock has a name then it will be used in the repr of the
      mock. This can be useful for debugging. The name is propagated to child
      mocks.

    Mocks can also be called with arbitrary keyword arguments. These will be
    used to set attributes on the mock after it is created.
    """

1332 

1333 

1334def _dot_lookup(thing, comp, import_path): 

1335 try: 

1336 return getattr(thing, comp) 

1337 except AttributeError: 

1338 __import__(import_path) 

1339 return getattr(thing, comp) 

1340 

1341 

1342def _importer(target): 

1343 components = target.split('.') 

1344 import_path = components.pop(0) 

1345 thing = __import__(import_path) 

1346 

1347 for comp in components: 

1348 import_path += ".%s" % comp 

1349 thing = _dot_lookup(thing, comp, import_path) 

1350 return thing 

1351 

1352 

1353# _check_spec_arg_typos takes kwargs from commands like patch and checks that 

1354# they don't contain common misspellings of arguments related to autospeccing. 

1355def _check_spec_arg_typos(kwargs_to_check): 

1356 typos = ("autospect", "auto_spec", "set_spec") 

1357 for typo in typos: 

1358 if typo in kwargs_to_check: 

1359 raise RuntimeError( 

1360 f"{typo!r} might be a typo; use unsafe=True if this is intended" 

1361 ) 

1362 

1363 

class _patch(object):
    """Patcher that replaces one attribute of a target object.

    The target is obtained lazily by calling `getter`. An instance can be
    used as a context manager, as a decorator for functions, coroutine
    functions and classes, or driven explicitly with `start()` / `stop()`.
    """

    attribute_name = None
    # Shared by all patchers: those activated through start() and not yet
    # stopped.
    _active_patches = []

    def __init__(
            self, getter, attribute, new, spec, create,
            spec_set, autospec, new_callable, kwargs, *, unsafe=False
        ):
        if new_callable is not None:
            if new is not DEFAULT:
                raise ValueError(
                    "Cannot use 'new' and 'new_callable' together"
                )
            if autospec is not None:
                raise ValueError(
                    "Cannot use 'autospec' and 'new_callable' together"
                )
        if not unsafe:
            _check_spec_arg_typos(kwargs)
        if _is_instance_mock(spec):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec '
                f'has already been mocked out. [spec={spec!r}]')
        if _is_instance_mock(spec_set):
            raise InvalidSpecError(
                f'Cannot spec attr {attribute!r} as the spec_set '
                f'target has already been mocked out. [spec_set={spec_set!r}]')

        self.getter = getter
        self.attribute = attribute
        self.new = new
        self.new_callable = new_callable
        self.spec = spec
        self.create = create
        self.has_local = False
        self.spec_set = spec_set
        self.autospec = autospec
        self.kwargs = kwargs
        # Remembered so that copy() can skip the typo check again.
        self.unsafe = unsafe
        self.additional_patchers = []
        self.is_started = False

    def copy(self):
        """Return a new, unstarted patcher with the same configuration."""
        patcher = _patch(
            self.getter, self.attribute, self.new, self.spec,
            self.create, self.spec_set,
            self.autospec, self.new_callable, self.kwargs,
            # Without this a patcher created with unsafe=True would fail the
            # typo check when copied (e.g. by decorate_class).
            unsafe=self.unsafe
        )
        patcher.attribute_name = self.attribute_name
        patcher.additional_patchers = [
            p.copy() for p in self.additional_patchers
        ]
        return patcher

    def __call__(self, func):
        """Decorate a class, coroutine function or plain callable."""
        if isinstance(func, type):
            return self.decorate_class(func)
        if iscoroutinefunction(func):
            return self.decorate_async_callable(func)
        return self.decorate_callable(func)

    def decorate_class(self, klass):
        """Wrap every callable attribute named with `patch.TEST_PREFIX`."""
        for attr in dir(klass):
            if not attr.startswith(patch.TEST_PREFIX):
                continue

            attr_value = getattr(klass, attr)
            if not hasattr(attr_value, "__call__"):
                continue

            # Each wrapped attribute gets its own copy of the patcher.
            patcher = self.copy()
            setattr(klass, attr, patcher(attr_value))
        return klass

    @contextlib.contextmanager
    def decoration_helper(self, patched, args, keywargs):
        """Enter all patchings of `patched` and yield the extended call args.

        Patchers with an `attribute_name` contribute keyword arguments;
        other patchers whose `new` is DEFAULT contribute one positional
        argument each.
        """
        extra_args = []
        with contextlib.ExitStack() as exit_stack:
            for patching in patched.patchings:
                arg = exit_stack.enter_context(patching)
                if patching.attribute_name is not None:
                    keywargs.update(arg)
                elif patching.new is DEFAULT:
                    extra_args.append(arg)

            args += tuple(extra_args)
            yield (args, keywargs)

    def decorate_callable(self, func):
        """Wrap `func` so the patch is active for the duration of each call."""
        # NB. Keep the method in sync with decorate_async_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: just register this one.
            func.patchings.append(self)
            return func

        @wraps(func)
        def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched

    def decorate_async_callable(self, func):
        """Coroutine-function counterpart of decorate_callable()."""
        # NB. Keep the method in sync with decorate_callable()
        if hasattr(func, 'patchings'):
            # Already wrapped by another patcher: just register this one.
            func.patchings.append(self)
            return func

        @wraps(func)
        async def patched(*args, **keywargs):
            with self.decoration_helper(patched,
                                        args,
                                        keywargs) as (newargs, newkeywargs):
                return await func(*newargs, **newkeywargs)

        patched.patchings = [self]
        return patched

    def get_original(self):
        """Return ``(original, local)`` for the attribute being patched.

        `original` is the current value (DEFAULT when it does not exist) and
        `local` tells whether it was found in the target's own ``__dict__``.
        Raises AttributeError when the attribute is missing and `create` is
        not set.
        """
        target = self.getter()
        name = self.attribute

        original = DEFAULT
        local = False

        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True

        # Names of builtins may always be created on a module.
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True

        if not self.create and original is DEFAULT:
            raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
        return original, local

    def __enter__(self):
        """Perform the patch."""
        if self.is_started:
            raise RuntimeError("Patch is already started")

        new, spec, spec_set = self.new, self.spec, self.spec_set
        autospec, kwargs = self.autospec, self.kwargs
        new_callable = self.new_callable
        self.target = self.getter()

        # normalise False to None
        if spec is False:
            spec = None
        if spec_set is False:
            spec_set = None
        if autospec is False:
            autospec = None

        if spec is not None and autospec is not None:
            raise TypeError("Can't specify spec and autospec")
        if ((spec is not None or autospec is not None) and
            spec_set not in (True, None)):
            raise TypeError("Can't provide explicit spec_set *and* spec or autospec")

        original, local = self.get_original()

        if new is DEFAULT and autospec is None:
            inherit = False
            if spec is True:
                # set spec to the object we are replacing
                spec = original
                if spec_set is True:
                    spec_set = original
                    spec = None
            elif spec is not None:
                if spec_set is True:
                    spec_set = spec
                    spec = None
            elif spec_set is True:
                spec_set = original

            if spec is not None or spec_set is not None:
                if original is DEFAULT:
                    raise TypeError("Can't use 'spec' with create=True")
                if isinstance(original, type):
                    # If we're patching out a class and there is a spec
                    inherit = True

            # Determine the Klass to use
            if new_callable is not None:
                Klass = new_callable
            elif spec is None and _is_async_obj(original):
                Klass = AsyncMock
            elif spec is not None or spec_set is not None:
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if _is_list(this_spec):
                    not_callable = '__call__' not in this_spec
                else:
                    not_callable = not callable(this_spec)
                if _is_async_obj(this_spec):
                    Klass = AsyncMock
                elif not_callable:
                    Klass = NonCallableMagicMock
                else:
                    Klass = MagicMock
            else:
                Klass = MagicMock

            _kwargs = {}
            if spec is not None:
                _kwargs['spec'] = spec
            if spec_set is not None:
                _kwargs['spec_set'] = spec_set

            # add a name to mocks
            if (isinstance(Klass, type) and
                issubclass(Klass, NonCallableMock) and self.attribute):
                _kwargs['name'] = self.attribute

            _kwargs.update(kwargs)
            new = Klass(**_kwargs)

            if inherit and _is_instance_mock(new):
                # we can only tell if the instance should be callable if the
                # spec is not a list
                this_spec = spec
                if spec_set is not None:
                    this_spec = spec_set
                if (not _is_list(this_spec) and not
                    _instance_callable(this_spec)):
                    Klass = NonCallableMagicMock

                # 'name' is only present when it was injected above or given
                # by the caller; tolerate its absence instead of raising
                # KeyError.
                _kwargs.pop('name', None)
                new.return_value = Klass(_new_parent=new, _new_name='()',
                                         **_kwargs)
        elif autospec is not None:
            # spec is ignored, new *must* be default, spec_set is treated
            # as a boolean. Should we check spec is not None and that spec_set
            # is a bool?
            if new is not DEFAULT:
                raise TypeError(
                    "autospec creates the mock for you. Can't specify "
                    "autospec and new."
                )
            if original is DEFAULT:
                raise TypeError("Can't use 'autospec' with create=True")
            spec_set = bool(spec_set)
            if autospec is True:
                autospec = original

            if _is_instance_mock(self.target):
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} as the patch '
                    f'target has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')
            if _is_instance_mock(autospec):
                target_name = getattr(self.target, '__name__', self.target)
                raise InvalidSpecError(
                    f'Cannot autospec attr {self.attribute!r} from target '
                    f'{target_name!r} as it has already been mocked out. '
                    f'[target={self.target!r}, attr={autospec!r}]')

            new = create_autospec(autospec, spec_set=spec_set,
                                  _name=self.attribute, **kwargs)
        elif kwargs:
            # can't set keyword args when we aren't creating the mock
            # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
            raise TypeError("Can't pass kwargs to a mock we aren't creating")

        new_attr = new

        self.temp_original = original
        self.is_local = local
        self._exit_stack = contextlib.ExitStack()
        self.is_started = True
        try:
            setattr(self.target, self.attribute, new_attr)
            if self.attribute_name is not None:
                # patch.multiple style: hand back a dict of created mocks.
                extra_args = {}
                if self.new is DEFAULT:
                    extra_args[self.attribute_name] = new
                for patching in self.additional_patchers:
                    arg = self._exit_stack.enter_context(patching)
                    if patching.new is DEFAULT:
                        extra_args.update(arg)
                return extra_args

            return new
        except:
            # Deliberately bare: undo the patch for any failure, then
            # re-raise unless the exit stack suppressed the exception.
            if not self.__exit__(*sys.exc_info()):
                raise

    def __exit__(self, *exc_info):
        """Undo the patch."""
        if not self.is_started:
            return

        if self.is_local and self.temp_original is not DEFAULT:
            setattr(self.target, self.attribute, self.temp_original)
        else:
            delattr(self.target, self.attribute)
            if not self.create and (not hasattr(self.target, self.attribute) or
                        self.attribute in ('__doc__', '__module__',
                                           '__defaults__', '__annotations__',
                                           '__kwdefaults__')):
                # needed for proxy objects like django settings
                setattr(self.target, self.attribute, self.temp_original)

        del self.temp_original
        del self.is_local
        del self.target
        exit_stack = self._exit_stack
        del self._exit_stack
        self.is_started = False
        return exit_stack.__exit__(*exc_info)

    def start(self):
        """Activate a patch, returning any created mock."""
        result = self.__enter__()
        self._active_patches.append(self)
        return result

    def stop(self):
        """Stop an active patch."""
        try:
            self._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None

        return self.__exit__(None, None, None)

1710 

1711 

1712 

1713def _get_target(target): 

1714 try: 

1715 target, attribute = target.rsplit('.', 1) 

1716 except (TypeError, ValueError, AttributeError): 

1717 raise TypeError( 

1718 f"Need a valid target to patch. You supplied: {target!r}") 

1719 getter = lambda: _importer(target) 

1720 return getter, attribute 

1721 

1722 

def _patch_object(
        target, attribute, new=DEFAULT, spec=None,
        create=False, spec_set=None, autospec=None,
        new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    patch the named member (`attribute`) on an object (`target`) with a mock
    object.

    `patch.object` can be used as a decorator, class decorator or a context
    manager. Arguments `new`, `spec`, `create`, `spec_set`,
    `autospec` and `new_callable` have the same meaning as for `patch`. Like
    `patch`, `patch.object` takes arbitrary keyword arguments for configuring
    the mock object it creates.

    When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
    for choosing which methods to wrap.

    Raises TypeError when `target` is a str: the object itself is required.
    """
    if type(target) is str:
        raise TypeError(
            f"{target!r} must be the actual object to be patched, not a str"
        )

    # The patcher obtains its target through a getter; here it is the
    # object that was passed in.
    return _patch(
        lambda: target, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )

1750 

1751 

1752def _patch_multiple(target, spec=None, create=False, spec_set=None, 

1753 autospec=None, new_callable=None, **kwargs): 

1754 """Perform multiple patches in a single call. It takes the object to be 

1755 patched (either as an object or a string to fetch the object by importing) 

1756 and keyword arguments for the patches:: 

1757 

1758 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 

1759 ... 

1760 

1761 Use `DEFAULT` as the value if you want `patch.multiple` to create 

1762 mocks for you. In this case the created mocks are passed into a decorated 

1763 function by keyword, and a dictionary is returned when `patch.multiple` is 

1764 used as a context manager. 

1765 

1766 `patch.multiple` can be used as a decorator, class decorator or a context 

1767 manager. The arguments `spec`, `spec_set`, `create`, 

1768 `autospec` and `new_callable` have the same meaning as for `patch`. These 

1769 arguments will be applied to *all* patches done by `patch.multiple`. 

1770 

1771 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 

1772 for choosing which methods to wrap. 

1773 """ 

1774 if type(target) is str: 

1775 getter = lambda: _importer(target) 

1776 else: 

1777 getter = lambda: target 

1778 

1779 if not kwargs: 

1780 raise ValueError( 

1781 'Must supply at least one keyword argument with patch.multiple' 

1782 ) 

1783 # need to wrap in a list for python 3, where items is a view 

1784 items = list(kwargs.items()) 

1785 attribute, new = items[0] 

1786 patcher = _patch( 

1787 getter, attribute, new, spec, create, spec_set, 

1788 autospec, new_callable, {} 

1789 ) 

1790 patcher.attribute_name = attribute 

1791 for attribute, new in items[1:]: 

1792 this_patcher = _patch( 

1793 getter, attribute, new, spec, create, spec_set, 

1794 autospec, new_callable, {} 

1795 ) 

1796 this_patcher.attribute_name = attribute 

1797 patcher.additional_patchers.append(this_patcher) 

1798 return patcher 

1799 

1800 

def patch(
        target, new=DEFAULT, spec=None, create=False,
        spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
    ):
    """
    `patch` acts as a function decorator, class decorator or a context
    manager. Inside the body of the function or with statement, the `target`
    is patched with a `new` object. When the function/with statement exits
    the patch is undone.

    If `new` is omitted, then the target is replaced with an
    `AsyncMock` if the patched object is an async function or a
    `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
    omitted, the created mock is passed in as an extra argument to the
    decorated function. If `patch` is used as a context manager the created
    mock is returned by the context manager.

    `target` should be a string in the form `'package.module.ClassName'`. The
    `target` is imported and the specified object replaced with the `new`
    object, so the `target` must be importable from the environment you are
    calling `patch` from. The target is imported when the decorated function
    is executed, not at decoration time.

    The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
    if patch is creating one for you.

    In addition you can pass `spec=True` or `spec_set=True`, which causes
    patch to pass in the object being mocked as the spec/spec_set object.

    `new_callable` allows you to specify a different class, or callable object,
    that will be called to create the `new` object. By default `AsyncMock` is
    used for async functions and `MagicMock` for the rest.

    A more powerful form of `spec` is `autospec`. If you set `autospec=True`
    then the mock will be created with a spec from the object being replaced.
    All attributes of the mock will also have the spec of the corresponding
    attribute of the object being replaced. Methods and functions being
    mocked will have their arguments checked and will raise a `TypeError` if
    they are called with the wrong signature. For mocks replacing a class,
    their return value (the 'instance') will have the same spec as the class.

    Instead of `autospec=True` you can pass `autospec=some_object` to use an
    arbitrary object as the spec instead of the one being replaced.

    By default `patch` will fail to replace attributes that don't exist. If
    you pass in `create=True`, and the attribute doesn't exist, patch will
    create the attribute for you when the patched function is called, and
    delete it again afterwards. This is useful for writing tests against
    attributes that your production code creates at runtime. It is off by
    default because it can be dangerous. With it switched on you can write
    passing tests against APIs that don't actually exist!

    Patch can be used as a `TestCase` class decorator. It works by
    decorating each test method in the class. This reduces the boilerplate
    code when your test methods share a common patchings set. `patch` finds
    tests by looking for method names that start with `patch.TEST_PREFIX`.
    By default this is `test`, which matches the way `unittest` finds tests.
    You can specify an alternative prefix by setting `patch.TEST_PREFIX`.

    Patch can be used as a context manager, with the with statement. Here the
    patching applies to the indented block after the with statement. If you
    use "as" then the patched object will be bound to the name after the
    "as"; very useful if `patch` is creating a mock object for you.

    Patch will raise a `RuntimeError` if passed some common misspellings of
    the arguments autospec and spec_set. Pass the argument `unsafe` with the
    value True to disable that check.

    `patch` takes arbitrary keyword arguments. These will be passed to
    `AsyncMock` if the patched object is asynchronous, to `MagicMock`
    otherwise or to `new_callable` if specified.

    `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
    available for alternate use-cases.
    """
    # Resolve the dotted path into a lazy getter plus the attribute name.
    resolved = _get_target(target)
    getter, attribute = resolved
    patcher = _patch(
        getter, attribute, new, spec, create,
        spec_set, autospec, new_callable, kwargs, unsafe=unsafe
    )
    return patcher

1881 

1882 

class _patch_dict(object):
    """
    Temporarily modify a dictionary (or dictionary-like object) and put it
    back the way it was afterwards.

    A snapshot of the mapping is taken when the patch is applied; undoing
    the patch empties the mapping and refills it from that snapshot.

    `in_dict` is either the mapping itself or a string naming it, in which
    case the mapping is fetched by importing that name when the patch is
    applied. A mapping that is not a real dict must at least support
    getting, setting and deleting items and iterating over its keys.

    `values` holds the entries to set: a dictionary, or an iterable of
    `(key, value)` pairs. Extra keyword arguments are merged in as further
    entries::

        with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
            ...

    If `clear` is true the mapping is emptied before the new entries are
    set.

    `patch.dict` works as a context manager, a function decorator or a class
    decorator. As a class decorator it wraps the callable attributes whose
    names start with `patch.TEST_PREFIX`.
    """

    def __init__(self, in_dict, values=(), clear=False, **kwargs):
        self.in_dict = in_dict
        # dict(...) accepts both a mapping and an iterable of pairs
        merged = dict(values)
        merged.update(kwargs)
        self.values = merged
        self.clear = clear
        # snapshot of the target; stays None until the patch is applied
        self._original = None

    def __call__(self, f):
        """Decorate `f`, which may be a class, a coroutine function or any
        other callable."""
        if isinstance(f, type):
            return self.decorate_class(f)
        if iscoroutinefunction(f):
            return self.decorate_async_callable(f)
        return self.decorate_callable(f)

    def decorate_callable(self, f):
        """Return a wrapper that runs `f` with the patch applied."""
        @wraps(f)
        def _inner(*args, **kw):
            self._patch_dict()
            try:
                result = f(*args, **kw)
            finally:
                # always restore, even when `f` raises
                self._unpatch_dict()
            return result

        return _inner

    def decorate_async_callable(self, f):
        """Return a coroutine function that awaits `f` with the patch
        applied."""
        @wraps(f)
        async def _inner(*args, **kw):
            self._patch_dict()
            try:
                result = await f(*args, **kw)
            finally:
                # always restore, even when `f` raises
                self._unpatch_dict()
            return result

        return _inner

    def decorate_class(self, klass):
        """Wrap every callable attribute of `klass` whose name starts with
        `patch.TEST_PREFIX`, giving each one its own patcher. Returns
        `klass`."""
        for attr_name in dir(klass):
            member = getattr(klass, attr_name)
            if not attr_name.startswith(patch.TEST_PREFIX):
                continue
            if not hasattr(member, "__call__"):
                continue
            wrapper = _patch_dict(self.in_dict, self.values, self.clear)
            setattr(klass, attr_name, wrapper(member))
        return klass

    def __enter__(self):
        """Patch the dict."""
        self._patch_dict()
        return self.in_dict

    def _patch_dict(self):
        """Snapshot the target mapping, then apply `clear` and `values`."""
        new_items = self.values
        if isinstance(self.in_dict, str):
            # a dotted name: resolve it once and keep the resolved object
            self.in_dict = _importer(self.in_dict)
        target = self.in_dict
        wipe_first = self.clear

        try:
            snapshot = target.copy()
        except AttributeError:
            # dict like object with no copy method
            # must support iteration over keys
            snapshot = {key: target[key] for key in target}
        self._original = snapshot

        if wipe_first:
            _clear_dict(target)

        try:
            target.update(new_items)
        except AttributeError:
            # dict like object with no update method
            for key in new_items:
                target[key] = new_items[key]

    def _unpatch_dict(self):
        """Empty the target mapping and refill it from the snapshot."""
        target = self.in_dict
        snapshot = self._original

        _clear_dict(target)

        try:
            target.update(snapshot)
        except AttributeError:
            # dict like object with no update method
            for key in snapshot:
                target[key] = snapshot[key]

    def __exit__(self, *args):
        """Unpatch the dict."""
        # nothing to undo if the patch was never applied
        if self._original is not None:
            self._unpatch_dict()
        return False

    def start(self):
        """Activate a patch, returning any created mock."""
        patched = self.__enter__()
        _patch._active_patches.append(self)
        return patched

    def stop(self):
        """Stop an active patch."""
        try:
            _patch._active_patches.remove(self)
        except ValueError:
            # If the patch hasn't been started this will fail
            return None
        else:
            return self.__exit__(None, None, None)

2035 

2036 

def _clear_dict(in_dict):
    """Remove every entry from `in_dict` in place.

    Uses the mapping's own `clear()` when there is one; otherwise deletes
    the keys one at a time, which only needs key iteration and item
    deletion.
    """
    try:
        in_dict.clear()
    except AttributeError:
        # Take a snapshot of the keys first so that deleting entries does
        # not disturb the iteration.
        for stale_key in list(in_dict):
            del in_dict[stale_key]

2044 

2045 

def _patch_stopall():
    """Stop all active patches. LIFO to unroll nested patches."""
    # Walk the registry from the most recently started patch backwards.
    for active_patch in reversed(_patch._active_patches):
        active_patch.stop()

2050 

2051 

# Expose the alternate patchers and the stop-all helper as attributes of
# `patch`, so they are reachable as `patch.object`, `patch.dict`, etc.
patch.object = _patch_object
patch.dict = _patch_dict
patch.multiple = _patch_multiple
patch.stopall = _patch_stopall
# Name prefix that `_patch_dict.decorate_class` uses to pick the attributes
# it wraps.
patch.TEST_PREFIX = 'test'

2057 

# Space-separated stems of supported magic methods; each stem is wrapped as
# '__<stem>__' when the `_magics` set is built. Every fragment ends with a
# space so that the adjacent string literals concatenate into one
# well-separated string.
magic_methods = (
    "lt le gt ge eq ne "
    "getitem setitem delitem "
    "len contains iter "
    "hash str sizeof "
    "enter exit "
    # we added divmod and rdivmod here instead of numerics
    # because there is no idivmod
    "divmod rdivmod neg pos abs invert "
    "complex int float index "
    "round trunc floor ceil "
    "bool next "
    "fspath "
    "aiter "
)

if IS_PYPY:
    # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html
    magic_methods = magic_methods.replace('sizeof ', '')

2077 

# Space-separated stems of the binary numeric operator methods.
numerics = "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow"
# The same stems with an 'i' prefix (in-place variants: iadd, isub, ...).
inplace = ' '.join(['i' + stem for stem in numerics.split()])
# The same stems with an 'r' prefix (reflected variants: radd, rsub, ...).
right = ' '.join(['r' + stem for stem in numerics.split()])

2083 

2084# not including __prepare__, __instancecheck__, __subclasscheck__ 

2085# (as they are metaclass methods) 

2086# __del__ is not supported at all as it causes problems if it exists 

2087 

# Magic method names that are part of `_all_sync_magics` but are not in
# `_magics`, i.e. not among the names `MagicMixin._mock_set_magics` installs.
_non_defaults = {
    '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
    '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
    '__getstate__', '__setstate__', '__getformat__',
    '__repr__', '__dir__', '__subclasses__', '__format__',
    '__getnewargs_ex__',
}

2095 

2096 

def _get_method(name, func):
    """Wrap callable `func` (for example a mock) in a real function.

    The returned function is named `name`; it forwards its receiver and
    all positional and keyword arguments to `func` and returns the result.
    """
    def method(self, *args, **kw):
        return func(self, *args, **kw)

    setattr(method, '__name__', name)
    return method

2103 

2104 

# Full dunder names ('__lt__', '__iadd__', ...) built from the stem strings
# defined above.
_magics = {
    '__%s__' % method for method in
    ' '.join([magic_methods, numerics, inplace, right]).split()
}

# Async magic methods that are themselves coroutine functions: the async
# `with` pair plus `__anext__` from async iteration
_async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
# Magic methods that are only used with async calls but are synchronous functions themselves
_sync_async_magics = {"__aiter__"}
_async_magics = _async_method_magics | _sync_async_magics

# Convenience unions of the sets above.
_all_sync_magics = _magics | _non_defaults
_all_magics = _all_sync_magics | _async_magics

# Dunder names listed as unsupported.
_unsupported_magics = {
    '__getattr__', '__setattr__',
    '__init__', '__new__', '__prepare__',
    '__instancecheck__', '__subclasscheck__',
    '__del__'
}

# Magic methods whose default return value is computed from the mock itself;
# `_set_return_value` calls the function with the parent mock.
_calculate_return_value = {
    '__hash__': lambda self: object.__hash__(self),
    '__str__': lambda self: object.__str__(self),
    '__sizeof__': lambda self: object.__sizeof__(self),
    '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
}

# Magic methods with a fixed default return value; `_set_return_value`
# checks this table first.
_return_values = {
    '__lt__': NotImplemented,
    '__gt__': NotImplemented,
    '__le__': NotImplemented,
    '__ge__': NotImplemented,
    '__int__': 1,
    '__contains__': False,
    '__len__': 0,
    '__exit__': False,
    '__complex__': 1j,
    '__float__': 1.0,
    '__bool__': True,
    '__index__': 1,
    '__aexit__': False,
}

2148 

2149 

def _get_eq(self):
    """Build the default `__eq__` side effect for the mock `self`.

    The returned function gives back the return value configured on
    `self.__eq__` if there is one; otherwise it compares by identity,
    returning True for the mock itself and NotImplemented for anything else.
    """
    def __eq__(other):
        configured = self.__eq__._mock_return_value
        if configured is DEFAULT:
            return True if self is other else NotImplemented
        return configured
    return __eq__

2159 

def _get_ne(self):
    """Build the default `__ne__` side effect for the mock `self`.

    With no return value configured on `self.__ne__` the returned function
    compares by identity: False for the mock itself, NotImplemented for
    anything else. When a return value is configured it returns the
    `DEFAULT` sentinel instead of deciding itself.
    """
    def __ne__(other):
        if self.__ne__._mock_return_value is DEFAULT:
            return False if self is other else NotImplemented
        # A return value was configured: hand back DEFAULT rather than
        # computing an answer here.
        return DEFAULT
    return __ne__

2168 

def _get_iter(self):
    """Build the default `__iter__` side effect for the mock `self`.

    The returned function iterates over the return value configured on
    `self.__iter__`, or over nothing when none is configured.
    """
    def __iter__():
        configured = self.__iter__._mock_return_value
        # iter() on something that is already an iterator returns it
        # unchanged, so a configured iterator is passed through as is.
        return iter([] if configured is DEFAULT else configured)
    return __iter__

2178 

def _get_async_iter(self):
    """Build the default `__aiter__` side effect for the mock `self`.

    The returned function wraps an iterator over the return value
    configured on `self.__aiter__` (or over nothing when none is
    configured) in an `_AsyncIterator`.
    """
    def __aiter__():
        configured = self.__aiter__._mock_return_value
        items = [] if configured is DEFAULT else configured
        return _AsyncIterator(iter(items))
    return __aiter__

2186 

# Magic methods whose default behaviour is a side-effect function; each
# factory is called with the parent mock by `_set_return_value` and the
# result is installed as the method's `side_effect`.
_side_effect_methods = {
    '__eq__': _get_eq,
    '__ne__': _get_ne,
    '__iter__': _get_iter,
    '__aiter__': _get_async_iter
}

2193 

2194 

2195 

def _set_return_value(mock, method, name):
    """Give the magic-method mock `method` its default behaviour.

    `name` is the magic method's name and `mock` the mock it belongs to.
    The first matching table wins:

    * `_return_values` supplies a fixed `return_value`;
    * `_calculate_return_value` supplies a function that computes the
      `return_value` from `mock`;
    * `_side_effect_methods` supplies a factory whose result becomes the
      `side_effect`.

    A name found in none of the tables leaves `method` untouched.
    """
    constant = _return_values.get(name, DEFAULT)
    if constant is not DEFAULT:
        method.return_value = constant
        return

    compute = _calculate_return_value.get(name)
    if compute is not None:
        method.return_value = compute(mock)
        return

    make_side_effect = _side_effect_methods.get(name)
    if make_side_effect is not None:
        method.side_effect = make_side_effect(mock)

2211 

2212 

2213 

class MagicMixin(Base):
    """Mixin that installs `MagicProxy` entries for the supported magic
    methods on the type of each instance."""

    def __init__(self, *args, **kw):
        # Run once before the base initialiser so that magic methods
        # passed as keyword arguments work ...
        self._mock_set_magics()
        _safe_super(MagicMixin, self).__init__(*args, **kw)
        # ... and once after, to repair whatever that initialiser changed.
        self._mock_set_magics()

    def _mock_set_magics(self):
        """Install the magic-method proxies on `type(self)`.

        When `self._mock_methods` is set, only magic methods named in it
        are kept and the others are deleted. Names already present in the
        type's `__dict__` are never overwritten.
        """
        cls = type(self)
        supported = _magics | _async_method_magics
        wanted = supported

        if getattr(self, "_mock_methods", None) is not None:
            wanted = supported.intersection(self._mock_methods)

            for unwanted in supported - wanted:
                if unwanted in cls.__dict__:
                    # remove unneeded magic methods
                    delattr(self, unwanted)

        # don't overwrite existing attributes if called a second time
        for entry in wanted - set(cls.__dict__):
            setattr(cls, entry, MagicProxy(entry, self))

2240 

2241 

2242 

class NonCallableMagicMock(MagicMixin, NonCallableMock):
    """A version of `MagicMock` that isn't callable."""

    def mock_add_spec(self, spec, spec_set=False):
        """Attach a spec to this mock.

        `spec` is an object or a list of strings; afterwards only
        attributes present on the spec can be fetched from the mock.

        With `spec_set` true, only attributes on the spec can be set
        either."""
        self._mock_add_spec(spec, spec_set)
        # re-sync the magic methods with the new spec
        self._mock_set_magics()

2253 

2254 

# Adds nothing of its own; it is a distinct name for MagicMixin that
# AsyncMock lists among its bases.
class AsyncMagicMixin(MagicMixin):
    pass

2257 

2258 

class MagicMock(MagicMixin, Mock):
    """
    A `Mock` subclass that comes with default implementations of most of
    the magic methods, so they do not have to be configured by hand.

    When the `spec` or `spec_set` arguments are used, *only* the magic
    methods that exist in the spec are created.

    Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
    """

    def mock_add_spec(self, spec, spec_set=False):
        """Attach a spec to this mock.

        `spec` is an object or a list of strings; afterwards only
        attributes present on the spec can be fetched from the mock.

        With `spec_set` true, only attributes on the spec can be set
        either."""
        self._mock_add_spec(spec, spec_set)
        # re-sync the magic methods with the new spec
        self._mock_set_magics()

    def reset_mock(self, *args, return_value: bool = False, **kwargs):
        """Reset the mock, keeping the return value when this mock is
        itself a magic method."""
        if return_value:
            # Don't reset return values for magic methods,
            # otherwise `m.__str__` will start
            # to return `MagicMock` instances, instead of `str` instances.
            if self._mock_name and _is_magic(self._mock_name):
                return_value = False
        super().reset_mock(*args, return_value=return_value, **kwargs)

2290 

2291 

class MagicProxy(Base):
    """Descriptor standing in for one magic method of a parent mock; the
    real child mock is created when the attribute is first fetched."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent

    def create_mock(self):
        """Create the child mock for this magic method, store it on the
        parent, configure its default behaviour and return it."""
        magic_name, owner = self.name, self.parent
        child = owner._get_child_mock(
            name=magic_name, _new_name=magic_name, _new_parent=owner,
        )
        setattr(owner, magic_name, child)
        _set_return_value(owner, child, magic_name)
        return child

    def __get__(self, obj, _type=None):
        # the instance and owner passed by the descriptor protocol are
        # ignored; the proxy always acts on the parent it was built with
        return self.create_mock()

2308 

2309 

# Signature and attribute names of code objects, used by
# AsyncMockMixin.__init__ to build the stand-in `__code__` of async mocks.
# When inspect.signature() raises ValueError, _CODE_SIG is None and
# _CODE_ATTRS is left undefined.
try:
    _CODE_SIG = inspect.signature(partial(CodeType.__init__, None))
except ValueError:  # pragma: no cover - backport is only tested against builds with docstrings
    _CODE_SIG = None
else:
    _CODE_ATTRS = dir(CodeType)

2315 

2316 

class AsyncMockMixin(Base):
    """Mixin that makes a mock's call an awaitable and records awaits."""

    await_count = _delegating_property('await_count')
    await_args = _delegating_property('await_args')
    await_args_list = _delegating_property('await_args_list')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        state = self.__dict__
        # iscoroutinefunction() recognises a coroutine function by its
        # _is_coroutine marker. Without the marker it would check for a
        # function/method, which an AsyncMock is not.
        # Everything here goes straight into __dict__ because with
        # spec_set the attribute is likely not settable normally.
        state['_is_coroutine'] = asyncio.coroutines._is_coroutine
        state['_mock_await_count'] = 0
        state['_mock_await_args'] = None
        state['_mock_await_args_list'] = _CallList()

        # Build a stand-in code object describing `async def f(*args, **kwargs)`.
        if _CODE_SIG:
            fake_code = NonCallableMock(spec_set=_CODE_ATTRS)
            fake_code.__dict__["_spec_class"] = CodeType
            fake_code.__dict__["_spec_signature"] = _CODE_SIG
        else:  # pragma: no cover - backport is only tested against builds with docstrings
            fake_code = NonCallableMock(spec_set=CodeType)
        fake_code.co_flags = (
            inspect.CO_COROUTINE
            + inspect.CO_VARARGS
            + inspect.CO_VARKEYWORDS
        )
        fake_code.co_argcount = 0
        fake_code.co_varnames = ('args', 'kwargs')
        # Python 3.7 and earlier have no co_posonlyargcount.
        with contextlib.suppress(AttributeError):
            fake_code.co_posonlyargcount = 0
        fake_code.co_kwonlyargcount = 0

        state['__code__'] = fake_code
        state['__name__'] = 'AsyncMock'
        state['__defaults__'] = tuple()
        state['__kwdefaults__'] = {}
        state['__annotations__'] = None

    async def _execute_mock_call(_mock_self, *args, **kwargs):
        """Record the await, then produce the outcome from `side_effect`,
        the configured return value or the wrapped object, in that order."""
        self = _mock_self
        # This is nearly just like super(), except for special handling
        # of coroutines

        this_await = _Call((args, kwargs), two=True)
        self.await_count += 1
        self.await_args = this_await
        self.await_args_list.append(this_await)

        effect = self.side_effect
        if effect is not None:
            if _is_exception(effect):
                raise effect

            if _callable(effect):
                if iscoroutinefunction(effect):
                    result = await effect(*args, **kwargs)
                else:
                    result = effect(*args, **kwargs)
            else:
                # not callable: treated as an iterator of results
                try:
                    result = next(effect)
                except StopIteration:
                    # It is impossible to propagate a StopIteration
                    # through coroutines because of PEP 479
                    raise StopAsyncIteration
                if _is_exception(result):
                    raise result

            if result is not DEFAULT:
                return result

        if self._mock_return_value is not DEFAULT:
            return self.return_value

        wrapped = self._mock_wraps
        if wrapped is not None:
            if iscoroutinefunction(wrapped):
                return await wrapped(*args, **kwargs)
            return wrapped(*args, **kwargs)

        return self.return_value

    def assert_awaited(_mock_self):
        """
        Assert that the mock was awaited at least once.
        """
        self = _mock_self
        if self.await_count == 0:
            raise AssertionError(
                f"Expected {self._mock_name or 'mock'} to have been awaited."
            )

    def assert_awaited_once(_mock_self):
        """
        Assert that the mock was awaited exactly once.
        """
        self = _mock_self
        if self.await_count == 1:
            return
        raise AssertionError(
            f"Expected {self._mock_name or 'mock'} to have been awaited once."
            f" Awaited {self.await_count} times."
        )

    def assert_awaited_with(_mock_self, *args, **kwargs):
        """
        Assert that the last await was with the specified arguments.
        """
        self = _mock_self
        if self.await_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            raise AssertionError(f'Expected await: {expected}\nNot awaited')

        expected = self._call_matcher(_Call((args, kwargs), two=True))
        actual = self._call_matcher(self.await_args)
        if actual != expected:
            # a matcher failure comes back as an exception object; chain it
            cause = expected if isinstance(expected, Exception) else None
            message = self._format_mock_failure_message(
                args, kwargs, action='await')
            raise AssertionError(message) from cause

    def assert_awaited_once_with(_mock_self, *args, **kwargs):
        """
        Assert that the mock was awaited exactly once and with the specified
        arguments.
        """
        self = _mock_self
        if self.await_count == 1:
            return self.assert_awaited_with(*args, **kwargs)
        raise AssertionError(
            f"Expected {self._mock_name or 'mock'} to have been awaited once."
            f" Awaited {self.await_count} times."
        )

    def assert_any_await(_mock_self, *args, **kwargs):
        """
        Assert the mock has ever been awaited with the specified arguments.
        """
        self = _mock_self
        expected = self._call_matcher(_Call((args, kwargs), two=True))
        cause = expected if isinstance(expected, Exception) else None
        seen = [self._call_matcher(c) for c in self.await_args_list]
        if not cause and expected in _AnyComparer(seen):
            return
        expected_string = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(
            '%s await not found' % expected_string
        ) from cause

    def assert_has_awaits(_mock_self, calls, any_order=False):
        """
        Assert the mock has been awaited with the specified calls.
        The :attr:`await_args_list` list is checked for the awaits.

        If `any_order` is False (the default) then the awaits must be
        sequential. There can be extra calls before or after the
        specified awaits.

        If `any_order` is True then the awaits can be in any order, but
        they must all appear in :attr:`await_args_list`.
        """
        self = _mock_self
        expected = [self._call_matcher(c) for c in calls]
        # first matcher failure, if any, becomes the cause of the error
        cause = next((e for e in expected if isinstance(e, Exception)), None)
        all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)

        if any_order:
            remaining = list(all_awaits)
            not_found = []
            for kall in expected:
                try:
                    remaining.remove(kall)
                except ValueError:
                    not_found.append(kall)
            if not_found:
                raise AssertionError(
                    '%r not all found in await list' % (tuple(not_found),)
                ) from cause
            return

        if expected in all_awaits:
            return

        if cause is None:
            problem = 'Awaits not found.'
        else:
            errors = [e if isinstance(e, Exception) else None
                      for e in expected]
            problem = ('Error processing expected awaits.\n'
                       'Errors: {}').format(errors)
        raise AssertionError(
            f'{problem}\n'
            f'Expected: {_CallList(calls)}\n'
            f'Actual: {self.await_args_list}'
        ) from cause

    def assert_not_awaited(_mock_self):
        """
        Assert that the mock was never awaited.
        """
        self = _mock_self
        if self.await_count == 0:
            return
        raise AssertionError(
            f"Expected {self._mock_name or 'mock'} to not have been awaited."
            f" Awaited {self.await_count} times."
        )

    def reset_mock(self, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # also forget every recorded await
        self.await_count = 0
        self.await_args = None
        self.await_args_list = _CallList()

2527 

2528 

class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
    """
    Enhance :class:`Mock` with features allowing to mock
    an async function.

    The :class:`AsyncMock` object will behave so the object is
    recognized as an async function, and the result of a call is an awaitable:

    >>> mock = AsyncMock()
    >>> iscoroutinefunction(mock)
    True
    >>> inspect.isawaitable(mock())
    True


    The result of ``mock()`` is an async function which will have the outcome
    of ``side_effect`` or ``return_value``:

    - if ``side_effect`` is a function, the async function will return the
      result of that function,
    - if ``side_effect`` is an exception, the async function will raise the
      exception,
    - if ``side_effect`` is an iterable, the async function will return the
      next value of the iterable, however, if the sequence of result is
      exhausted, ``StopAsyncIteration`` is raised immediately,
    - if ``side_effect`` is not defined, the async function will return the
      value defined by ``return_value``, hence, by default, the async function
      returns a new :class:`AsyncMock` object.

    If the outcome of ``side_effect`` or ``return_value`` is an async function,
    the mock async function obtained when the mock object is called will be this
    async function itself (and not an async function returning an async
    function).

    The test author can also specify a wrapped object with ``wraps``. In this
    case, the :class:`Mock` object behavior is the same as with an
    :class:`.Mock` object: the wrapped object may have methods
    defined as async functions.

    Based on Martin Richard's asynctest project.
    """

2570 

2571 

class _ANY:
    """Helper whose instances compare equal to every object."""

    def __eq__(self, other):
        # equal to anything at all
        return True

    def __ne__(self, other):
        # and therefore never unequal
        return False

    def __repr__(self):
        return '<ANY>'


# Shared instance of the helper, exported as `ANY`.
ANY = _ANY()

2585 

2586 

2587 

def _format_call_signature(name, args, kwargs):
    """Render a call as source-like text, e.g. ``name(1, 'a', key=2)``.

    `name` is placed verbatim in front of the parentheses. `args` is an
    iterable of positional arguments and `kwargs` a mapping of keyword
    arguments; every value is rendered with `repr()`, positional arguments
    first, then ``key=value`` pairs in the mapping's iteration order.
    """
    rendered = [repr(arg) for arg in args]
    rendered.extend('%s=%r' % (key, value) for key, value in kwargs.items())
    # Format in a single step instead of building a template from `name`
    # and %-formatting that template afterwards: a '%' in the name would
    # otherwise be read as a format directive and raise or be altered.
    return '%s(%s)' % (name, ', '.join(rendered))

2603 

2604 

2605 

class _Call(tuple):
    """
    A tuple recording one call to a mock, stored either as
    `(args, kwargs)` or as `(name, args, kwargs)`.

    Empty args or kwargs may be left out of the tuple a call is compared
    with, which keeps comparisons short::

        _Call(('name', (), {})) == ('name',)
        _Call(('name', (1,), {})) == ('name', (1,))
        _Call(((), {'a': 'b'})) == ({'a': 'b'},)

    A `_Call` also compares equal to the matching `call` expression::

        _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
        _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)

    If the _Call has no name then it will match any name.
    """

    def __new__(cls, value=(), name='', parent=None, two=False,
                from_kall=True):
        # Normalise `value` to its name / args / kwargs parts; anything
        # missing keeps the defaults below.
        args, kwargs = (), {}
        size = len(value)
        if size == 3:
            name, args, kwargs = value
        elif size == 2:
            first, second = value
            if not isinstance(first, str):
                args, kwargs = first, second
            else:
                name = first
                if isinstance(second, tuple):
                    args = second
                else:
                    kwargs = second
        elif size == 1:
            only, = value
            if isinstance(only, str):
                name = only
            elif isinstance(only, tuple):
                args = only
            else:
                kwargs = only

        contents = (args, kwargs) if two else (name, args, kwargs)
        return tuple.__new__(cls, contents)

    def __init__(self, value=(), name=None, parent=None, two=False,
                 from_kall=True):
        # the tuple contents were fixed in __new__; only bookkeeping here
        self._mock_name = name
        self._mock_parent = parent
        self._mock_from_kall = from_kall

    def __eq__(self, other):
        try:
            other_size = len(other)
        except TypeError:
            return NotImplemented

        if len(self) == 2:
            self_name = ''
            self_args, self_kwargs = self
        else:
            self_name, self_args, self_kwargs = self

        # calls hanging off different parent calls are never equal
        if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
                and self._mock_parent != other._mock_parent):
            return False

        # Expand `other` into name / args / kwargs, filling in the parts
        # its shorthand form leaves out.
        other_name = ''
        if other_size == 0:
            other_args, other_kwargs = (), {}
        elif other_size == 1:
            only, = other
            if isinstance(only, tuple):
                other_args, other_kwargs = only, {}
            elif isinstance(only, str):
                other_name = only
                other_args, other_kwargs = (), {}
            else:
                other_args, other_kwargs = (), only
        elif other_size == 2:
            # could be (name, args) or (name, kwargs) or (args, kwargs)
            first, second = other
            if not isinstance(first, str):
                other_args, other_kwargs = first, second
            else:
                other_name = first
                if isinstance(second, tuple):
                    other_args, other_kwargs = second, {}
                else:
                    other_args, other_kwargs = (), second
        elif other_size == 3:
            other_name, other_args, other_kwargs = other
        else:
            return False

        # an unnamed call matches any name
        if self_name and other_name != self_name:
            return False

        # this order is important for ANY to work!
        return (other_args, other_kwargs) == (self_args, self_kwargs)

    __ne__ = object.__ne__

    def __call__(self, *args, **kwargs):
        own_name = self._mock_name
        if own_name is None:
            return _Call(('', args, kwargs), name='()')

        return _Call((own_name, args, kwargs), name=own_name + '()',
                     parent=self)

    def __getattr__(self, attr):
        own_name = self._mock_name
        if own_name is None:
            return _Call(name=attr, from_kall=False)
        dotted = '%s.%s' % (own_name, attr)
        return _Call(name=dotted, parent=self, from_kall=False)

    def __getattribute__(self, attr):
        # Hide whatever is defined on tuple itself so that such names fall
        # through to __getattr__ like any other attribute.
        if attr in tuple.__dict__:
            raise AttributeError
        return tuple.__getattribute__(self, attr)

    def _get_call_arguments(self):
        """Return `(args, kwargs)` for either tuple layout."""
        if len(self) == 2:
            args, kwargs = self
        else:
            _, args, kwargs = self

        return args, kwargs

    @property
    def args(self):
        positional, _ = self._get_call_arguments()
        return positional

    @property
    def kwargs(self):
        _, keyword = self._get_call_arguments()
        return keyword

    def __repr__(self):
        if not self._mock_from_kall:
            label = self._mock_name or 'call'
            if label.startswith('()'):
                label = 'call%s' % label
            return label

        if len(self) == 2:
            label = 'call'
            args, kwargs = self
        else:
            label, args, kwargs = self
            if not label:
                label = 'call'
            elif label.startswith('()'):
                label = 'call%s' % label
            else:
                label = 'call.%s' % label
        return _format_call_signature(label, args, kwargs)

    def call_list(self):
        """For a call object that stands for a chain of calls, return all
        the intermediate calls followed by the final call, as a list."""
        chain = []
        link = self
        while link is not None:
            if link._mock_from_kall:
                chain.append(link)
            link = link._mock_parent
        # the chain was collected from last call to first
        return _CallList(reversed(chain))


# Root object from which call expressions such as `call(1, a=2)` or
# `call.foo(1)` are built.
call = _Call(from_kall=False)

2792 

2793 

def create_autospec(spec, spec_set=False, instance=False, _parent=None,
                    _name=None, *, unsafe=False, **kwargs):
    """Create a mock object using another object as a spec. Attributes on the
    mock will use the corresponding attribute on the `spec` object as their
    spec.

    Functions or methods being mocked will have their arguments checked
    to check that they are called with the correct signature.

    If `spec_set` is True then attempting to set attributes that don't exist
    on the spec object will raise an `AttributeError`.

    If a class is used as a spec then the return value of the mock (the
    instance of the class) will have the same spec. You can use a class as the
    spec for an instance object by passing `instance=True`. The returned mock
    will only be callable if instances of the mock are callable.

    `create_autospec` will raise a `RuntimeError` if passed some common
    misspellings of the arguments autospec and spec_set. Pass the argument
    `unsafe` with the value True to disable that check.

    `create_autospec` also takes arbitrary keyword arguments that are passed to
    the constructor of the created mock.

    `_parent` and `_name` are used by the recursive call below that builds
    the return value of a class mock. An `InvalidSpecError` is raised if
    `spec` is itself a mock, and a `RuntimeError` if `instance` is true while
    `spec` is an async function."""
    if _is_list(spec):
        # can't pass a list instance to the mock constructor as it will be
        # interpreted as a list of strings
        spec = type(spec)

    is_type = isinstance(spec, type)
    if _is_instance_mock(spec):
        raise InvalidSpecError(f'Cannot autospec a Mock object. '
                               f'[object={spec!r}]')
    is_async_func = _is_async_func(spec)

    # Candidate attributes to spec. `_missing` means "no value known yet";
    # the loop further down fetches it from `spec` with getattr.
    entries = [(entry, _missing) for entry in dir(spec)]
    if is_type and instance and HAS_DATACLASSES and is_dataclass(spec):
        # Dataclass fields are appended explicitly, paired with their
        # declared type, and the mock is specced with the field-name list.
        dataclass_fields = fields(spec)
        entries.extend((f.name, f.type) for f in dataclass_fields)
        _kwargs = {'spec': [f.name for f in dataclass_fields]}
    else:
        _kwargs = {'spec': spec}

    if spec_set:
        # Replaces the dict built above (including the dataclass field list).
        _kwargs = {'spec_set': spec}
    elif spec is None:
        # None we mock with a normal mock without a spec
        _kwargs = {}
    if _kwargs and instance:
        _kwargs['_spec_as_instance'] = True
    if not unsafe:
        _check_spec_arg_typos(kwargs)

    # A caller-supplied `name` keyword takes precedence over `_name`.
    _name = kwargs.pop('name', _name)
    _new_name = _name
    if _parent is None:
        # for a top level object no _new_name should be set
        _new_name = ''

    _kwargs.update(kwargs)

    # Choose the mock class; the first matching condition wins.
    Klass = MagicMock
    if inspect.isdatadescriptor(spec):
        # descriptors don't have a spec
        # because we don't know what type they return
        _kwargs = {}
    elif is_async_func:
        if instance:
            raise RuntimeError("Instance can not be True when create_autospec "
                               "is mocking an async function")
        Klass = AsyncMock
    elif not _callable(spec):
        Klass = NonCallableMagicMock
    elif is_type and instance and not _instance_callable(spec):
        Klass = NonCallableMagicMock

    mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
                 name=_name, **_kwargs)

    if isinstance(spec, FunctionTypes):
        # should only happen at the top level because we don't
        # recurse for functions
        if is_async_func:
            mock = _set_async_signature(mock, spec)
        else:
            mock = _set_signature(mock, spec)
    else:
        _check_signature(spec, mock, is_type, instance)

    if _parent is not None and not instance:
        _parent._mock_children[_name] = mock

    # Pop wraps from kwargs because it must not be passed to configure_mock.
    wrapped = kwargs.pop('wraps', None)
    if is_type and not instance and 'return_value' not in kwargs:
        # For a class spec, the return value is autospecced as an instance
        # of that class unless the caller supplied their own return_value.
        mock.return_value = create_autospec(spec, spec_set, instance=True,
                                            _name='()', _parent=mock,
                                            wraps=wrapped)

    for entry, original in entries:
        if _is_magic(entry):
            # MagicMock already does the useful magic methods for us
            continue

        # XXXX do we need a better way of getting attributes without
        # triggering code execution (?) Probably not - we need the actual
        # object to mock it so we would rather trigger a property than mock
        # the property descriptor. Likewise we want to mock out dynamically
        # provided attributes.
        # XXXX what about attributes that raise exceptions other than
        # AttributeError on being fetched?
        # we could be resilient against it, or catch and propagate the
        # exception when the attribute is fetched from the mock
        if original is _missing:
            try:
                original = getattr(spec, entry)
            except AttributeError:
                continue

        child_kwargs = {'spec': original}
        # Wrap child attributes also.
        if wrapped and hasattr(wrapped, entry):
            child_kwargs.update(wraps=original)
        if spec_set:
            # Rebuilds the dict, so any `wraps` entry added above is dropped.
            child_kwargs = {'spec_set': original}

        if not isinstance(original, FunctionTypes):
            # Non-function attributes are stored as a _SpecState placeholder
            # instead of a mock; presumably resolved on attribute access
            # (the consumer is outside this function).
            # NOTE(review): per _SpecState.__init__'s signature the fifth
            # positional argument binds to `ids`, not `instance` - confirm
            # this is intentional before changing it.
            new = _SpecState(original, spec_set, mock, entry, instance)
            mock._mock_children[entry] = new
        else:
            parent = mock
            if isinstance(spec, FunctionTypes):
                # `mock` was replaced by the result of _set_signature /
                # _set_async_signature above; its `.mock` attribute is used
                # as the parent (presumably the underlying mock object).
                parent = mock.mock

            skipfirst = _must_skip(spec, entry, is_type)
            child_kwargs['_eat_self'] = skipfirst
            if iscoroutinefunction(original):
                child_klass = AsyncMock
            else:
                child_klass = MagicMock
            new = child_klass(parent=parent, name=entry, _new_name=entry,
                              _new_parent=parent, **child_kwargs)
            mock._mock_children[entry] = new
            new.return_value = child_klass()
            _check_signature(original, new, skipfirst=skipfirst)

        # so functions created with _set_signature become instance attributes,
        # *plus* their underlying mock exists in _mock_children of the parent
        # mock. Adding to _mock_children may be unnecessary where we are also
        # setting as an instance attribute?
        if isinstance(new, FunctionTypes):
            setattr(mock, entry, new)
    # kwargs are passed with respect to the parent mock so, they are not used
    # for creating return_value of the parent mock. So, this condition
    # should be true only for the parent mock if kwargs are given.
    if _is_instance_mock(mock) and kwargs:
        mock.configure_mock(**kwargs)

    return mock

2952 

2953 

def _must_skip(spec, entry, is_type):
    """Decide whether the first argument of `spec.<entry>` should be skipped.

    `spec` may be a class or an instance. For an instance, an attribute
    stored in the instance `__dict__` is never skipped; otherwise the lookup
    continues on the instance's class. The first class in the MRO whose
    `__dict__` defines `entry` settles the answer: `is_type` for a plain
    function/method, False for staticmethod, classmethod or anything else.
    If no class defines it, `is_type` is returned.
    """
    if not isinstance(spec, type):
        instance_attrs = getattr(spec, '__dict__', {})
        if entry in instance_attrs:
            # instance attribute - shouldn't skip
            return False
        spec = spec.__class__

    for klass in spec.__mro__:
        found = klass.__dict__.get(entry, DEFAULT)
        if found is DEFAULT:
            # Not defined on this class; keep walking the MRO.
            continue
        if isinstance(found, (staticmethod, classmethod)):
            return False
        if isinstance(found, FunctionTypes):
            # Normal method => skip if looked up on type
            # (if looked up on instance, self is already skipped)
            return is_type
        return False

    # function is a dynamically provided attribute
    return is_type

2980 

2981 

2982class _SpecState(object): 

2983 

2984 def __init__(self, spec, spec_set=False, parent=None, 

2985 name=None, ids=None, instance=False): 

2986 self.spec = spec 

2987 self.ids = ids 

2988 self.spec_set = spec_set 

2989 self.parent = parent 

2990 self.instance = instance 

2991 self.name = name 

2992 

2993 

# Types that the isinstance checks in create_autospec and _must_skip treat
# as plain functions/methods. Defined after create_autospec because the
# tuple is built from that function's own type.
FunctionTypes = (
    # python function
    type(create_autospec),
    # instance method
    type(ANY.__eq__),
)

3000 

3001 

# Module-level caches filled in by mock_open on first use: attribute-name
# lists computed from the `_io` module and reused on later calls.
file_spec = None
open_spec = None

3004 

3005 

3006def _to_stream(read_data): 

3007 if isinstance(read_data, bytes): 

3008 return io.BytesIO(read_data) 

3009 else: 

3010 return io.StringIO(read_data) 

3011 

3012 

def mock_open(mock=None, read_data=''):
    """
    A helper function to create a mock to replace the use of `open`. It works
    for `open` called directly or used as a context manager.

    The `mock` argument is the mock object to configure. If `None` (the
    default) then a `MagicMock` will be created for you, with the API limited
    to methods or attributes available on standard file handles.

    `read_data` is a string for the `read`, `readline` and `readlines` of the
    file handle to return. This is an empty string by default. A `bytes`
    value is also accepted; it is served from an `io.BytesIO` instead of an
    `io.StringIO`.

    Returns the configured `mock`; its return value is the file-handle mock.
    """
    _read_data = _to_stream(read_data)
    # Mutable cell shared by the closures below:
    #   _state[0] - the current in-memory stream serving the data
    #   _state[1] - the generator currently installed as readline's side effect
    _state = [_read_data, None]

    # Each side effect prefers a user-assigned return_value (anything other
    # than None) and otherwise reads from the current stream.
    def _readlines_side_effect(*args, **kwargs):
        if handle.readlines.return_value is not None:
            return handle.readlines.return_value
        return _state[0].readlines(*args, **kwargs)

    def _read_side_effect(*args, **kwargs):
        if handle.read.return_value is not None:
            return handle.read.return_value
        return _state[0].read(*args, **kwargs)

    def _readline_side_effect(*args, **kwargs):
        # Generator: yields the stream's lines first, then keeps yielding
        # whatever readline() returns so that it never finishes.
        # NOTE(review): it is always instantiated below with no arguments,
        # so `args`/`kwargs` are empty in practice.
        yield from _iter_side_effect()
        while True:
            yield _state[0].readline(*args, **kwargs)

    def _iter_side_effect():
        if handle.readline.return_value is not None:
            while True:
                yield handle.readline.return_value
        for line in _state[0]:
            yield line

    def _next_side_effect():
        if handle.readline.return_value is not None:
            return handle.readline.return_value
        return next(_state[0])

    def _exit_side_effect(exctype, excinst, exctb):
        # Leaving the `with` block calls close() on the handle mock.
        handle.close()

    # The spec lists are computed once and cached in module globals.
    global file_spec
    if file_spec is None:
        import _io
        file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))

    global open_spec
    if open_spec is None:
        import _io
        open_spec = list(set(dir(_io.open)))
    if mock is None:
        mock = MagicMock(name='open', spec=open_spec)

    handle = MagicMock(spec=file_spec)
    handle.__enter__.return_value = handle

    # None marks "not overridden by the user" for the side effects above.
    handle.write.return_value = None
    handle.read.return_value = None
    handle.readline.return_value = None
    handle.readlines.return_value = None

    handle.read.side_effect = _read_side_effect
    _state[1] = _readline_side_effect()
    handle.readline.side_effect = _state[1]
    handle.readlines.side_effect = _readlines_side_effect
    handle.__iter__.side_effect = _iter_side_effect
    handle.__next__.side_effect = _next_side_effect
    handle.__exit__.side_effect = _exit_side_effect

    def reset_data(*args, **kwargs):
        # Installed as the side effect of `mock` itself: each call rebuilds
        # the stream from `read_data`, then returns DEFAULT.
        _state[0] = _to_stream(read_data)
        if handle.readline.side_effect == _state[1]:
            # Only reset the side effect if the user hasn't overridden it.
            _state[1] = _readline_side_effect()
            handle.readline.side_effect = _state[1]
        return DEFAULT

    mock.side_effect = reset_data
    mock.return_value = handle
    return mock

3097 

3098 

class PropertyMock(Mock):
    """
    A mock meant to stand in for a property (or other descriptor) on a class.

    It implements `__get__` and `__set__`: reading the attribute calls the
    mock with no arguments and yields the result, and assigning to the
    attribute calls the mock with the assigned value.
    """

    def _get_child_mock(self, **kw):
        """Create child mocks as `MagicMock`, forwarding every keyword."""
        return MagicMock(**kw)

    def __get__(self, obj, obj_type=None):
        # Attribute read -> call with no arguments.
        return self()

    def __set__(self, obj, val):
        # Attribute write -> call with the new value.
        self(val)

3115 

3116 

# Default for the `timeout` parameters in ThreadingMixin: tells "argument
# not passed" apart from an explicit value such as None.
_timeout_unset = sentinel.TIMEOUT_UNSET

3118 

class ThreadingMixin(Base):
    """Mixin adding `wait_until_*` helpers backed by `threading.Event`.

    One event is set on any call; a further event is kept per distinct
    (args, kwargs) pair. Waiters block on these events up to a timeout.
    """

    # Timeout used when the constructor is not given one; None is passed
    # straight through to `Event.wait`.
    DEFAULT_TIMEOUT = None

    def __init__(self, *args, timeout=_timeout_unset, **kwargs):
        super().__init__(*args, **kwargs)
        effective = self.DEFAULT_TIMEOUT if timeout is _timeout_unset else timeout
        # State is written straight into the instance __dict__.
        self.__dict__.update(
            _mock_event=threading.Event(),  # Event for any call
            _mock_calls_events=[],  # Events for each of the calls
            _mock_calls_events_lock=threading.Lock(),
            _mock_wait_timeout=effective,
        )

    def _get_child_mock(self, **kw):
        """Create a child mock that inherits the parent's wait timeout.

        The timeout is taken from `parent` when that is a `ThreadingMixin`,
        otherwise from `_new_parent` when that is one.
        """
        for key in ("parent", "_new_parent"):
            owner = kw.get(key)
            if isinstance(owner, ThreadingMixin):
                kw["timeout"] = owner._mock_wait_timeout
                break
        return super()._get_child_mock(**kw)

    def reset_mock(self, *args, **kwargs):
        """
        See :func:`.Mock.reset_mock()`
        """
        super().reset_mock(*args, **kwargs)
        # Start over with a fresh "any call" event and no per-call events.
        self.__dict__.update(
            _mock_event=threading.Event(),
            _mock_calls_events=[],
        )

    def __get_event(self, expected_args, expected_kwargs):
        # Return the event registered for this exact (args, kwargs) pair,
        # registering a new one under the lock when none exists yet.
        wanted = (expected_args, expected_kwargs)
        with self._mock_calls_events_lock:
            for seen_args, seen_kwargs, seen_event in self._mock_calls_events:
                if (seen_args, seen_kwargs) == wanted:
                    return seen_event
            fresh = threading.Event()
            self._mock_calls_events.append(
                (expected_args, expected_kwargs, fresh))
        return fresh

    def _mock_call(self, *args, **kwargs):
        # Perform the real call first, then signal the per-call event
        # followed by the "any call" event.
        result = super()._mock_call(*args, **kwargs)
        self.__get_event(args, kwargs).set()
        self._mock_event.set()
        return result

    def wait_until_called(self, *, timeout=_timeout_unset):
        """Wait until the mock object is called.

        `timeout` - time to wait for in seconds.
        Defaults to the constructor provided timeout.
        Use None to block indefinitely.

        Raises `AssertionError` if the wait times out.
        """
        if timeout is _timeout_unset:
            timeout = self._mock_wait_timeout
        if self._mock_event.wait(timeout=timeout):
            return
        raise AssertionError(
            f"{self._mock_name or 'mock'} was not called before timeout({timeout}).")

    def wait_until_any_call_with(self, *args, **kwargs):
        """Wait until the mock object is called with given args.

        Waits for the timeout in seconds provided in the constructor and
        raises `AssertionError` if no matching call happened in that time.
        """
        matching = self.__get_event(args, kwargs)
        if matching.wait(timeout=self._mock_wait_timeout):
            return
        expected_string = self._format_mock_call_signature(args, kwargs)
        raise AssertionError(f'{expected_string} call not found')

3189 

3190 

class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
    """
    A mock that can be used to wait for calls happening in a different
    thread.

    The constructor accepts a `timeout` argument, in seconds, that applies
    to all `wait` calls of the mock.

    The default timeout of all instances can be changed through the
    `ThreadingMock.DEFAULT_TIMEOUT` attribute.

    If no timeout is set, waiting blocks indefinitely.
    """

3205 

3206 

def seal(mock):
    """Disable the automatic generation of child mocks.

    Sets `_mock_sealed` on `mock`, then recurses into each attribute listed
    by `dir(mock)` that is itself a mock whose `_mock_new_parent` is `mock`.
    Attributes that raise `AttributeError` on access, non-mock values, and
    mocks holding a `_SpecState` under the same name in their own
    `_mock_children` are left alone.
    """
    mock._mock_sealed = True
    for attr in dir(mock):
        try:
            child = getattr(mock, attr)
        except AttributeError:
            continue
        # Evaluated left to right; later checks assume `child` is a mock.
        should_seal = (
            isinstance(child, NonCallableMock)
            and not isinstance(child._mock_children.get(attr), _SpecState)
            and child._mock_new_parent is mock
        )
        if should_seal:
            seal(child)

3229 

3230 

3231class _AsyncIterator: 

3232 """ 

3233 Wraps an iterator in an asynchronous iterator. 

3234 """ 

3235 def __init__(self, iterator): 

3236 self.iterator = iterator 

3237 code_mock = NonCallableMock(spec_set=CodeType) 

3238 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 

3239 self.__dict__['__code__'] = code_mock 

3240 

3241 async def __anext__(self): 

3242 try: 

3243 return next(self.iterator) 

3244 except StopIteration: 

3245 pass 

3246 raise StopAsyncIteration