Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 29%

1626 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# mock.py 

2# Test tools for mocking and patching. 

3# Maintained by Michael Foord 

4# Backport for other versions of Python available from 

5# https://pypi.org/project/mock 

6 

7__all__ = ( 

8 'Mock', 

9 'MagicMock', 

10 'patch', 

11 'sentinel', 

12 'DEFAULT', 

13 'ANY', 

14 'call', 

15 'create_autospec', 

16 'AsyncMock', 

17 'FILTER_DIR', 

18 'NonCallableMock', 

19 'NonCallableMagicMock', 

20 'mock_open', 

21 'PropertyMock', 

22 'seal', 

23) 

24 

25 

26import asyncio 

27import contextlib 

28import io 

29import inspect 

30import pprint 

31import sys 

32import builtins 

33from asyncio import iscoroutinefunction 

34from types import CodeType, ModuleType, MethodType 

35from unittest.util import safe_repr 

36from functools import wraps, partial 

37from threading import RLock 

38 

39 

40from mock import IS_PYPY 

41from .backports import iscoroutinefunction 

42 

43 

44class InvalidSpecError(Exception): 

45 """Indicates that an invalid value was used as a mock spec.""" 

46 

47 

48_builtins = {name for name in dir(builtins) if not name.startswith('_')} 

49 

50FILTER_DIR = True 

51 

52# Workaround for issue #12370 

53# Without this, the __class__ properties wouldn't be set correctly 

54_safe_super = super 

55 

56def _is_async_obj(obj): 

57 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 

58 return False 

59 if hasattr(obj, '__func__'): 

60 obj = getattr(obj, '__func__') 

61 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 

62 

63 

64def _is_async_func(func): 

65 if getattr(func, '__code__', None): 

66 return iscoroutinefunction(func) 

67 else: 

68 return False 

69 

70 

71def _is_instance_mock(obj): 

72 # can't use isinstance on Mock objects because they override __class__ 

73 # The base class for all mocks is NonCallableMock 

74 return issubclass(type(obj), NonCallableMock) 

75 

76 

77def _is_exception(obj): 

78 return ( 

79 isinstance(obj, BaseException) or 

80 isinstance(obj, type) and issubclass(obj, BaseException) 

81 ) 

82 

83 

84def _extract_mock(obj): 

85 # Autospecced functions will return a FunctionType with "mock" attribute 

86 # which is the actual mock object that needs to be used. 

87 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 

88 return obj.mock 

89 else: 

90 return obj 

91 

92 

93def _get_signature_object(func, as_instance, eat_self): 

94 """ 

95 Given an arbitrary, possibly callable object, try to create a suitable 

96 signature object. 

97 Return a (reduced func, signature) tuple, or None. 

98 """ 

99 if isinstance(func, type) and not as_instance: 

100 # If it's a type and should be modelled as a type, use __init__. 

101 func = func.__init__ 

102 # Skip the `self` argument in __init__ 

103 eat_self = True 

104 elif isinstance(func, (classmethod, staticmethod)): 

105 if isinstance(func, classmethod): 

106 # Skip the `cls` argument of a class method 

107 eat_self = True 

108 # Use the original decorated method to extract the correct function signature 

109 func = func.__func__ 

110 elif not isinstance(func, FunctionTypes): 

111 # If we really want to model an instance of the passed type, 

112 # __call__ should be looked up, not __init__. 

113 try: 

114 func = func.__call__ 

115 except AttributeError: 

116 return None 

117 if eat_self: 

118 sig_func = partial(func, None) 

119 else: 

120 sig_func = func 

121 try: 

122 return func, inspect.signature(sig_func) 

123 except ValueError: 

124 # Certain callable types are not supported by inspect.signature() 

125 return None 

126 

127 

128def _check_signature(func, mock, skipfirst, instance=False): 

129 sig = _get_signature_object(func, instance, skipfirst) 

130 if sig is None: 

131 return 

132 func, sig = sig 

133 def checksig(_mock_self, *args, **kwargs): 

134 sig.bind(*args, **kwargs) 

135 _copy_func_details(func, checksig) 

136 type(mock)._mock_check_sig = checksig 

137 type(mock).__signature__ = sig 

138 

139 

140def _copy_func_details(func, funcopy): 

141 # we explicitly don't copy func.__dict__ into this copy as it would 

142 # expose original attributes that should be mocked 

143 for attribute in ( 

144 '__name__', '__doc__', '__text_signature__', 

145 '__module__', '__defaults__', '__kwdefaults__', 

146 ): 

147 try: 

148 setattr(funcopy, attribute, getattr(func, attribute)) 

149 except AttributeError: 

150 pass 

151 

152 

153def _callable(obj): 

154 if isinstance(obj, type): 

155 return True 

156 if isinstance(obj, (staticmethod, classmethod, MethodType)): 

157 return _callable(obj.__func__) 

158 if getattr(obj, '__call__', None) is not None: 

159 return True 

160 return False 

161 

162 

163def _is_list(obj): 

164 # checks for list or tuples 

165 # XXXX badly named! 

166 return type(obj) in (list, tuple) 

167 

168 

169def _instance_callable(obj): 

170 """Given an object, return True if the object is callable. 

171 For classes, return True if instances would be callable.""" 

172 if not isinstance(obj, type): 

173 # already an instance 

174 return getattr(obj, '__call__', None) is not None 

175 

176 # *could* be broken by a class overriding __mro__ or __dict__ via 

177 # a metaclass 

178 for base in (obj,) + obj.__mro__: 

179 if base.__dict__.get('__call__') is not None: 

180 return True 

181 return False 

182 

183 

184def _set_signature(mock, original, instance=False): 

185 # creates a function with signature (*args, **kwargs) that delegates to a 

186 # mock. It still does signature checking by calling a lambda with the same 

187 # signature as the original. 

188 

189 skipfirst = isinstance(original, type) 

190 result = _get_signature_object(original, instance, skipfirst) 

191 if result is None: 

192 return mock 

193 func, sig = result 

194 def checksig(*args, **kwargs): 

195 sig.bind(*args, **kwargs) 

196 _copy_func_details(func, checksig) 

197 

198 name = original.__name__ 

199 if not name.isidentifier(): 

200 name = 'funcopy' 

201 context = {'_checksig_': checksig, 'mock': mock} 

202 src = """def %s(*args, **kwargs): 

203 _checksig_(*args, **kwargs) 

204 return mock(*args, **kwargs)""" % name 

205 exec (src, context) 

206 funcopy = context[name] 

207 _setup_func(funcopy, mock, sig) 

208 return funcopy 

209 

210 

211def _setup_func(funcopy, mock, sig): 

212 funcopy.mock = mock 

213 

214 def assert_called_with(*args, **kwargs): 

215 return mock.assert_called_with(*args, **kwargs) 

216 def assert_called(*args, **kwargs): 

217 return mock.assert_called(*args, **kwargs) 

218 def assert_not_called(*args, **kwargs): 

219 return mock.assert_not_called(*args, **kwargs) 

220 def assert_called_once(*args, **kwargs): 

221 return mock.assert_called_once(*args, **kwargs) 

222 def assert_called_once_with(*args, **kwargs): 

223 return mock.assert_called_once_with(*args, **kwargs) 

224 def assert_has_calls(*args, **kwargs): 

225 return mock.assert_has_calls(*args, **kwargs) 

226 def assert_any_call(*args, **kwargs): 

227 return mock.assert_any_call(*args, **kwargs) 

228 def reset_mock(): 

229 funcopy.method_calls = _CallList() 

230 funcopy.mock_calls = _CallList() 

231 mock.reset_mock() 

232 ret = funcopy.return_value 

233 if _is_instance_mock(ret) and not ret is mock: 

234 ret.reset_mock() 

235 

236 funcopy.called = False 

237 funcopy.call_count = 0 

238 funcopy.call_args = None 

239 funcopy.call_args_list = _CallList() 

240 funcopy.method_calls = _CallList() 

241 funcopy.mock_calls = _CallList() 

242 

243 funcopy.return_value = mock.return_value 

244 funcopy.side_effect = mock.side_effect 

245 funcopy._mock_children = mock._mock_children 

246 

247 funcopy.assert_called_with = assert_called_with 

248 funcopy.assert_called_once_with = assert_called_once_with 

249 funcopy.assert_has_calls = assert_has_calls 

250 funcopy.assert_any_call = assert_any_call 

251 funcopy.reset_mock = reset_mock 

252 funcopy.assert_called = assert_called 

253 funcopy.assert_not_called = assert_not_called 

254 funcopy.assert_called_once = assert_called_once 

255 funcopy.__signature__ = sig 

256 

257 mock._mock_delegate = funcopy 

258 

259 

260def _setup_async_mock(mock): 

261 mock._is_coroutine = asyncio.coroutines._is_coroutine 

262 mock.await_count = 0 

263 mock.await_args = None 

264 mock.await_args_list = _CallList() 

265 

266 # Mock is not configured yet so the attributes are set 

267 # to a function and then the corresponding mock helper function 

268 # is called when the helper is accessed similar to _setup_func. 

269 def wrapper(attr, *args, **kwargs): 

270 return getattr(mock.mock, attr)(*args, **kwargs) 

271 

272 for attribute in ('assert_awaited', 

273 'assert_awaited_once', 

274 'assert_awaited_with', 

275 'assert_awaited_once_with', 

276 'assert_any_await', 

277 'assert_has_awaits', 

278 'assert_not_awaited'): 

279 

280 # setattr(mock, attribute, wrapper) causes late binding 

281 # hence attribute will always be the last value in the loop 

282 # Use partial(wrapper, attribute) to ensure the attribute is bound 

283 # correctly. 

284 setattr(mock, attribute, partial(wrapper, attribute)) 

285 

286 

287def _is_magic(name): 

288 return '__%s__' % name[2:-2] == name 

289 

290 

291class _SentinelObject(object): 

292 "A unique, named, sentinel object." 

293 def __init__(self, name): 

294 self.name = name 

295 

296 def __repr__(self): 

297 return 'sentinel.%s' % self.name 

298 

299 def __reduce__(self): 

300 return 'sentinel.%s' % self.name 

301 

302 

303class _Sentinel(object): 

304 """Access attributes to return a named object, usable as a sentinel.""" 

305 def __init__(self): 

306 self._sentinels = {} 

307 

308 def __getattr__(self, name): 

309 if name == '__bases__': 

310 # Without this help(unittest.mock) raises an exception 

311 raise AttributeError 

312 return self._sentinels.setdefault(name, _SentinelObject(name)) 

313 

314 def __reduce__(self): 

315 return 'sentinel' 

316 

317 

318sentinel = _Sentinel() 

319 

320DEFAULT = sentinel.DEFAULT 

321_missing = sentinel.MISSING 

322_deleted = sentinel.DELETED 

323 

324 

325_allowed_names = { 

326 'return_value', '_mock_return_value', 'side_effect', 

327 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 

328 '_mock_name', '_mock_new_name' 

329} 

330 

331 

332def _delegating_property(name): 

333 _allowed_names.add(name) 

334 _the_name = '_mock_' + name 

335 def _get(self, name=name, _the_name=_the_name): 

336 sig = self._mock_delegate 

337 if sig is None: 

338 return getattr(self, _the_name) 

339 return getattr(sig, name) 

340 def _set(self, value, name=name, _the_name=_the_name): 

341 sig = self._mock_delegate 

342 if sig is None: 

343 self.__dict__[_the_name] = value 

344 else: 

345 setattr(sig, name, value) 

346 

347 return property(_get, _set) 

348 

349 

350 

351class _CallList(list): 

352 

353 def __contains__(self, value): 

354 if not isinstance(value, list): 

355 return list.__contains__(self, value) 

356 len_value = len(value) 

357 len_self = len(self) 

358 if len_value > len_self: 

359 return False 

360 

361 for i in range(0, len_self - len_value + 1): 

362 sub_list = self[i:i+len_value] 

363 if sub_list == value: 

364 return True 

365 return False 

366 

367 def __repr__(self): 

368 return pprint.pformat(list(self)) 

369 

370 

371def _check_and_set_parent(parent, value, name, new_name): 

372 value = _extract_mock(value) 

373 

374 if not _is_instance_mock(value): 

375 return False 

376 if ((value._mock_name or value._mock_new_name) or 

377 (value._mock_parent is not None) or 

378 (value._mock_new_parent is not None)): 

379 return False 

380 

381 _parent = parent 

382 while _parent is not None: 

383 # setting a mock (value) as a child or return value of itself 

384 # should not modify the mock 

385 if _parent is value: 

386 return False 

387 _parent = _parent._mock_new_parent 

388 

389 if new_name: 

390 value._mock_new_parent = parent 

391 value._mock_new_name = new_name 

392 if name: 

393 value._mock_parent = parent 

394 value._mock_name = name 

395 return True 

396 

397# Internal class to identify if we wrapped an iterator object or not. 

398class _MockIter(object): 

399 def __init__(self, obj): 

400 self.obj = iter(obj) 

401 def __next__(self): 

402 return next(self.obj) 

403 

404class Base(object): 

405 _mock_return_value = DEFAULT 

406 _mock_side_effect = None 

407 def __init__(self, *args, **kwargs): 

408 pass 

409 

410 

411 

412class NonCallableMock(Base): 

413 """A non-callable version of `Mock`""" 

414 

415 # Store a mutex as a class attribute in order to protect concurrent access 

416 # to mock attributes. Using a class attribute allows all NonCallableMock 

417 # instances to share the mutex for simplicity. 

418 # 

419 # See https://github.com/python/cpython/issues/98624 for why this is 

420 # necessary. 

421 _lock = RLock() 

422 

423 def __new__( 

424 cls, spec=None, wraps=None, name=None, spec_set=None, 

425 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

426 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

427 ): 

428 # every instance has its own class 

429 # so we can create magic methods on the 

430 # class without stomping on other mocks 

431 bases = (cls,) 

432 if not issubclass(cls, AsyncMockMixin): 

433 # Check if spec is an async object or function 

434 spec_arg = spec_set or spec 

435 if spec_arg is not None and _is_async_obj(spec_arg): 

436 bases = (AsyncMockMixin, cls) 

437 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 

438 instance = _safe_super(NonCallableMock, cls).__new__(new) 

439 return instance 

440 

441 

442 def __init__( 

443 self, spec=None, wraps=None, name=None, spec_set=None, 

444 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

445 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

446 ): 

447 if _new_parent is None: 

448 _new_parent = parent 

449 

450 __dict__ = self.__dict__ 

451 __dict__['_mock_parent'] = parent 

452 __dict__['_mock_name'] = name 

453 __dict__['_mock_new_name'] = _new_name 

454 __dict__['_mock_new_parent'] = _new_parent 

455 __dict__['_mock_sealed'] = False 

456 

457 if spec_set is not None: 

458 spec = spec_set 

459 spec_set = True 

460 if _eat_self is None: 

461 _eat_self = parent is not None 

462 

463 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 

464 

465 __dict__['_mock_children'] = {} 

466 __dict__['_mock_wraps'] = wraps 

467 __dict__['_mock_delegate'] = None 

468 

469 __dict__['_mock_called'] = False 

470 __dict__['_mock_call_args'] = None 

471 __dict__['_mock_call_count'] = 0 

472 __dict__['_mock_call_args_list'] = _CallList() 

473 __dict__['_mock_mock_calls'] = _CallList() 

474 

475 __dict__['method_calls'] = _CallList() 

476 __dict__['_mock_unsafe'] = unsafe 

477 

478 if kwargs: 

479 self.configure_mock(**kwargs) 

480 

481 _safe_super(NonCallableMock, self).__init__( 

482 spec, wraps, name, spec_set, parent, 

483 _spec_state 

484 ) 

485 

486 

487 def attach_mock(self, mock, attribute): 

488 """ 

489 Attach a mock as an attribute of this one, replacing its name and 

490 parent. Calls to the attached mock will be recorded in the 

491 `method_calls` and `mock_calls` attributes of this one.""" 

492 inner_mock = _extract_mock(mock) 

493 

494 inner_mock._mock_parent = None 

495 inner_mock._mock_new_parent = None 

496 inner_mock._mock_name = '' 

497 inner_mock._mock_new_name = None 

498 

499 setattr(self, attribute, mock) 

500 

501 

502 def mock_add_spec(self, spec, spec_set=False): 

503 """Add a spec to a mock. `spec` can either be an object or a 

504 list of strings. Only attributes on the `spec` can be fetched as 

505 attributes from the mock. 

506 

507 If `spec_set` is True then only attributes on the spec can be set.""" 

508 self._mock_add_spec(spec, spec_set) 

509 

510 

511 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 

512 _eat_self=False): 

513 if _is_instance_mock(spec): 

514 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 

515 

516 _spec_class = None 

517 _spec_signature = None 

518 _spec_asyncs = [] 

519 

520 if spec is not None and not _is_list(spec): 

521 if isinstance(spec, type): 

522 _spec_class = spec 

523 else: 

524 _spec_class = type(spec) 

525 res = _get_signature_object(spec, 

526 _spec_as_instance, _eat_self) 

527 _spec_signature = res and res[1] 

528 

529 spec_list = dir(spec) 

530 

531 for attr in spec_list: 

532 if iscoroutinefunction(getattr(spec, attr, None)): 

533 _spec_asyncs.append(attr) 

534 

535 spec = spec_list 

536 

537 __dict__ = self.__dict__ 

538 __dict__['_spec_class'] = _spec_class 

539 __dict__['_spec_set'] = spec_set 

540 __dict__['_spec_signature'] = _spec_signature 

541 __dict__['_mock_methods'] = spec 

542 __dict__['_spec_asyncs'] = _spec_asyncs 

543 

544 def __get_return_value(self): 

545 ret = self._mock_return_value 

546 if self._mock_delegate is not None: 

547 ret = self._mock_delegate.return_value 

548 

549 if ret is DEFAULT: 

550 ret = self._get_child_mock( 

551 _new_parent=self, _new_name='()' 

552 ) 

553 self.return_value = ret 

554 return ret 

555 

556 

557 def __set_return_value(self, value): 

558 if self._mock_delegate is not None: 

559 self._mock_delegate.return_value = value 

560 else: 

561 self._mock_return_value = value 

562 _check_and_set_parent(self, value, None, '()') 

563 

564 __return_value_doc = "The value to be returned when the mock is called." 

565 return_value = property(__get_return_value, __set_return_value, 

566 __return_value_doc) 

567 

568 

569 @property 

570 def __class__(self): 

571 if self._spec_class is None: 

572 return type(self) 

573 return self._spec_class 

574 

575 called = _delegating_property('called') 

576 call_count = _delegating_property('call_count') 

577 call_args = _delegating_property('call_args') 

578 call_args_list = _delegating_property('call_args_list') 

579 mock_calls = _delegating_property('mock_calls') 

580 

581 

582 def __get_side_effect(self): 

583 delegated = self._mock_delegate 

584 if delegated is None: 

585 return self._mock_side_effect 

586 sf = delegated.side_effect 

587 if (sf is not None and not callable(sf) 

588 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 

589 sf = _MockIter(sf) 

590 delegated.side_effect = sf 

591 return sf 

592 

593 def __set_side_effect(self, value): 

594 value = _try_iter(value) 

595 delegated = self._mock_delegate 

596 if delegated is None: 

597 self._mock_side_effect = value 

598 else: 

599 delegated.side_effect = value 

600 

601 side_effect = property(__get_side_effect, __set_side_effect) 

602 

603 

604 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 

605 "Restore the mock object to its initial state." 

606 if visited is None: 

607 visited = [] 

608 if id(self) in visited: 

609 return 

610 visited.append(id(self)) 

611 

612 self.called = False 

613 self.call_args = None 

614 self.call_count = 0 

615 self.mock_calls = _CallList() 

616 self.call_args_list = _CallList() 

617 self.method_calls = _CallList() 

618 

619 if return_value: 

620 self._mock_return_value = DEFAULT 

621 if side_effect: 

622 self._mock_side_effect = None 

623 

624 for child in self._mock_children.values(): 

625 if isinstance(child, _SpecState) or child is _deleted: 

626 continue 

627 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 

628 

629 ret = self._mock_return_value 

630 if _is_instance_mock(ret) and ret is not self: 

631 ret.reset_mock(visited) 

632 

633 

634 def configure_mock(self, **kwargs): 

635 """Set attributes on the mock through keyword arguments. 

636 

637 Attributes plus return values and side effects can be set on child 

638 mocks using standard dot notation and unpacking a dictionary in the 

639 method call: 

640 

641 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 

642 >>> mock.configure_mock(**attrs)""" 

643 for arg, val in sorted(kwargs.items(), 

644 # we sort on the number of dots so that 

645 # attributes are set before we set attributes on 

646 # attributes 

647 key=lambda entry: entry[0].count('.')): 

648 args = arg.split('.') 

649 final = args.pop() 

650 obj = self 

651 for entry in args: 

652 obj = getattr(obj, entry) 

653 setattr(obj, final, val) 

654 

655 

656 def __getattr__(self, name): 

657 if name in {'_mock_methods', '_mock_unsafe'}: 

658 raise AttributeError(name) 

659 elif self._mock_methods is not None: 

660 if name not in self._mock_methods or name in _all_magics: 

661 raise AttributeError("Mock object has no attribute %r" % name) 

662 elif _is_magic(name): 

663 raise AttributeError(name) 

664 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): 

665 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST: 

666 raise AttributeError( 

667 f"{name!r} is not a valid assertion. Use a spec " 

668 f"for the mock if {name!r} is meant to be an attribute.") 

669 

670 with NonCallableMock._lock: 

671 result = self._mock_children.get(name) 

672 if result is _deleted: 

673 raise AttributeError(name) 

674 elif result is None: 

675 wraps = None 

676 if self._mock_wraps is not None: 

677 # XXXX should we get the attribute without triggering code 

678 # execution? 

679 wraps = getattr(self._mock_wraps, name) 

680 

681 result = self._get_child_mock( 

682 parent=self, name=name, wraps=wraps, _new_name=name, 

683 _new_parent=self 

684 ) 

685 self._mock_children[name] = result 

686 

687 elif isinstance(result, _SpecState): 

688 try: 

689 result = create_autospec( 

690 result.spec, result.spec_set, result.instance, 

691 result.parent, result.name 

692 ) 

693 except InvalidSpecError: 

694 target_name = self.__dict__['_mock_name'] or self 

695 raise InvalidSpecError( 

696 f'Cannot autospec attr {name!r} from target ' 

697 f'{target_name!r} as it has already been mocked out. ' 

698 f'[target={self!r}, attr={result.spec!r}]') 

699 self._mock_children[name] = result 

700 

701 return result 

702 

703 

704 def _extract_mock_name(self): 

705 _name_list = [self._mock_new_name] 

706 _parent = self._mock_new_parent 

707 last = self 

708 

709 dot = '.' 

710 if _name_list == ['()']: 

711 dot = '' 

712 

713 while _parent is not None: 

714 last = _parent 

715 

716 _name_list.append(_parent._mock_new_name + dot) 

717 dot = '.' 

718 if _parent._mock_new_name == '()': 

719 dot = '' 

720 

721 _parent = _parent._mock_new_parent 

722 

723 _name_list = list(reversed(_name_list)) 

724 _first = last._mock_name or 'mock' 

725 if len(_name_list) > 1: 

726 if _name_list[1] not in ('()', '().'): 

727 _first += '.' 

728 _name_list[0] = _first 

729 return ''.join(_name_list) 

730 

731 def __repr__(self): 

732 name = self._extract_mock_name() 

733 

734 name_string = '' 

735 if name not in ('mock', 'mock.'): 

736 name_string = ' name=%r' % name 

737 

738 spec_string = '' 

739 if self._spec_class is not None: 

740 spec_string = ' spec=%r' 

741 if self._spec_set: 

742 spec_string = ' spec_set=%r' 

743 spec_string = spec_string % self._spec_class.__name__ 

744 return "<%s%s%s id='%s'>" % ( 

745 type(self).__name__, 

746 name_string, 

747 spec_string, 

748 id(self) 

749 ) 

750 

751 

752 def __dir__(self): 

753 """Filter the output of `dir(mock)` to only useful members.""" 

754 if not FILTER_DIR: 

755 return object.__dir__(self) 

756 

757 extras = self._mock_methods or [] 

758 from_type = dir(type(self)) 

759 from_dict = list(self.__dict__) 

760 from_child_mocks = [ 

761 m_name for m_name, m_value in self._mock_children.items() 

762 if m_value is not _deleted] 

763 

764 from_type = [e for e in from_type if not e.startswith('_')] 

765 from_dict = [e for e in from_dict if not e.startswith('_') or 

766 _is_magic(e)] 

767 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 

768 

769 

770 def __setattr__(self, name, value): 

771 if name in _allowed_names: 

772 # property setters go through here 

773 return object.__setattr__(self, name, value) 

774 elif (self._spec_set and self._mock_methods is not None and 

775 name not in self._mock_methods and 

776 name not in self.__dict__): 

777 raise AttributeError("Mock object has no attribute '%s'" % name) 

778 elif name in _unsupported_magics: 

779 msg = 'Attempting to set unsupported magic method %r.' % name 

780 raise AttributeError(msg) 

781 elif name in _all_magics: 

782 if self._mock_methods is not None and name not in self._mock_methods: 

783 raise AttributeError("Mock object has no attribute '%s'" % name) 

784 

785 if not _is_instance_mock(value): 

786 setattr(type(self), name, _get_method(name, value)) 

787 original = value 

788 value = lambda *args, **kw: original(self, *args, **kw) 

789 else: 

790 # only set _new_name and not name so that mock_calls is tracked 

791 # but not method calls 

792 _check_and_set_parent(self, value, None, name) 

793 setattr(type(self), name, value) 

794 self._mock_children[name] = value 

795 elif name == '__class__': 

796 self._spec_class = value 

797 return 

798 else: 

799 if _check_and_set_parent(self, value, name, name): 

800 self._mock_children[name] = value 

801 

802 if self._mock_sealed and not hasattr(self, name): 

803 mock_name = f'{self._extract_mock_name()}.{name}' 

804 raise AttributeError(f'Cannot set {mock_name}') 

805 

806 return object.__setattr__(self, name, value) 

807 

808 

809 def __delattr__(self, name): 

810 if name in _all_magics and name in type(self).__dict__: 

811 delattr(type(self), name) 

812 if name not in self.__dict__: 

813 # for magic methods that are still MagicProxy objects and 

814 # not set on the instance itself 

815 return 

816 

817 obj = self._mock_children.get(name, _missing) 

818 if name in self.__dict__: 

819 _safe_super(NonCallableMock, self).__delattr__(name) 

820 elif obj is _deleted: 

821 raise AttributeError(name) 

822 if obj is not _missing: 

823 del self._mock_children[name] 

824 self._mock_children[name] = _deleted 

825 

826 

827 def _format_mock_call_signature(self, args, kwargs): 

828 name = self._mock_name or 'mock' 

829 return _format_call_signature(name, args, kwargs) 

830 

831 

832 def _format_mock_failure_message(self, args, kwargs, action='call'): 

833 message = 'expected %s not found.\nExpected: %s\nActual: %s' 

834 expected_string = self._format_mock_call_signature(args, kwargs) 

835 call_args = self.call_args 

836 actual_string = self._format_mock_call_signature(*call_args) 

837 return message % (action, expected_string, actual_string) 

838 

839 

840 def _get_call_signature_from_name(self, name): 

841 """ 

842 * If call objects are asserted against a method/function like obj.meth1 

843 then there could be no name for the call object to lookup. Hence just 

844 return the spec_signature of the method/function being asserted against. 

845 * If the name is not empty then remove () and split by '.' to get 

846 list of names to iterate through the children until a potential 

847 match is found. A child mock is created only during attribute access 

848 so if we get a _SpecState then no attributes of the spec were accessed 

849 and can be safely exited. 

850 """ 

851 if not name: 

852 return self._spec_signature 

853 

854 sig = None 

855 names = name.replace('()', '').split('.') 

856 children = self._mock_children 

857 

858 for name in names: 

859 child = children.get(name) 

860 if child is None or isinstance(child, _SpecState): 

861 break 

862 else: 

863 # If an autospecced object is attached using attach_mock the 

864 # child would be a function with mock object as attribute from 

865 # which signature has to be derived. 

866 child = _extract_mock(child) 

867 children = child._mock_children 

868 sig = child._spec_signature 

869 

870 return sig 

871 

872 

873 def _call_matcher(self, _call): 

874 """ 

875 Given a call (or simply an (args, kwargs) tuple), return a 

876 comparison key suitable for matching with other calls. 

877 This is a best effort method which relies on the spec's signature, 

878 if available, or falls back on the arguments themselves. 

879 """ 

880 

881 if isinstance(_call, tuple) and len(_call) > 2: 

882 sig = self._get_call_signature_from_name(_call[0]) 

883 else: 

884 sig = self._spec_signature 

885 

886 if sig is not None: 

887 if len(_call) == 2: 

888 name = '' 

889 args, kwargs = _call 

890 else: 

891 name, args, kwargs = _call 

892 try: 

893 bound_call = sig.bind(*args, **kwargs) 

894 return call(name, bound_call.args, bound_call.kwargs) 

895 except TypeError as e: 

896 return e.with_traceback(None) 

897 else: 

898 return _call 

899 

900 def assert_not_called(_mock_self): 

901 """assert that the mock was never called. 

902 """ 

903 self = _mock_self 

904 if self.call_count != 0: 

905 msg = ("Expected '%s' to not have been called. Called %s times.%s" 

906 % (self._mock_name or 'mock', 

907 self.call_count, 

908 self._calls_repr())) 

909 raise AssertionError(msg) 

910 

911 def assert_called(_mock_self): 

912 """assert that the mock was called at least once 

913 """ 

914 self = _mock_self 

915 if self.call_count == 0: 

916 msg = ("Expected '%s' to have been called." % 

917 (self._mock_name or 'mock')) 

918 raise AssertionError(msg) 

919 

920 def assert_called_once(_mock_self): 

921 """assert that the mock was called only once. 

922 """ 

923 self = _mock_self 

924 if not self.call_count == 1: 

925 msg = ("Expected '%s' to have been called once. Called %s times.%s" 

926 % (self._mock_name or 'mock', 

927 self.call_count, 

928 self._calls_repr())) 

929 raise AssertionError(msg) 

930 

931 def assert_called_with(_mock_self, *args, **kwargs): 

932 """assert that the last call was made with the specified arguments. 

933 

934 Raises an AssertionError if the args and keyword args passed in are 

935 different to the last call to the mock.""" 

936 self = _mock_self 

937 if self.call_args is None: 

938 expected = self._format_mock_call_signature(args, kwargs) 

939 actual = 'not called.' 

940 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 

941 % (expected, actual)) 

942 raise AssertionError(error_message) 

943 

944 def _error_message(): 

945 msg = self._format_mock_failure_message(args, kwargs) 

946 return msg 

947 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

948 actual = self._call_matcher(self.call_args) 

949 if actual != expected: 

950 cause = expected if isinstance(expected, Exception) else None 

951 raise AssertionError(_error_message()) from cause 

952 

953 

954 def assert_called_once_with(_mock_self, *args, **kwargs): 

955 """assert that the mock was called exactly once and that that call was 

956 with the specified arguments.""" 

957 self = _mock_self 

958 if not self.call_count == 1: 

959 msg = ("Expected '%s' to be called once. Called %s times.%s" 

960 % (self._mock_name or 'mock', 

961 self.call_count, 

962 self._calls_repr())) 

963 raise AssertionError(msg) 

964 return self.assert_called_with(*args, **kwargs) 

965 

966 

967 def assert_has_calls(self, calls, any_order=False): 

968 """assert the mock has been called with the specified calls. 

969 The `mock_calls` list is checked for the calls. 

970 

971 If `any_order` is False (the default) then the calls must be 

972 sequential. There can be extra calls before or after the 

973 specified calls. 

974 

975 If `any_order` is True then the calls can be in any order, but 

976 they must all appear in `mock_calls`.""" 

977 expected = [self._call_matcher(c) for c in calls] 

978 cause = next((e for e in expected if isinstance(e, Exception)), None) 

979 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 

980 if not any_order: 

981 if expected not in all_calls: 

982 if cause is None: 

983 problem = 'Calls not found.' 

984 else: 

985 problem = ('Error processing expected calls.\n' 

986 'Errors: {}').format( 

987 [e if isinstance(e, Exception) else None 

988 for e in expected]) 

989 raise AssertionError( 

990 f'{problem}\n' 

991 f'Expected: {_CallList(calls)}' 

992 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 

993 ) from cause 

994 return 

995 

996 all_calls = list(all_calls) 

997 

998 not_found = [] 

999 for kall in expected: 

1000 try: 

1001 all_calls.remove(kall) 

1002 except ValueError: 

1003 not_found.append(kall) 

1004 if not_found: 

1005 raise AssertionError( 

1006 '%r does not contain all of %r in its call list, ' 

1007 'found %r instead' % (self._mock_name or 'mock', 

1008 tuple(not_found), all_calls) 

1009 ) from cause 

1010 

1011 

1012 def assert_any_call(self, *args, **kwargs): 

1013 """assert the mock has been called with the specified arguments. 

1014 

1015 The assert passes if the mock has *ever* been called, unlike 

1016 `assert_called_with` and `assert_called_once_with` that only pass if 

1017 the call is the most recent one.""" 

1018 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

1019 cause = expected if isinstance(expected, Exception) else None 

1020 actual = [self._call_matcher(c) for c in self.call_args_list] 

1021 if cause or expected not in _AnyComparer(actual): 

1022 expected_string = self._format_mock_call_signature(args, kwargs) 

1023 raise AssertionError( 

1024 '%s call not found' % expected_string 

1025 ) from cause 

1026 

1027 

1028 def _get_child_mock(self, **kw): 

1029 """Create the child mocks for attributes and return value. 

1030 By default child mocks will be the same type as the parent. 

1031 Subclasses of Mock may want to override this to customize the way 

1032 child mocks are made. 

1033 

1034 For non-callable mocks the callable variant will be used (rather than 

1035 any custom subclass).""" 

1036 if self._mock_sealed: 

1037 attribute = f".{kw['name']}" if "name" in kw else "()" 

1038 mock_name = self._extract_mock_name() + attribute 

1039 raise AttributeError(mock_name) 

1040 

1041 _new_name = kw.get("_new_name") 

1042 if _new_name in self.__dict__['_spec_asyncs']: 

1043 return AsyncMock(**kw) 

1044 

1045 _type = type(self) 

1046 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 

1047 # Any asynchronous magic becomes an AsyncMock 

1048 klass = AsyncMock 

1049 elif issubclass(_type, AsyncMockMixin): 

1050 if (_new_name in _all_sync_magics or 

1051 self._mock_methods and _new_name in self._mock_methods): 

1052 # Any synchronous method on AsyncMock becomes a MagicMock 

1053 klass = MagicMock 

1054 else: 

1055 klass = AsyncMock 

1056 elif not issubclass(_type, CallableMixin): 

1057 if issubclass(_type, NonCallableMagicMock): 

1058 klass = MagicMock 

1059 elif issubclass(_type, NonCallableMock): 

1060 klass = Mock 

1061 else: 

1062 klass = _type.__mro__[1] 

1063 return klass(**kw) 

1064 

1065 

1066 def _calls_repr(self, prefix="Calls"): 

1067 """Renders self.mock_calls as a string. 

1068 

1069 Example: "\nCalls: [call(1), call(2)]." 

1070 

1071 If self.mock_calls is empty, an empty string is returned. The 

1072 output will be truncated if very long. 

1073 """ 

1074 if not self.mock_calls: 

1075 return "" 

1076 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 

1077 

1078 

1079try: 

1080 removeprefix = str.removeprefix 

1081except AttributeError: 

1082 # Py 3.8 and earlier: 

1083 def removeprefix(name, prefix): 

1084 return name[len(prefix):] 

1085 

1086# Denylist for forbidden attribute names in safe mode 

1087_ATTRIB_DENY_LIST = frozenset({ 

1088 removeprefix(name, "assert_") 

1089 for name in dir(NonCallableMock) 

1090 if name.startswith("assert_") 

1091}) 

1092 

1093 

1094class _AnyComparer(list): 

1095 """A list which checks if it contains a call which may have an 

1096 argument of ANY, flipping the components of item and self from 

1097 their traditional locations so that ANY is guaranteed to be on 

1098 the left.""" 

1099 def __contains__(self, item): 

1100 for _call in self: 

1101 assert len(item) == len(_call) 

1102 if all([ 

1103 expected == actual 

1104 for expected, actual in zip(item, _call) 

1105 ]): 

1106 return True 

1107 return False 

1108 

1109 

1110def _try_iter(obj): 

1111 if obj is None: 

1112 return obj 

1113 if _is_exception(obj): 

1114 return obj 

1115 if _callable(obj): 

1116 return obj 

1117 try: 

1118 return iter(obj) 

1119 except TypeError: 

1120 # XXXX backwards compatibility 

1121 # but this will blow up on first call - so maybe we should fail early? 

1122 return obj 

1123 

1124 

1125class CallableMixin(Base): 

1126 

1127 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 

1128 wraps=None, name=None, spec_set=None, parent=None, 

1129 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 

1130 self.__dict__['_mock_return_value'] = return_value 

1131 _safe_super(CallableMixin, self).__init__( 

1132 spec, wraps, name, spec_set, parent, 

1133 _spec_state, _new_name, _new_parent, **kwargs 

1134 ) 

1135 

1136 self.side_effect = side_effect 

1137 

1138 

1139 def _mock_check_sig(self, *args, **kwargs): 

1140 # stub method that can be replaced with one with a specific signature 

1141 pass 

1142 

1143 

1144 def __call__(_mock_self, *args, **kwargs): 

1145 # can't use self in-case a function / method we are mocking uses self 

1146 # in the signature 

1147 _mock_self._mock_check_sig(*args, **kwargs) 

1148 _mock_self._increment_mock_call(*args, **kwargs) 

1149 return _mock_self._mock_call(*args, **kwargs) 

1150 

1151 

1152 def _mock_call(_mock_self, *args, **kwargs): 

1153 return _mock_self._execute_mock_call(*args, **kwargs) 

1154 

1155 def _increment_mock_call(_mock_self, *args, **kwargs): 

1156 self = _mock_self 

1157 self.called = True 

1158 self.call_count += 1 

1159 

1160 # handle call_args 

1161 # needs to be set here so assertions on call arguments pass before 

1162 # execution in the case of awaited calls 

1163 _call = _Call((args, kwargs), two=True) 

1164 self.call_args = _call 

1165 self.call_args_list.append(_call) 

1166 

1167 # initial stuff for method_calls: 

1168 do_method_calls = self._mock_parent is not None 

1169 method_call_name = self._mock_name 

1170 

1171 # initial stuff for mock_calls: 

1172 mock_call_name = self._mock_new_name 

1173 is_a_call = mock_call_name == '()' 

1174 self.mock_calls.append(_Call(('', args, kwargs))) 

1175 

1176 # follow up the chain of mocks: 

1177 _new_parent = self._mock_new_parent 

1178 while _new_parent is not None: 

1179 

1180 # handle method_calls: 

1181 if do_method_calls: 

1182 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 

1183 do_method_calls = _new_parent._mock_parent is not None 

1184 if do_method_calls: 

1185 method_call_name = _new_parent._mock_name + '.' + method_call_name 

1186 

1187 # handle mock_calls: 

1188 this_mock_call = _Call((mock_call_name, args, kwargs)) 

1189 _new_parent.mock_calls.append(this_mock_call) 

1190 

1191 if _new_parent._mock_new_name: 

1192 if is_a_call: 

1193 dot = '' 

1194 else: 

1195 dot = '.' 

1196 is_a_call = _new_parent._mock_new_name == '()' 

1197 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 

1198 

1199 # follow the parental chain: 

1200 _new_parent = _new_parent._mock_new_parent 

1201 

1202 def _execute_mock_call(_mock_self, *args, **kwargs): 

1203 self = _mock_self 

1204 # separate from _increment_mock_call so that awaited functions are 

1205 # executed separately from their call, also AsyncMock overrides this method 

1206 

1207 effect = self.side_effect 

1208 if effect is not None: 

1209 if _is_exception(effect): 

1210 raise effect 

1211 elif not _callable(effect): 

1212 result = next(effect) 

1213 if _is_exception(result): 

1214 raise result 

1215 else: 

1216 result = effect(*args, **kwargs) 

1217 

1218 if result is not DEFAULT: 

1219 return result 

1220 

1221 if self._mock_return_value is not DEFAULT: 

1222 return self.return_value 

1223 

1224 if self._mock_wraps is not None: 

1225 return self._mock_wraps(*args, **kwargs) 

1226 

1227 return self.return_value 

1228 

1229 

1230 

1231class Mock(CallableMixin, NonCallableMock): 

1232 """ 

1233 Create a new `Mock` object. `Mock` takes several optional arguments 

1234 that specify the behaviour of the Mock object: 

1235 

1236 * `spec`: This can be either a list of strings or an existing object (a 

1237 class or instance) that acts as the specification for the mock object. If 

1238 you pass in an object then a list of strings is formed by calling dir on 

1239 the object (excluding unsupported magic attributes and methods). Accessing 

1240 any attribute not in this list will raise an `AttributeError`. 

1241 

1242 If `spec` is an object (rather than a list of strings) then 

1243 `mock.__class__` returns the class of the spec object. This allows mocks 

1244 to pass `isinstance` tests. 

1245 

1246 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 

1247 or get an attribute on the mock that isn't on the object passed as 

1248 `spec_set` will raise an `AttributeError`. 

1249 

1250 * `side_effect`: A function to be called whenever the Mock is called. See 

1251 the `side_effect` attribute. Useful for raising exceptions or 

1252 dynamically changing return values. The function is called with the same 

1253 arguments as the mock, and unless it returns `DEFAULT`, the return 

1254 value of this function is used as the return value. 

1255 

1256 If `side_effect` is an iterable then each call to the mock will return 

1257 the next value from the iterable. If any of the members of the iterable 

1258 are exceptions they will be raised instead of returned. 

1259 

1260 * `return_value`: The value returned when the mock is called. By default 

1261 this is a new Mock (created on first access). See the 

1262 `return_value` attribute. 

1263 

1264 * `unsafe`: By default, accessing any attribute whose name starts with 

1265 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError. 

1266 Additionally, an AttributeError is raised when accessing 

1267 attributes that match the name of an assertion method without the prefix 

1268 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`. 

1269 Passing `unsafe=True` will allow access to these attributes. 

1270 

1271 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 

1272 calling the Mock will pass the call through to the wrapped object 

1273 (returning the real result). Attribute access on the mock will return a 

1274 Mock object that wraps the corresponding attribute of the wrapped object 

1275 (so attempting to access an attribute that doesn't exist will raise an 

1276 `AttributeError`). 

1277 

1278 If the mock has an explicit `return_value` set then calls are not passed 

1279 to the wrapped object and the `return_value` is returned instead. 

1280 

1281 * `name`: If the mock has a name then it will be used in the repr of the 

1282 mock. This can be useful for debugging. The name is propagated to child 

1283 mocks. 

1284 

1285 Mocks can also be called with arbitrary keyword arguments. These will be 

1286 used to set attributes on the mock after it is created. 

1287 """ 

1288 

1289 

1290def _dot_lookup(thing, comp, import_path): 

1291 try: 

1292 return getattr(thing, comp) 

1293 except AttributeError: 

1294 __import__(import_path) 

1295 return getattr(thing, comp) 

1296 

1297 

1298def _importer(target): 

1299 components = target.split('.') 

1300 import_path = components.pop(0) 

1301 thing = __import__(import_path) 

1302 

1303 for comp in components: 

1304 import_path += ".%s" % comp 

1305 thing = _dot_lookup(thing, comp, import_path) 

1306 return thing 

1307 

1308 

1309# _check_spec_arg_typos takes kwargs from commands like patch and checks that 

1310# they don't contain common misspellings of arguments related to autospeccing. 

1311def _check_spec_arg_typos(kwargs_to_check): 

1312 typos = ("autospect", "auto_spec", "set_spec") 

1313 for typo in typos: 

1314 if typo in kwargs_to_check: 

1315 raise RuntimeError( 

1316 f"{typo!r} might be a typo; use unsafe=True if this is intended" 

1317 ) 

1318 

1319 

1320class _patch(object): 

1321 

1322 attribute_name = None 

1323 _active_patches = [] 

1324 

1325 def __init__( 

1326 self, getter, attribute, new, spec, create, 

1327 spec_set, autospec, new_callable, kwargs, *, unsafe=False 

1328 ): 

1329 if new_callable is not None: 

1330 if new is not DEFAULT: 

1331 raise ValueError( 

1332 "Cannot use 'new' and 'new_callable' together" 

1333 ) 

1334 if autospec is not None: 

1335 raise ValueError( 

1336 "Cannot use 'autospec' and 'new_callable' together" 

1337 ) 

1338 if not unsafe: 

1339 _check_spec_arg_typos(kwargs) 

1340 if _is_instance_mock(spec): 

1341 raise InvalidSpecError( 

1342 f'Cannot spec attr {attribute!r} as the spec ' 

1343 f'has already been mocked out. [spec={spec!r}]') 

1344 if _is_instance_mock(spec_set): 

1345 raise InvalidSpecError( 

1346 f'Cannot spec attr {attribute!r} as the spec_set ' 

1347 f'target has already been mocked out. [spec_set={spec_set!r}]') 

1348 

1349 self.getter = getter 

1350 self.attribute = attribute 

1351 self.new = new 

1352 self.new_callable = new_callable 

1353 self.spec = spec 

1354 self.create = create 

1355 self.has_local = False 

1356 self.spec_set = spec_set 

1357 self.autospec = autospec 

1358 self.kwargs = kwargs 

1359 self.additional_patchers = [] 

1360 

1361 

1362 def copy(self): 

1363 patcher = _patch( 

1364 self.getter, self.attribute, self.new, self.spec, 

1365 self.create, self.spec_set, 

1366 self.autospec, self.new_callable, self.kwargs 

1367 ) 

1368 patcher.attribute_name = self.attribute_name 

1369 patcher.additional_patchers = [ 

1370 p.copy() for p in self.additional_patchers 

1371 ] 

1372 return patcher 

1373 

1374 

1375 def __call__(self, func): 

1376 if isinstance(func, type): 

1377 return self.decorate_class(func) 

1378 if inspect.iscoroutinefunction(func): 

1379 return self.decorate_async_callable(func) 

1380 return self.decorate_callable(func) 

1381 

1382 

1383 def decorate_class(self, klass): 

1384 for attr in dir(klass): 

1385 if not attr.startswith(patch.TEST_PREFIX): 

1386 continue 

1387 

1388 attr_value = getattr(klass, attr) 

1389 if not hasattr(attr_value, "__call__"): 

1390 continue 

1391 

1392 patcher = self.copy() 

1393 setattr(klass, attr, patcher(attr_value)) 

1394 return klass 

1395 

1396 

1397 @contextlib.contextmanager 

1398 def decoration_helper(self, patched, args, keywargs): 

1399 extra_args = [] 

1400 with contextlib.ExitStack() as exit_stack: 

1401 for patching in patched.patchings: 

1402 arg = exit_stack.enter_context(patching) 

1403 if patching.attribute_name is not None: 

1404 keywargs.update(arg) 

1405 elif patching.new is DEFAULT: 

1406 extra_args.append(arg) 

1407 

1408 args += tuple(extra_args) 

1409 yield (args, keywargs) 

1410 

1411 

1412 def decorate_callable(self, func): 

1413 # NB. Keep the method in sync with decorate_async_callable() 

1414 if hasattr(func, 'patchings'): 

1415 func.patchings.append(self) 

1416 return func 

1417 

1418 @wraps(func) 

1419 def patched(*args, **keywargs): 

1420 with self.decoration_helper(patched, 

1421 args, 

1422 keywargs) as (newargs, newkeywargs): 

1423 return func(*newargs, **newkeywargs) 

1424 

1425 patched.patchings = [self] 

1426 return patched 

1427 

1428 

1429 def decorate_async_callable(self, func): 

1430 # NB. Keep the method in sync with decorate_callable() 

1431 if hasattr(func, 'patchings'): 

1432 func.patchings.append(self) 

1433 return func 

1434 

1435 @wraps(func) 

1436 async def patched(*args, **keywargs): 

1437 with self.decoration_helper(patched, 

1438 args, 

1439 keywargs) as (newargs, newkeywargs): 

1440 return await func(*newargs, **newkeywargs) 

1441 

1442 patched.patchings = [self] 

1443 return patched 

1444 

1445 

1446 def get_original(self): 

1447 target = self.getter() 

1448 name = self.attribute 

1449 

1450 original = DEFAULT 

1451 local = False 

1452 

1453 try: 

1454 original = target.__dict__[name] 

1455 except (AttributeError, KeyError): 

1456 original = getattr(target, name, DEFAULT) 

1457 else: 

1458 local = True 

1459 

1460 if name in _builtins and isinstance(target, ModuleType): 

1461 self.create = True 

1462 

1463 if not self.create and original is DEFAULT: 

1464 raise AttributeError( 

1465 "%s does not have the attribute %r" % (target, name) 

1466 ) 

1467 return original, local 

1468 

1469 

1470 def __enter__(self): 

1471 """Perform the patch.""" 

1472 new, spec, spec_set = self.new, self.spec, self.spec_set 

1473 autospec, kwargs = self.autospec, self.kwargs 

1474 new_callable = self.new_callable 

1475 self.target = self.getter() 

1476 

1477 # normalise False to None 

1478 if spec is False: 

1479 spec = None 

1480 if spec_set is False: 

1481 spec_set = None 

1482 if autospec is False: 

1483 autospec = None 

1484 

1485 if spec is not None and autospec is not None: 

1486 raise TypeError("Can't specify spec and autospec") 

1487 if ((spec is not None or autospec is not None) and 

1488 spec_set not in (True, None)): 

1489 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 

1490 

1491 original, local = self.get_original() 

1492 

1493 if new is DEFAULT and autospec is None: 

1494 inherit = False 

1495 if spec is True: 

1496 # set spec to the object we are replacing 

1497 spec = original 

1498 if spec_set is True: 

1499 spec_set = original 

1500 spec = None 

1501 elif spec is not None: 

1502 if spec_set is True: 

1503 spec_set = spec 

1504 spec = None 

1505 elif spec_set is True: 

1506 spec_set = original 

1507 

1508 if spec is not None or spec_set is not None: 

1509 if original is DEFAULT: 

1510 raise TypeError("Can't use 'spec' with create=True") 

1511 if isinstance(original, type): 

1512 # If we're patching out a class and there is a spec 

1513 inherit = True 

1514 if spec is None and _is_async_obj(original): 

1515 Klass = AsyncMock 

1516 else: 

1517 Klass = MagicMock 

1518 _kwargs = {} 

1519 if new_callable is not None: 

1520 Klass = new_callable 

1521 elif spec is not None or spec_set is not None: 

1522 this_spec = spec 

1523 if spec_set is not None: 

1524 this_spec = spec_set 

1525 if _is_list(this_spec): 

1526 not_callable = '__call__' not in this_spec 

1527 else: 

1528 not_callable = not callable(this_spec) 

1529 if _is_async_obj(this_spec): 

1530 Klass = AsyncMock 

1531 elif not_callable: 

1532 Klass = NonCallableMagicMock 

1533 

1534 if spec is not None: 

1535 _kwargs['spec'] = spec 

1536 if spec_set is not None: 

1537 _kwargs['spec_set'] = spec_set 

1538 

1539 # add a name to mocks 

1540 if (isinstance(Klass, type) and 

1541 issubclass(Klass, NonCallableMock) and self.attribute): 

1542 _kwargs['name'] = self.attribute 

1543 

1544 _kwargs.update(kwargs) 

1545 new = Klass(**_kwargs) 

1546 

1547 if inherit and _is_instance_mock(new): 

1548 # we can only tell if the instance should be callable if the 

1549 # spec is not a list 

1550 this_spec = spec 

1551 if spec_set is not None: 

1552 this_spec = spec_set 

1553 if (not _is_list(this_spec) and not 

1554 _instance_callable(this_spec)): 

1555 Klass = NonCallableMagicMock 

1556 

1557 _kwargs.pop('name') 

1558 new.return_value = Klass(_new_parent=new, _new_name='()', 

1559 **_kwargs) 

1560 elif autospec is not None: 

1561 # spec is ignored, new *must* be default, spec_set is treated 

1562 # as a boolean. Should we check spec is not None and that spec_set 

1563 # is a bool? 

1564 if new is not DEFAULT: 

1565 raise TypeError( 

1566 "autospec creates the mock for you. Can't specify " 

1567 "autospec and new." 

1568 ) 

1569 if original is DEFAULT: 

1570 raise TypeError("Can't use 'autospec' with create=True") 

1571 spec_set = bool(spec_set) 

1572 if autospec is True: 

1573 autospec = original 

1574 

1575 if _is_instance_mock(self.target): 

1576 raise InvalidSpecError( 

1577 f'Cannot autospec attr {self.attribute!r} as the patch ' 

1578 f'target has already been mocked out. ' 

1579 f'[target={self.target!r}, attr={autospec!r}]') 

1580 if _is_instance_mock(autospec): 

1581 target_name = getattr(self.target, '__name__', self.target) 

1582 raise InvalidSpecError( 

1583 f'Cannot autospec attr {self.attribute!r} from target ' 

1584 f'{target_name!r} as it has already been mocked out. ' 

1585 f'[target={self.target!r}, attr={autospec!r}]') 

1586 

1587 new = create_autospec(autospec, spec_set=spec_set, 

1588 _name=self.attribute, **kwargs) 

1589 elif kwargs: 

1590 # can't set keyword args when we aren't creating the mock 

1591 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 

1592 raise TypeError("Can't pass kwargs to a mock we aren't creating") 

1593 

1594 new_attr = new 

1595 

1596 self.temp_original = original 

1597 self.is_local = local 

1598 self._exit_stack = contextlib.ExitStack() 

1599 try: 

1600 setattr(self.target, self.attribute, new_attr) 

1601 if self.attribute_name is not None: 

1602 extra_args = {} 

1603 if self.new is DEFAULT: 

1604 extra_args[self.attribute_name] = new 

1605 for patching in self.additional_patchers: 

1606 arg = self._exit_stack.enter_context(patching) 

1607 if patching.new is DEFAULT: 

1608 extra_args.update(arg) 

1609 return extra_args 

1610 

1611 return new 

1612 except: 

1613 if not self.__exit__(*sys.exc_info()): 

1614 raise 

1615 

1616 def __exit__(self, *exc_info): 

1617 """Undo the patch.""" 

1618 if self.is_local and self.temp_original is not DEFAULT: 

1619 setattr(self.target, self.attribute, self.temp_original) 

1620 else: 

1621 delattr(self.target, self.attribute) 

1622 if not self.create and (not hasattr(self.target, self.attribute) or 

1623 self.attribute in ('__doc__', '__module__', 

1624 '__defaults__', '__annotations__', 

1625 '__kwdefaults__')): 

1626 # needed for proxy objects like django settings 

1627 setattr(self.target, self.attribute, self.temp_original) 

1628 

1629 del self.temp_original 

1630 del self.is_local 

1631 del self.target 

1632 exit_stack = self._exit_stack 

1633 del self._exit_stack 

1634 return exit_stack.__exit__(*exc_info) 

1635 

1636 

1637 def start(self): 

1638 """Activate a patch, returning any created mock.""" 

1639 result = self.__enter__() 

1640 self._active_patches.append(self) 

1641 return result 

1642 

1643 

1644 def stop(self): 

1645 """Stop an active patch.""" 

1646 try: 

1647 self._active_patches.remove(self) 

1648 except ValueError: 

1649 # If the patch hasn't been started this will fail 

1650 return None 

1651 

1652 return self.__exit__(None, None, None) 

1653 

1654 

1655 

1656def _get_target(target): 

1657 try: 

1658 target, attribute = target.rsplit('.', 1) 

1659 except (TypeError, ValueError, AttributeError): 

1660 raise TypeError( 

1661 f"Need a valid target to patch. You supplied: {target!r}") 

1662 getter = lambda: _importer(target) 

1663 return getter, attribute 

1664 

1665 

1666def _patch_object( 

1667 target, attribute, new=DEFAULT, spec=None, 

1668 create=False, spec_set=None, autospec=None, 

1669 new_callable=None, *, unsafe=False, **kwargs 

1670 ): 

1671 """ 

1672 patch the named member (`attribute`) on an object (`target`) with a mock 

1673 object. 

1674 

1675 `patch.object` can be used as a decorator, class decorator or a context 

1676 manager. Arguments `new`, `spec`, `create`, `spec_set`, 

1677 `autospec` and `new_callable` have the same meaning as for `patch`. Like 

1678 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 

1679 the mock object it creates. 

1680 

1681 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 

1682 for choosing which methods to wrap. 

1683 """ 

1684 if type(target) is str: 

1685 raise TypeError( 

1686 f"{target!r} must be the actual object to be patched, not a str" 

1687 ) 

1688 getter = lambda: target 

1689 return _patch( 

1690 getter, attribute, new, spec, create, 

1691 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1692 ) 

1693 

1694 

1695def _patch_multiple(target, spec=None, create=False, spec_set=None, 

1696 autospec=None, new_callable=None, **kwargs): 

1697 """Perform multiple patches in a single call. It takes the object to be 

1698 patched (either as an object or a string to fetch the object by importing) 

1699 and keyword arguments for the patches:: 

1700 

1701 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 

1702 ... 

1703 

1704 Use `DEFAULT` as the value if you want `patch.multiple` to create 

1705 mocks for you. In this case the created mocks are passed into a decorated 

1706 function by keyword, and a dictionary is returned when `patch.multiple` is 

1707 used as a context manager. 

1708 

1709 `patch.multiple` can be used as a decorator, class decorator or a context 

1710 manager. The arguments `spec`, `spec_set`, `create`, 

1711 `autospec` and `new_callable` have the same meaning as for `patch`. These 

1712 arguments will be applied to *all* patches done by `patch.multiple`. 

1713 

1714 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 

1715 for choosing which methods to wrap. 

1716 """ 

1717 if type(target) is str: 

1718 getter = lambda: _importer(target) 

1719 else: 

1720 getter = lambda: target 

1721 

1722 if not kwargs: 

1723 raise ValueError( 

1724 'Must supply at least one keyword argument with patch.multiple' 

1725 ) 

1726 # need to wrap in a list for python 3, where items is a view 

1727 items = list(kwargs.items()) 

1728 attribute, new = items[0] 

1729 patcher = _patch( 

1730 getter, attribute, new, spec, create, spec_set, 

1731 autospec, new_callable, {} 

1732 ) 

1733 patcher.attribute_name = attribute 

1734 for attribute, new in items[1:]: 

1735 this_patcher = _patch( 

1736 getter, attribute, new, spec, create, spec_set, 

1737 autospec, new_callable, {} 

1738 ) 

1739 this_patcher.attribute_name = attribute 

1740 patcher.additional_patchers.append(this_patcher) 

1741 return patcher 

1742 

1743 

1744def patch( 

1745 target, new=DEFAULT, spec=None, create=False, 

1746 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 

1747 ): 

1748 """ 

1749 `patch` acts as a function decorator, class decorator or a context 

1750 manager. Inside the body of the function or with statement, the `target` 

1751 is patched with a `new` object. When the function/with statement exits 

1752 the patch is undone. 

1753 

1754 If `new` is omitted, then the target is replaced with an 

1755 `AsyncMock if the patched object is an async function or a 

1756 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 

1757 omitted, the created mock is passed in as an extra argument to the 

1758 decorated function. If `patch` is used as a context manager the created 

1759 mock is returned by the context manager. 

1760 

1761 `target` should be a string in the form `'package.module.ClassName'`. The 

1762 `target` is imported and the specified object replaced with the `new` 

1763 object, so the `target` must be importable from the environment you are 

1764 calling `patch` from. The target is imported when the decorated function 

1765 is executed, not at decoration time. 

1766 

1767 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 

1768 if patch is creating one for you. 

1769 

1770 In addition you can pass `spec=True` or `spec_set=True`, which causes 

1771 patch to pass in the object being mocked as the spec/spec_set object. 

1772 

1773 `new_callable` allows you to specify a different class, or callable object, 

1774 that will be called to create the `new` object. By default `AsyncMock` is 

1775 used for async functions and `MagicMock` for the rest. 

1776 

1777 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 

1778 then the mock will be created with a spec from the object being replaced. 

1779 All attributes of the mock will also have the spec of the corresponding 

1780 attribute of the object being replaced. Methods and functions being 

1781 mocked will have their arguments checked and will raise a `TypeError` if 

1782 they are called with the wrong signature. For mocks replacing a class, 

1783 their return value (the 'instance') will have the same spec as the class. 

1784 

1785 Instead of `autospec=True` you can pass `autospec=some_object` to use an 

1786 arbitrary object as the spec instead of the one being replaced. 

1787 

1788 By default `patch` will fail to replace attributes that don't exist. If 

1789 you pass in `create=True`, and the attribute doesn't exist, patch will 

1790 create the attribute for you when the patched function is called, and 

1791 delete it again afterwards. This is useful for writing tests against 

1792 attributes that your production code creates at runtime. It is off by 

1793 default because it can be dangerous. With it switched on you can write 

1794 passing tests against APIs that don't actually exist! 

1795 

1796 Patch can be used as a `TestCase` class decorator. It works by 

1797 decorating each test method in the class. This reduces the boilerplate 

1798 code when your test methods share a common patchings set. `patch` finds 

1799 tests by looking for method names that start with `patch.TEST_PREFIX`. 

1800 By default this is `test`, which matches the way `unittest` finds tests. 

1801 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 

1802 

1803 Patch can be used as a context manager, with the with statement. Here the 

1804 patching applies to the indented block after the with statement. If you 

1805 use "as" then the patched object will be bound to the name after the 

1806 "as"; very useful if `patch` is creating a mock object for you. 

1807 

1808 Patch will raise a `RuntimeError` if passed some common misspellings of 

1809 the arguments autospec and spec_set. Pass the argument `unsafe` with the 

1810 value True to disable that check. 

1811 

1812 `patch` takes arbitrary keyword arguments. These will be passed to 

1813 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 

1814 otherwise or to `new_callable` if specified. 

1815 

1816 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 

1817 available for alternate use-cases. 

1818 """ 

1819 getter, attribute = _get_target(target) 

1820 return _patch( 

1821 getter, attribute, new, spec, create, 

1822 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1823 ) 

1824 

1825 

1826class _patch_dict(object): 

1827 """ 

1828 Patch a dictionary, or dictionary like object, and restore the dictionary 

1829 to its original state after the test. 

1830 

1831 `in_dict` can be a dictionary or a mapping like container. If it is a 

1832 mapping then it must at least support getting, setting and deleting items 

1833 plus iterating over keys. 

1834 

1835 `in_dict` can also be a string specifying the name of the dictionary, which 

1836 will then be fetched by importing it. 

1837 

1838 `values` can be a dictionary of values to set in the dictionary. `values` 

1839 can also be an iterable of `(key, value)` pairs. 

1840 

1841 If `clear` is True then the dictionary will be cleared before the new 

1842 values are set. 

1843 

1844 `patch.dict` can also be called with arbitrary keyword arguments to set 

1845 values in the dictionary:: 

1846 

1847 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 

1848 ... 

1849 

1850 `patch.dict` can be used as a context manager, decorator or class 

1851 decorator. When used as a class decorator `patch.dict` honours 

1852 `patch.TEST_PREFIX` for choosing which methods to wrap. 

1853 """ 

1854 

1855 def __init__(self, in_dict, values=(), clear=False, **kwargs): 

1856 self.in_dict = in_dict 

1857 # support any argument supported by dict(...) constructor 

1858 self.values = dict(values) 

1859 self.values.update(kwargs) 

1860 self.clear = clear 

1861 self._original = None 

1862 

1863 

1864 def __call__(self, f): 

1865 if isinstance(f, type): 

1866 return self.decorate_class(f) 

1867 if inspect.iscoroutinefunction(f): 

1868 return self.decorate_async_callable(f) 

1869 return self.decorate_callable(f) 

1870 

1871 

1872 def decorate_callable(self, f): 

1873 @wraps(f) 

1874 def _inner(*args, **kw): 

1875 self._patch_dict() 

1876 try: 

1877 return f(*args, **kw) 

1878 finally: 

1879 self._unpatch_dict() 

1880 

1881 return _inner 

1882 

1883 

1884 def decorate_async_callable(self, f): 

1885 @wraps(f) 

1886 async def _inner(*args, **kw): 

1887 self._patch_dict() 

1888 try: 

1889 return await f(*args, **kw) 

1890 finally: 

1891 self._unpatch_dict() 

1892 

1893 return _inner 

1894 

1895 

1896 def decorate_class(self, klass): 

1897 for attr in dir(klass): 

1898 attr_value = getattr(klass, attr) 

1899 if (attr.startswith(patch.TEST_PREFIX) and 

1900 hasattr(attr_value, "__call__")): 

1901 decorator = _patch_dict(self.in_dict, self.values, self.clear) 

1902 decorated = decorator(attr_value) 

1903 setattr(klass, attr, decorated) 

1904 return klass 

1905 

1906 

1907 def __enter__(self): 

1908 """Patch the dict.""" 

1909 self._patch_dict() 

1910 return self.in_dict 

1911 

1912 

1913 def _patch_dict(self): 

1914 values = self.values 

1915 if isinstance(self.in_dict, str): 

1916 self.in_dict = _importer(self.in_dict) 

1917 in_dict = self.in_dict 

1918 clear = self.clear 

1919 

1920 try: 

1921 original = in_dict.copy() 

1922 except AttributeError: 

1923 # dict like object with no copy method 

1924 # must support iteration over keys 

1925 original = {} 

1926 for key in in_dict: 

1927 original[key] = in_dict[key] 

1928 self._original = original 

1929 

1930 if clear: 

1931 _clear_dict(in_dict) 

1932 

1933 try: 

1934 in_dict.update(values) 

1935 except AttributeError: 

1936 # dict like object with no update method 

1937 for key in values: 

1938 in_dict[key] = values[key] 

1939 

1940 

1941 def _unpatch_dict(self): 

1942 in_dict = self.in_dict 

1943 original = self._original 

1944 

1945 _clear_dict(in_dict) 

1946 

1947 try: 

1948 in_dict.update(original) 

1949 except AttributeError: 

1950 for key in original: 

1951 in_dict[key] = original[key] 

1952 

1953 

1954 def __exit__(self, *args): 

1955 """Unpatch the dict.""" 

1956 if self._original is not None: 

1957 self._unpatch_dict() 

1958 return False 

1959 

1960 

1961 def start(self): 

1962 """Activate a patch, returning any created mock.""" 

1963 result = self.__enter__() 

1964 _patch._active_patches.append(self) 

1965 return result 

1966 

1967 

1968 def stop(self): 

1969 """Stop an active patch.""" 

1970 try: 

1971 _patch._active_patches.remove(self) 

1972 except ValueError: 

1973 # If the patch hasn't been started this will fail 

1974 return None 

1975 

1976 return self.__exit__(None, None, None) 

1977 

1978 

1979def _clear_dict(in_dict): 

1980 try: 

1981 in_dict.clear() 

1982 except AttributeError: 

1983 keys = list(in_dict) 

1984 for key in keys: 

1985 del in_dict[key] 

1986 

1987 

1988def _patch_stopall(): 

1989 """Stop all active patches. LIFO to unroll nested patches.""" 

1990 for patch in reversed(_patch._active_patches): 

1991 patch.stop() 

1992 

1993 

1994patch.object = _patch_object 

1995patch.dict = _patch_dict 

1996patch.multiple = _patch_multiple 

1997patch.stopall = _patch_stopall 

1998patch.TEST_PREFIX = 'test' 

1999 

2000magic_methods = ( 

2001 "lt le gt ge eq ne " 

2002 "getitem setitem delitem " 

2003 "len contains iter " 

2004 "hash str sizeof " 

2005 "enter exit " 

2006 # we added divmod and rdivmod here instead of numerics 

2007 # because there is no idivmod 

2008 "divmod rdivmod neg pos abs invert " 

2009 "complex int float index " 

2010 "round trunc floor ceil " 

2011 "bool next " 

2012 "fspath " 

2013 "aiter " 

2014) 

2015 

2016if IS_PYPY: 

2017 # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html 

2018 magic_methods = magic_methods.replace('sizeof ', '') 

2019 

2020numerics = ( 

2021 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" 

2022) 

2023inplace = ' '.join('i%s' % n for n in numerics.split()) 

2024right = ' '.join('r%s' % n for n in numerics.split()) 

2025 

2026# not including __prepare__, __instancecheck__, __subclasscheck__ 

2027# (as they are metaclass methods) 

2028# __del__ is not supported at all as it causes problems if it exists 

2029 

2030_non_defaults = { 

2031 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 

2032 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 

2033 '__getstate__', '__setstate__', '__getformat__', 

2034 '__repr__', '__dir__', '__subclasses__', '__format__', 

2035 '__getnewargs_ex__', 

2036} 

2037 

2038 

2039def _get_method(name, func): 

2040 "Turns a callable object (like a mock) into a real function" 

2041 def method(self, *args, **kw): 

2042 return func(self, *args, **kw) 

2043 method.__name__ = name 

2044 return method 

2045 

2046 

2047_magics = { 

2048 '__%s__' % method for method in 

2049 ' '.join([magic_methods, numerics, inplace, right]).split() 

2050} 

2051 

2052# Magic methods used for async `with` statements 

2053_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 

2054# Magic methods that are only used with async calls but are synchronous functions themselves 

2055_sync_async_magics = {"__aiter__"} 

2056_async_magics = _async_method_magics | _sync_async_magics 

2057 

2058_all_sync_magics = _magics | _non_defaults 

2059_all_magics = _all_sync_magics | _async_magics 

2060 

2061_unsupported_magics = { 

2062 '__getattr__', '__setattr__', 

2063 '__init__', '__new__', '__prepare__', 

2064 '__instancecheck__', '__subclasscheck__', 

2065 '__del__' 

2066} 

2067 

2068_calculate_return_value = { 

2069 '__hash__': lambda self: object.__hash__(self), 

2070 '__str__': lambda self: object.__str__(self), 

2071 '__sizeof__': lambda self: object.__sizeof__(self), 

2072 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 

2073} 

2074 

2075_return_values = { 

2076 '__lt__': NotImplemented, 

2077 '__gt__': NotImplemented, 

2078 '__le__': NotImplemented, 

2079 '__ge__': NotImplemented, 

2080 '__int__': 1, 

2081 '__contains__': False, 

2082 '__len__': 0, 

2083 '__exit__': False, 

2084 '__complex__': 1j, 

2085 '__float__': 1.0, 

2086 '__bool__': True, 

2087 '__index__': 1, 

2088 '__aexit__': False, 

2089} 

2090 

2091 

2092def _get_eq(self): 

2093 def __eq__(other): 

2094 ret_val = self.__eq__._mock_return_value 

2095 if ret_val is not DEFAULT: 

2096 return ret_val 

2097 if self is other: 

2098 return True 

2099 return NotImplemented 

2100 return __eq__ 

2101 

2102def _get_ne(self): 

2103 def __ne__(other): 

2104 if self.__ne__._mock_return_value is not DEFAULT: 

2105 return DEFAULT 

2106 if self is other: 

2107 return False 

2108 return NotImplemented 

2109 return __ne__ 

2110 

2111def _get_iter(self): 

2112 def __iter__(): 

2113 ret_val = self.__iter__._mock_return_value 

2114 if ret_val is DEFAULT: 

2115 return iter([]) 

2116 # if ret_val was already an iterator, then calling iter on it should 

2117 # return the iterator unchanged 

2118 return iter(ret_val) 

2119 return __iter__ 

2120 

2121def _get_async_iter(self): 

2122 def __aiter__(): 

2123 ret_val = self.__aiter__._mock_return_value 

2124 if ret_val is DEFAULT: 

2125 return _AsyncIterator(iter([])) 

2126 return _AsyncIterator(iter(ret_val)) 

2127 return __aiter__ 

2128 

2129_side_effect_methods = { 

2130 '__eq__': _get_eq, 

2131 '__ne__': _get_ne, 

2132 '__iter__': _get_iter, 

2133 '__aiter__': _get_async_iter 

2134} 

2135 

2136 

2137 

2138def _set_return_value(mock, method, name): 

2139 fixed = _return_values.get(name, DEFAULT) 

2140 if fixed is not DEFAULT: 

2141 method.return_value = fixed 

2142 return 

2143 

2144 return_calculator = _calculate_return_value.get(name) 

2145 if return_calculator is not None: 

2146 return_value = return_calculator(mock) 

2147 method.return_value = return_value 

2148 return 

2149 

2150 side_effector = _side_effect_methods.get(name) 

2151 if side_effector is not None: 

2152 method.side_effect = side_effector(mock) 

2153 

2154 

2155 

2156class MagicMixin(Base): 

2157 def __init__(self, *args, **kw): 

2158 self._mock_set_magics() # make magic work for kwargs in init 

2159 _safe_super(MagicMixin, self).__init__(*args, **kw) 

2160 self._mock_set_magics() # fix magic broken by upper level init 

2161 

2162 

2163 def _mock_set_magics(self): 

2164 orig_magics = _magics | _async_method_magics 

2165 these_magics = orig_magics 

2166 

2167 if getattr(self, "_mock_methods", None) is not None: 

2168 these_magics = orig_magics.intersection(self._mock_methods) 

2169 

2170 remove_magics = set() 

2171 remove_magics = orig_magics - these_magics 

2172 

2173 for entry in remove_magics: 

2174 if entry in type(self).__dict__: 

2175 # remove unneeded magic methods 

2176 delattr(self, entry) 

2177 

2178 # don't overwrite existing attributes if called a second time 

2179 these_magics = these_magics - set(type(self).__dict__) 

2180 

2181 _type = type(self) 

2182 for entry in these_magics: 

2183 setattr(_type, entry, MagicProxy(entry, self)) 

2184 

2185 

2186 

2187class NonCallableMagicMock(MagicMixin, NonCallableMock): 

2188 """A version of `MagicMock` that isn't callable.""" 

2189 def mock_add_spec(self, spec, spec_set=False): 

2190 """Add a spec to a mock. `spec` can either be an object or a 

2191 list of strings. Only attributes on the `spec` can be fetched as 

2192 attributes from the mock. 

2193 

2194 If `spec_set` is True then only attributes on the spec can be set.""" 

2195 self._mock_add_spec(spec, spec_set) 

2196 self._mock_set_magics() 

2197 

2198 

2199class AsyncMagicMixin(MagicMixin): 

2200 pass 

2201 

2202 

2203class MagicMock(MagicMixin, Mock): 

2204 """ 

2205 MagicMock is a subclass of Mock with default implementations 

2206 of most of the magic methods. You can use MagicMock without having to 

2207 configure the magic methods yourself. 

2208 

2209 If you use the `spec` or `spec_set` arguments then *only* magic 

2210 methods that exist in the spec will be created. 

2211 

2212 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 

2213 """ 

2214 def mock_add_spec(self, spec, spec_set=False): 

2215 """Add a spec to a mock. `spec` can either be an object or a 

2216 list of strings. Only attributes on the `spec` can be fetched as 

2217 attributes from the mock. 

2218 

2219 If `spec_set` is True then only attributes on the spec can be set.""" 

2220 self._mock_add_spec(spec, spec_set) 

2221 self._mock_set_magics() 

2222 

2223 

2224 

2225class MagicProxy(Base): 

2226 def __init__(self, name, parent): 

2227 self.name = name 

2228 self.parent = parent 

2229 

2230 def create_mock(self): 

2231 entry = self.name 

2232 parent = self.parent 

2233 m = parent._get_child_mock(name=entry, _new_name=entry, 

2234 _new_parent=parent) 

2235 setattr(parent, entry, m) 

2236 _set_return_value(parent, m, entry) 

2237 return m 

2238 

2239 def __get__(self, obj, _type=None): 

2240 return self.create_mock() 

2241 

2242 

2243_CODE_ATTRS = dir(CodeType) 

2244_CODE_SIG = inspect.signature(partial(CodeType.__init__, None)) 

2245 

2246 

2247class AsyncMockMixin(Base): 

2248 await_count = _delegating_property('await_count') 

2249 await_args = _delegating_property('await_args') 

2250 await_args_list = _delegating_property('await_args_list') 

2251 

2252 def __init__(self, *args, **kwargs): 

2253 super().__init__(*args, **kwargs) 

2254 # iscoroutinefunction() checks _is_coroutine property to say if an 

2255 # object is a coroutine. Without this check it looks to see if it is a 

2256 # function/method, which in this case it is not (since it is an 

2257 # AsyncMock). 

2258 # It is set through __dict__ because when spec_set is True, this 

2259 # attribute is likely undefined. 

2260 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 

2261 self.__dict__['_mock_await_count'] = 0 

2262 self.__dict__['_mock_await_args'] = None 

2263 self.__dict__['_mock_await_args_list'] = _CallList() 

2264 code_mock = NonCallableMock(spec_set=_CODE_ATTRS) 

2265 code_mock.__dict__["_spec_class"] = CodeType 

2266 code_mock.__dict__["_spec_signature"] = _CODE_SIG 

2267 code_mock.co_flags = ( 

2268 inspect.CO_COROUTINE 

2269 + inspect.CO_VARARGS 

2270 + inspect.CO_VARKEYWORDS 

2271 ) 

2272 code_mock.co_argcount = 0 

2273 code_mock.co_varnames = ('args', 'kwargs') 

2274 try: 

2275 code_mock.co_posonlyargcount = 0 

2276 except AttributeError: 

2277 # Python 3.7 and earlier. 

2278 pass 

2279 code_mock.co_kwonlyargcount = 0 

2280 self.__dict__['__code__'] = code_mock 

2281 self.__dict__['__name__'] = 'AsyncMock' 

2282 self.__dict__['__defaults__'] = tuple() 

2283 self.__dict__['__kwdefaults__'] = {} 

2284 self.__dict__['__annotations__'] = None 

2285 

2286 async def _execute_mock_call(_mock_self, *args, **kwargs): 

2287 self = _mock_self 

2288 # This is nearly just like super(), except for special handling 

2289 # of coroutines 

2290 

2291 _call = _Call((args, kwargs), two=True) 

2292 self.await_count += 1 

2293 self.await_args = _call 

2294 self.await_args_list.append(_call) 

2295 

2296 effect = self.side_effect 

2297 if effect is not None: 

2298 if _is_exception(effect): 

2299 raise effect 

2300 elif not _callable(effect): 

2301 try: 

2302 result = next(effect) 

2303 except StopIteration: 

2304 # It is impossible to propagate a StopIteration 

2305 # through coroutines because of PEP 479 

2306 raise StopAsyncIteration 

2307 if _is_exception(result): 

2308 raise result 

2309 elif iscoroutinefunction(effect): 

2310 result = await effect(*args, **kwargs) 

2311 else: 

2312 result = effect(*args, **kwargs) 

2313 

2314 if result is not DEFAULT: 

2315 return result 

2316 

2317 if self._mock_return_value is not DEFAULT: 

2318 return self.return_value 

2319 

2320 if self._mock_wraps is not None: 

2321 if iscoroutinefunction(self._mock_wraps): 

2322 return await self._mock_wraps(*args, **kwargs) 

2323 return self._mock_wraps(*args, **kwargs) 

2324 

2325 return self.return_value 

2326 

2327 def assert_awaited(_mock_self): 

2328 """ 

2329 Assert that the mock was awaited at least once. 

2330 """ 

2331 self = _mock_self 

2332 if self.await_count == 0: 

2333 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 

2334 raise AssertionError(msg) 

2335 

2336 def assert_awaited_once(_mock_self): 

2337 """ 

2338 Assert that the mock was awaited exactly once. 

2339 """ 

2340 self = _mock_self 

2341 if not self.await_count == 1: 

2342 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2343 f" Awaited {self.await_count} times.") 

2344 raise AssertionError(msg) 

2345 

2346 def assert_awaited_with(_mock_self, *args, **kwargs): 

2347 """ 

2348 Assert that the last await was with the specified arguments. 

2349 """ 

2350 self = _mock_self 

2351 if self.await_args is None: 

2352 expected = self._format_mock_call_signature(args, kwargs) 

2353 raise AssertionError(f'Expected await: {expected}\nNot awaited') 

2354 

2355 def _error_message(): 

2356 msg = self._format_mock_failure_message(args, kwargs, action='await') 

2357 return msg 

2358 

2359 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2360 actual = self._call_matcher(self.await_args) 

2361 if actual != expected: 

2362 cause = expected if isinstance(expected, Exception) else None 

2363 raise AssertionError(_error_message()) from cause 

2364 

2365 def assert_awaited_once_with(_mock_self, *args, **kwargs): 

2366 """ 

2367 Assert that the mock was awaited exactly once and with the specified 

2368 arguments. 

2369 """ 

2370 self = _mock_self 

2371 if not self.await_count == 1: 

2372 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2373 f" Awaited {self.await_count} times.") 

2374 raise AssertionError(msg) 

2375 return self.assert_awaited_with(*args, **kwargs) 

2376 

2377 def assert_any_await(_mock_self, *args, **kwargs): 

2378 """ 

2379 Assert the mock has ever been awaited with the specified arguments. 

2380 """ 

2381 self = _mock_self 

2382 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2383 cause = expected if isinstance(expected, Exception) else None 

2384 actual = [self._call_matcher(c) for c in self.await_args_list] 

2385 if cause or expected not in _AnyComparer(actual): 

2386 expected_string = self._format_mock_call_signature(args, kwargs) 

2387 raise AssertionError( 

2388 '%s await not found' % expected_string 

2389 ) from cause 

2390 

2391 def assert_has_awaits(_mock_self, calls, any_order=False): 

2392 """ 

2393 Assert the mock has been awaited with the specified calls. 

2394 The :attr:`await_args_list` list is checked for the awaits. 

2395 

2396 If `any_order` is False (the default) then the awaits must be 

2397 sequential. There can be extra calls before or after the 

2398 specified awaits. 

2399 

2400 If `any_order` is True then the awaits can be in any order, but 

2401 they must all appear in :attr:`await_args_list`. 

2402 """ 

2403 self = _mock_self 

2404 expected = [self._call_matcher(c) for c in calls] 

2405 cause = cause = next((e for e in expected if isinstance(e, Exception)), None) 

2406 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 

2407 if not any_order: 

2408 if expected not in all_awaits: 

2409 if cause is None: 

2410 problem = 'Awaits not found.' 

2411 else: 

2412 problem = ('Error processing expected awaits.\n' 

2413 'Errors: {}').format( 

2414 [e if isinstance(e, Exception) else None 

2415 for e in expected]) 

2416 raise AssertionError( 

2417 f'{problem}\n' 

2418 f'Expected: {_CallList(calls)}\n' 

2419 f'Actual: {self.await_args_list}' 

2420 ) from cause 

2421 return 

2422 

2423 all_awaits = list(all_awaits) 

2424 

2425 not_found = [] 

2426 for kall in expected: 

2427 try: 

2428 all_awaits.remove(kall) 

2429 except ValueError: 

2430 not_found.append(kall) 

2431 if not_found: 

2432 raise AssertionError( 

2433 '%r not all found in await list' % (tuple(not_found),) 

2434 ) from cause 

2435 

2436 def assert_not_awaited(_mock_self): 

2437 """ 

2438 Assert that the mock was never awaited. 

2439 """ 

2440 self = _mock_self 

2441 if self.await_count != 0: 

2442 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 

2443 f" Awaited {self.await_count} times.") 

2444 raise AssertionError(msg) 

2445 

2446 def reset_mock(self, *args, **kwargs): 

2447 """ 

2448 See :func:`.Mock.reset_mock()` 

2449 """ 

2450 super().reset_mock(*args, **kwargs) 

2451 self.await_count = 0 

2452 self.await_args = None 

2453 self.await_args_list = _CallList() 

2454 

2455 

2456class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 

2457 """ 

2458 Enhance :class:`Mock` with features allowing to mock 

2459 an async function. 

2460 

2461 The :class:`AsyncMock` object will behave so the object is 

2462 recognized as an async function, and the result of a call is an awaitable: 

2463 

2464 >>> mock = AsyncMock() 

2465 >>> iscoroutinefunction(mock) 

2466 True 

2467 >>> inspect.isawaitable(mock()) 

2468 True 

2469 

2470 

2471 The result of ``mock()`` is an async function which will have the outcome 

2472 of ``side_effect`` or ``return_value``: 

2473 

2474 - if ``side_effect`` is a function, the async function will return the 

2475 result of that function, 

2476 - if ``side_effect`` is an exception, the async function will raise the 

2477 exception, 

2478 - if ``side_effect`` is an iterable, the async function will return the 

2479 next value of the iterable, however, if the sequence of result is 

2480 exhausted, ``StopIteration`` is raised immediately, 

2481 - if ``side_effect`` is not defined, the async function will return the 

2482 value defined by ``return_value``, hence, by default, the async function 

2483 returns a new :class:`AsyncMock` object. 

2484 

2485 If the outcome of ``side_effect`` or ``return_value`` is an async function, 

2486 the mock async function obtained when the mock object is called will be this 

2487 async function itself (and not an async function returning an async 

2488 function). 

2489 

2490 The test author can also specify a wrapped object with ``wraps``. In this 

2491 case, the :class:`Mock` object behavior is the same as with an 

2492 :class:`.Mock` object: the wrapped object may have methods 

2493 defined as async function functions. 

2494 

2495 Based on Martin Richard's asynctest project. 

2496 """ 

2497 

2498 

2499class _ANY(object): 

2500 "A helper object that compares equal to everything." 

2501 

2502 def __eq__(self, other): 

2503 return True 

2504 

2505 def __ne__(self, other): 

2506 return False 

2507 

2508 def __repr__(self): 

2509 return '<ANY>' 

2510 

2511ANY = _ANY() 

2512 

2513 

2514 

2515def _format_call_signature(name, args, kwargs): 

2516 message = '%s(%%s)' % name 

2517 formatted_args = '' 

2518 args_string = ', '.join([repr(arg) for arg in args]) 

2519 kwargs_string = ', '.join([ 

2520 '%s=%r' % (key, value) for key, value in kwargs.items() 

2521 ]) 

2522 if args_string: 

2523 formatted_args = args_string 

2524 if kwargs_string: 

2525 if formatted_args: 

2526 formatted_args += ', ' 

2527 formatted_args += kwargs_string 

2528 

2529 return message % formatted_args 

2530 

2531 

2532 

2533class _Call(tuple): 

2534 """ 

2535 A tuple for holding the results of a call to a mock, either in the form 

2536 `(args, kwargs)` or `(name, args, kwargs)`. 

2537 

2538 If args or kwargs are empty then a call tuple will compare equal to 

2539 a tuple without those values. This makes comparisons less verbose:: 

2540 

2541 _Call(('name', (), {})) == ('name',) 

2542 _Call(('name', (1,), {})) == ('name', (1,)) 

2543 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 

2544 

2545 The `_Call` object provides a useful shortcut for comparing with call:: 

2546 

2547 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 

2548 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 

2549 

2550 If the _Call has no name then it will match any name. 

2551 """ 

2552 def __new__(cls, value=(), name='', parent=None, two=False, 

2553 from_kall=True): 

2554 args = () 

2555 kwargs = {} 

2556 _len = len(value) 

2557 if _len == 3: 

2558 name, args, kwargs = value 

2559 elif _len == 2: 

2560 first, second = value 

2561 if isinstance(first, str): 

2562 name = first 

2563 if isinstance(second, tuple): 

2564 args = second 

2565 else: 

2566 kwargs = second 

2567 else: 

2568 args, kwargs = first, second 

2569 elif _len == 1: 

2570 value, = value 

2571 if isinstance(value, str): 

2572 name = value 

2573 elif isinstance(value, tuple): 

2574 args = value 

2575 else: 

2576 kwargs = value 

2577 

2578 if two: 

2579 return tuple.__new__(cls, (args, kwargs)) 

2580 

2581 return tuple.__new__(cls, (name, args, kwargs)) 

2582 

2583 

2584 def __init__(self, value=(), name=None, parent=None, two=False, 

2585 from_kall=True): 

2586 self._mock_name = name 

2587 self._mock_parent = parent 

2588 self._mock_from_kall = from_kall 

2589 

2590 

2591 def __eq__(self, other): 

2592 try: 

2593 len_other = len(other) 

2594 except TypeError: 

2595 return NotImplemented 

2596 

2597 self_name = '' 

2598 if len(self) == 2: 

2599 self_args, self_kwargs = self 

2600 else: 

2601 self_name, self_args, self_kwargs = self 

2602 

2603 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 

2604 and self._mock_parent != other._mock_parent): 

2605 return False 

2606 

2607 other_name = '' 

2608 if len_other == 0: 

2609 other_args, other_kwargs = (), {} 

2610 elif len_other == 3: 

2611 other_name, other_args, other_kwargs = other 

2612 elif len_other == 1: 

2613 value, = other 

2614 if isinstance(value, tuple): 

2615 other_args = value 

2616 other_kwargs = {} 

2617 elif isinstance(value, str): 

2618 other_name = value 

2619 other_args, other_kwargs = (), {} 

2620 else: 

2621 other_args = () 

2622 other_kwargs = value 

2623 elif len_other == 2: 

2624 # could be (name, args) or (name, kwargs) or (args, kwargs) 

2625 first, second = other 

2626 if isinstance(first, str): 

2627 other_name = first 

2628 if isinstance(second, tuple): 

2629 other_args, other_kwargs = second, {} 

2630 else: 

2631 other_args, other_kwargs = (), second 

2632 else: 

2633 other_args, other_kwargs = first, second 

2634 else: 

2635 return False 

2636 

2637 if self_name and other_name != self_name: 

2638 return False 

2639 

2640 # this order is important for ANY to work! 

2641 return (other_args, other_kwargs) == (self_args, self_kwargs) 

2642 

2643 

2644 __ne__ = object.__ne__ 

2645 

2646 

2647 def __call__(self, *args, **kwargs): 

2648 if self._mock_name is None: 

2649 return _Call(('', args, kwargs), name='()') 

2650 

2651 name = self._mock_name + '()' 

2652 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 

2653 

2654 

2655 def __getattr__(self, attr): 

2656 if self._mock_name is None: 

2657 return _Call(name=attr, from_kall=False) 

2658 name = '%s.%s' % (self._mock_name, attr) 

2659 return _Call(name=name, parent=self, from_kall=False) 

2660 

2661 

2662 def __getattribute__(self, attr): 

2663 if attr in tuple.__dict__: 

2664 raise AttributeError 

2665 return tuple.__getattribute__(self, attr) 

2666 

2667 

2668 def _get_call_arguments(self): 

2669 if len(self) == 2: 

2670 args, kwargs = self 

2671 else: 

2672 name, args, kwargs = self 

2673 

2674 return args, kwargs 

2675 

2676 @property 

2677 def args(self): 

2678 return self._get_call_arguments()[0] 

2679 

2680 @property 

2681 def kwargs(self): 

2682 return self._get_call_arguments()[1] 

2683 

2684 def __repr__(self): 

2685 if not self._mock_from_kall: 

2686 name = self._mock_name or 'call' 

2687 if name.startswith('()'): 

2688 name = 'call%s' % name 

2689 return name 

2690 

2691 if len(self) == 2: 

2692 name = 'call' 

2693 args, kwargs = self 

2694 else: 

2695 name, args, kwargs = self 

2696 if not name: 

2697 name = 'call' 

2698 elif not name.startswith('()'): 

2699 name = 'call.%s' % name 

2700 else: 

2701 name = 'call%s' % name 

2702 return _format_call_signature(name, args, kwargs) 

2703 

2704 

2705 def call_list(self): 

2706 """For a call object that represents multiple calls, `call_list` 

2707 returns a list of all the intermediate calls as well as the 

2708 final call.""" 

2709 vals = [] 

2710 thing = self 

2711 while thing is not None: 

2712 if thing._mock_from_kall: 

2713 vals.append(thing) 

2714 thing = thing._mock_parent 

2715 return _CallList(reversed(vals)) 

2716 

2717 

2718call = _Call(from_kall=False) 

2719 

2720 

2721def create_autospec(spec, spec_set=False, instance=False, _parent=None, 

2722 _name=None, *, unsafe=False, **kwargs): 

2723 """Create a mock object using another object as a spec. Attributes on the 

2724 mock will use the corresponding attribute on the `spec` object as their 

2725 spec. 

2726 

2727 Functions or methods being mocked will have their arguments checked 

2728 to check that they are called with the correct signature. 

2729 

2730 If `spec_set` is True then attempting to set attributes that don't exist 

2731 on the spec object will raise an `AttributeError`. 

2732 

2733 If a class is used as a spec then the return value of the mock (the 

2734 instance of the class) will have the same spec. You can use a class as the 

2735 spec for an instance object by passing `instance=True`. The returned mock 

2736 will only be callable if instances of the mock are callable. 

2737 

2738 `create_autospec` will raise a `RuntimeError` if passed some common 

2739 misspellings of the arguments autospec and spec_set. Pass the argument 

2740 `unsafe` with the value True to disable that check. 

2741 

2742 `create_autospec` also takes arbitrary keyword arguments that are passed to 

2743 the constructor of the created mock.""" 

2744 if _is_list(spec): 

2745 # can't pass a list instance to the mock constructor as it will be 

2746 # interpreted as a list of strings 

2747 spec = type(spec) 

2748 

2749 is_type = isinstance(spec, type) 

2750 if _is_instance_mock(spec): 

2751 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 

2752 f'[object={spec!r}]') 

2753 is_async_func = _is_async_func(spec) 

2754 _kwargs = {'spec': spec} 

2755 if spec_set: 

2756 _kwargs = {'spec_set': spec} 

2757 elif spec is None: 

2758 # None we mock with a normal mock without a spec 

2759 _kwargs = {} 

2760 if _kwargs and instance: 

2761 _kwargs['_spec_as_instance'] = True 

2762 if not unsafe: 

2763 _check_spec_arg_typos(kwargs) 

2764 

2765 _kwargs.update(kwargs) 

2766 

2767 Klass = MagicMock 

2768 if inspect.isdatadescriptor(spec): 

2769 # descriptors don't have a spec 

2770 # because we don't know what type they return 

2771 _kwargs = {} 

2772 elif is_async_func: 

2773 if instance: 

2774 raise RuntimeError("Instance can not be True when create_autospec " 

2775 "is mocking an async function") 

2776 Klass = AsyncMock 

2777 elif not _callable(spec): 

2778 Klass = NonCallableMagicMock 

2779 elif is_type and instance and not _instance_callable(spec): 

2780 Klass = NonCallableMagicMock 

2781 

2782 _name = _kwargs.pop('name', _name) 

2783 

2784 _new_name = _name 

2785 if _parent is None: 

2786 # for a top level object no _new_name should be set 

2787 _new_name = '' 

2788 

2789 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 

2790 name=_name, **_kwargs) 

2791 

2792 if isinstance(spec, FunctionTypes): 

2793 # should only happen at the top level because we don't 

2794 # recurse for functions 

2795 mock = _set_signature(mock, spec) 

2796 if is_async_func: 

2797 _setup_async_mock(mock) 

2798 else: 

2799 _check_signature(spec, mock, is_type, instance) 

2800 

2801 if _parent is not None and not instance: 

2802 _parent._mock_children[_name] = mock 

2803 

2804 if is_type and not instance and 'return_value' not in kwargs: 

2805 mock.return_value = create_autospec(spec, spec_set, instance=True, 

2806 _name='()', _parent=mock) 

2807 

2808 for entry in dir(spec): 

2809 if _is_magic(entry): 

2810 # MagicMock already does the useful magic methods for us 

2811 continue 

2812 

2813 # XXXX do we need a better way of getting attributes without 

2814 # triggering code execution (?) Probably not - we need the actual 

2815 # object to mock it so we would rather trigger a property than mock 

2816 # the property descriptor. Likewise we want to mock out dynamically 

2817 # provided attributes. 

2818 # XXXX what about attributes that raise exceptions other than 

2819 # AttributeError on being fetched? 

2820 # we could be resilient against it, or catch and propagate the 

2821 # exception when the attribute is fetched from the mock 

2822 try: 

2823 original = getattr(spec, entry) 

2824 except AttributeError: 

2825 continue 

2826 

2827 kwargs = {'spec': original} 

2828 if spec_set: 

2829 kwargs = {'spec_set': original} 

2830 

2831 if not isinstance(original, FunctionTypes): 

2832 new = _SpecState(original, spec_set, mock, entry, instance) 

2833 mock._mock_children[entry] = new 

2834 else: 

2835 parent = mock 

2836 if isinstance(spec, FunctionTypes): 

2837 parent = mock.mock 

2838 

2839 skipfirst = _must_skip(spec, entry, is_type) 

2840 kwargs['_eat_self'] = skipfirst 

2841 if iscoroutinefunction(original): 

2842 child_klass = AsyncMock 

2843 else: 

2844 child_klass = MagicMock 

2845 new = child_klass(parent=parent, name=entry, _new_name=entry, 

2846 _new_parent=parent, 

2847 **kwargs) 

2848 mock._mock_children[entry] = new 

2849 new.return_value = child_klass() 

2850 _check_signature(original, new, skipfirst=skipfirst) 

2851 

2852 # so functions created with _set_signature become instance attributes, 

2853 # *plus* their underlying mock exists in _mock_children of the parent 

2854 # mock. Adding to _mock_children may be unnecessary where we are also 

2855 # setting as an instance attribute? 

2856 if isinstance(new, FunctionTypes): 

2857 setattr(mock, entry, new) 

2858 

2859 return mock 

2860 

2861 

2862def _must_skip(spec, entry, is_type): 

2863 """ 

2864 Return whether we should skip the first argument on spec's `entry` 

2865 attribute. 

2866 """ 

2867 if not isinstance(spec, type): 

2868 if entry in getattr(spec, '__dict__', {}): 

2869 # instance attribute - shouldn't skip 

2870 return False 

2871 spec = spec.__class__ 

2872 

2873 for klass in spec.__mro__: 

2874 result = klass.__dict__.get(entry, DEFAULT) 

2875 if result is DEFAULT: 

2876 continue 

2877 if isinstance(result, (staticmethod, classmethod)): 

2878 return False 

2879 elif isinstance(result, FunctionTypes): 

2880 # Normal method => skip if looked up on type 

2881 # (if looked up on instance, self is already skipped) 

2882 return is_type 

2883 else: 

2884 return False 

2885 

2886 # function is a dynamically provided attribute 

2887 return is_type 

2888 

2889 

2890class _SpecState(object): 

2891 

2892 def __init__(self, spec, spec_set=False, parent=None, 

2893 name=None, ids=None, instance=False): 

2894 self.spec = spec 

2895 self.ids = ids 

2896 self.spec_set = spec_set 

2897 self.parent = parent 

2898 self.instance = instance 

2899 self.name = name 

2900 

2901 

2902FunctionTypes = ( 

2903 # python function 

2904 type(create_autospec), 

2905 # instance method 

2906 type(ANY.__eq__), 

2907) 

2908 

2909 

2910file_spec = None 

2911open_spec = None 

2912 

2913 

2914def _to_stream(read_data): 

2915 if isinstance(read_data, bytes): 

2916 return io.BytesIO(read_data) 

2917 else: 

2918 return io.StringIO(read_data) 

2919 

2920 

2921def mock_open(mock=None, read_data=''): 

2922 """ 

2923 A helper function to create a mock to replace the use of `open`. It works 

2924 for `open` called directly or used as a context manager. 

2925 

2926 The `mock` argument is the mock object to configure. If `None` (the 

2927 default) then a `MagicMock` will be created for you, with the API limited 

2928 to methods or attributes available on standard file handles. 

2929 

2930 `read_data` is a string for the `read`, `readline` and `readlines` of the 

2931 file handle to return. This is an empty string by default. 

2932 """ 

2933 _read_data = _to_stream(read_data) 

2934 _state = [_read_data, None] 

2935 

2936 def _readlines_side_effect(*args, **kwargs): 

2937 if handle.readlines.return_value is not None: 

2938 return handle.readlines.return_value 

2939 return _state[0].readlines(*args, **kwargs) 

2940 

2941 def _read_side_effect(*args, **kwargs): 

2942 if handle.read.return_value is not None: 

2943 return handle.read.return_value 

2944 return _state[0].read(*args, **kwargs) 

2945 

2946 def _readline_side_effect(*args, **kwargs): 

2947 yield from _iter_side_effect() 

2948 while True: 

2949 yield _state[0].readline(*args, **kwargs) 

2950 

2951 def _iter_side_effect(): 

2952 if handle.readline.return_value is not None: 

2953 while True: 

2954 yield handle.readline.return_value 

2955 for line in _state[0]: 

2956 yield line 

2957 

2958 def _next_side_effect(): 

2959 if handle.readline.return_value is not None: 

2960 return handle.readline.return_value 

2961 return next(_state[0]) 

2962 

2963 global file_spec 

2964 if file_spec is None: 

2965 import _io 

2966 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 

2967 

2968 global open_spec 

2969 if open_spec is None: 

2970 import _io 

2971 open_spec = list(set(dir(_io.open))) 

2972 if mock is None: 

2973 mock = MagicMock(name='open', spec=open_spec) 

2974 

2975 handle = MagicMock(spec=file_spec) 

2976 handle.__enter__.return_value = handle 

2977 

2978 handle.write.return_value = None 

2979 handle.read.return_value = None 

2980 handle.readline.return_value = None 

2981 handle.readlines.return_value = None 

2982 

2983 handle.read.side_effect = _read_side_effect 

2984 _state[1] = _readline_side_effect() 

2985 handle.readline.side_effect = _state[1] 

2986 handle.readlines.side_effect = _readlines_side_effect 

2987 handle.__iter__.side_effect = _iter_side_effect 

2988 handle.__next__.side_effect = _next_side_effect 

2989 

2990 def reset_data(*args, **kwargs): 

2991 _state[0] = _to_stream(read_data) 

2992 if handle.readline.side_effect == _state[1]: 

2993 # Only reset the side effect if the user hasn't overridden it. 

2994 _state[1] = _readline_side_effect() 

2995 handle.readline.side_effect = _state[1] 

2996 return DEFAULT 

2997 

2998 mock.side_effect = reset_data 

2999 mock.return_value = handle 

3000 return mock 

3001 

3002 

3003class PropertyMock(Mock): 

3004 """ 

3005 A mock intended to be used as a property, or other descriptor, on a class. 

3006 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 

3007 a return value when it is fetched. 

3008 

3009 Fetching a `PropertyMock` instance from an object calls the mock, with 

3010 no args. Setting it calls the mock with the value being set. 

3011 """ 

3012 def _get_child_mock(self, **kwargs): 

3013 return MagicMock(**kwargs) 

3014 

3015 def __get__(self, obj, obj_type=None): 

3016 return self() 

3017 def __set__(self, obj, val): 

3018 self(val) 

3019 

3020 

3021def seal(mock): 

3022 """Disable the automatic generation of child mocks. 

3023 

3024 Given an input Mock, seals it to ensure no further mocks will be generated 

3025 when accessing an attribute that was not already defined. 

3026 

3027 The operation recursively seals the mock passed in, meaning that 

3028 the mock itself, any mocks generated by accessing one of its attributes, 

3029 and all assigned mocks without a name or spec will be sealed. 

3030 """ 

3031 mock._mock_sealed = True 

3032 for attr in dir(mock): 

3033 try: 

3034 m = getattr(mock, attr) 

3035 except AttributeError: 

3036 continue 

3037 if not isinstance(m, NonCallableMock): 

3038 continue 

3039 if isinstance(m._mock_children.get(attr), _SpecState): 

3040 continue 

3041 if m._mock_new_parent is mock: 

3042 seal(m) 

3043 

3044 

3045class _AsyncIterator: 

3046 """ 

3047 Wraps an iterator in an asynchronous iterator. 

3048 """ 

3049 def __init__(self, iterator): 

3050 self.iterator = iterator 

3051 code_mock = NonCallableMock(spec_set=CodeType) 

3052 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 

3053 self.__dict__['__code__'] = code_mock 

3054 

3055 async def __anext__(self): 

3056 try: 

3057 return next(self.iterator) 

3058 except StopIteration: 

3059 pass 

3060 raise StopAsyncIteration