Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/mock.py: 21%

1622 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# mock.py 

2# Test tools for mocking and patching. 

3# Maintained by Michael Foord 

4# Backport for other versions of Python available from 

5# https://pypi.org/project/mock 

6 

7__all__ = ( 

8 'Mock', 

9 'MagicMock', 

10 'patch', 

11 'sentinel', 

12 'DEFAULT', 

13 'ANY', 

14 'call', 

15 'create_autospec', 

16 'AsyncMock', 

17 'FILTER_DIR', 

18 'NonCallableMock', 

19 'NonCallableMagicMock', 

20 'mock_open', 

21 'PropertyMock', 

22 'seal', 

23) 

24 

25 

26import asyncio 

27import contextlib 

28import io 

29import inspect 

30import pprint 

31import sys 

32import builtins 

33from asyncio import iscoroutinefunction 

34from types import CodeType, ModuleType, MethodType 

35from unittest.util import safe_repr 

36from functools import wraps, partial 

37from threading import RLock 

38 

39 

40from mock import IS_PYPY 

41from .backports import iscoroutinefunction 

42 

43 

44class InvalidSpecError(Exception): 

45 """Indicates that an invalid value was used as a mock spec.""" 

46 

47 

48_builtins = {name for name in dir(builtins) if not name.startswith('_')} 

49 

50FILTER_DIR = True 

51 

52# Workaround for issue #12370 

53# Without this, the __class__ properties wouldn't be set correctly 

54_safe_super = super 

55 

56def _is_async_obj(obj): 

57 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock): 

58 return False 

59 if hasattr(obj, '__func__'): 

60 obj = getattr(obj, '__func__') 

61 return iscoroutinefunction(obj) or inspect.isawaitable(obj) 

62 

63 

64def _is_async_func(func): 

65 if getattr(func, '__code__', None): 

66 return iscoroutinefunction(func) 

67 else: 

68 return False 

69 

70 

71def _is_instance_mock(obj): 

72 # can't use isinstance on Mock objects because they override __class__ 

73 # The base class for all mocks is NonCallableMock 

74 return issubclass(type(obj), NonCallableMock) 

75 

76 

77def _is_exception(obj): 

78 return ( 

79 isinstance(obj, BaseException) or 

80 isinstance(obj, type) and issubclass(obj, BaseException) 

81 ) 

82 

83 

84def _extract_mock(obj): 

85 # Autospecced functions will return a FunctionType with "mock" attribute 

86 # which is the actual mock object that needs to be used. 

87 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'): 

88 return obj.mock 

89 else: 

90 return obj 

91 

92 

93def _get_signature_object(func, as_instance, eat_self): 

94 """ 

95 Given an arbitrary, possibly callable object, try to create a suitable 

96 signature object. 

97 Return a (reduced func, signature) tuple, or None. 

98 """ 

99 if isinstance(func, type) and not as_instance: 

100 # If it's a type and should be modelled as a type, use __init__. 

101 func = func.__init__ 

102 # Skip the `self` argument in __init__ 

103 eat_self = True 

104 elif not isinstance(func, FunctionTypes): 

105 # If we really want to model an instance of the passed type, 

106 # __call__ should be looked up, not __init__. 

107 try: 

108 func = func.__call__ 

109 except AttributeError: 

110 return None 

111 if eat_self: 

112 sig_func = partial(func, None) 

113 else: 

114 sig_func = func 

115 try: 

116 return func, inspect.signature(sig_func) 

117 except ValueError: 

118 # Certain callable types are not supported by inspect.signature() 

119 return None 

120 

121 

122def _check_signature(func, mock, skipfirst, instance=False): 

123 sig = _get_signature_object(func, instance, skipfirst) 

124 if sig is None: 

125 return 

126 func, sig = sig 

127 def checksig(_mock_self, *args, **kwargs): 

128 sig.bind(*args, **kwargs) 

129 _copy_func_details(func, checksig) 

130 type(mock)._mock_check_sig = checksig 

131 type(mock).__signature__ = sig 

132 

133 

134def _copy_func_details(func, funcopy): 

135 # we explicitly don't copy func.__dict__ into this copy as it would 

136 # expose original attributes that should be mocked 

137 for attribute in ( 

138 '__name__', '__doc__', '__text_signature__', 

139 '__module__', '__defaults__', '__kwdefaults__', 

140 ): 

141 try: 

142 setattr(funcopy, attribute, getattr(func, attribute)) 

143 except AttributeError: 

144 pass 

145 

146 

147def _callable(obj): 

148 if isinstance(obj, type): 

149 return True 

150 if isinstance(obj, (staticmethod, classmethod, MethodType)): 

151 return _callable(obj.__func__) 

152 if getattr(obj, '__call__', None) is not None: 

153 return True 

154 return False 

155 

156 

157def _is_list(obj): 

158 # checks for list or tuples 

159 # XXXX badly named! 

160 return type(obj) in (list, tuple) 

161 

162 

163def _instance_callable(obj): 

164 """Given an object, return True if the object is callable. 

165 For classes, return True if instances would be callable.""" 

166 if not isinstance(obj, type): 

167 # already an instance 

168 return getattr(obj, '__call__', None) is not None 

169 

170 # *could* be broken by a class overriding __mro__ or __dict__ via 

171 # a metaclass 

172 for base in (obj,) + obj.__mro__: 

173 if base.__dict__.get('__call__') is not None: 

174 return True 

175 return False 

176 

177 

178def _set_signature(mock, original, instance=False): 

179 # creates a function with signature (*args, **kwargs) that delegates to a 

180 # mock. It still does signature checking by calling a lambda with the same 

181 # signature as the original. 

182 

183 skipfirst = isinstance(original, type) 

184 result = _get_signature_object(original, instance, skipfirst) 

185 if result is None: 

186 return mock 

187 func, sig = result 

188 def checksig(*args, **kwargs): 

189 sig.bind(*args, **kwargs) 

190 _copy_func_details(func, checksig) 

191 

192 name = original.__name__ 

193 if not name.isidentifier(): 

194 name = 'funcopy' 

195 context = {'_checksig_': checksig, 'mock': mock} 

196 src = """def %s(*args, **kwargs): 

197 _checksig_(*args, **kwargs) 

198 return mock(*args, **kwargs)""" % name 

199 exec (src, context) 

200 funcopy = context[name] 

201 _setup_func(funcopy, mock, sig) 

202 return funcopy 

203 

204 

205def _setup_func(funcopy, mock, sig): 

206 funcopy.mock = mock 

207 

208 def assert_called_with(*args, **kwargs): 

209 return mock.assert_called_with(*args, **kwargs) 

210 def assert_called(*args, **kwargs): 

211 return mock.assert_called(*args, **kwargs) 

212 def assert_not_called(*args, **kwargs): 

213 return mock.assert_not_called(*args, **kwargs) 

214 def assert_called_once(*args, **kwargs): 

215 return mock.assert_called_once(*args, **kwargs) 

216 def assert_called_once_with(*args, **kwargs): 

217 return mock.assert_called_once_with(*args, **kwargs) 

218 def assert_has_calls(*args, **kwargs): 

219 return mock.assert_has_calls(*args, **kwargs) 

220 def assert_any_call(*args, **kwargs): 

221 return mock.assert_any_call(*args, **kwargs) 

222 def reset_mock(): 

223 funcopy.method_calls = _CallList() 

224 funcopy.mock_calls = _CallList() 

225 mock.reset_mock() 

226 ret = funcopy.return_value 

227 if _is_instance_mock(ret) and not ret is mock: 

228 ret.reset_mock() 

229 

230 funcopy.called = False 

231 funcopy.call_count = 0 

232 funcopy.call_args = None 

233 funcopy.call_args_list = _CallList() 

234 funcopy.method_calls = _CallList() 

235 funcopy.mock_calls = _CallList() 

236 

237 funcopy.return_value = mock.return_value 

238 funcopy.side_effect = mock.side_effect 

239 funcopy._mock_children = mock._mock_children 

240 

241 funcopy.assert_called_with = assert_called_with 

242 funcopy.assert_called_once_with = assert_called_once_with 

243 funcopy.assert_has_calls = assert_has_calls 

244 funcopy.assert_any_call = assert_any_call 

245 funcopy.reset_mock = reset_mock 

246 funcopy.assert_called = assert_called 

247 funcopy.assert_not_called = assert_not_called 

248 funcopy.assert_called_once = assert_called_once 

249 funcopy.__signature__ = sig 

250 

251 mock._mock_delegate = funcopy 

252 

253 

254def _setup_async_mock(mock): 

255 mock._is_coroutine = asyncio.coroutines._is_coroutine 

256 mock.await_count = 0 

257 mock.await_args = None 

258 mock.await_args_list = _CallList() 

259 

260 # Mock is not configured yet so the attributes are set 

261 # to a function and then the corresponding mock helper function 

262 # is called when the helper is accessed similar to _setup_func. 

263 def wrapper(attr, *args, **kwargs): 

264 return getattr(mock.mock, attr)(*args, **kwargs) 

265 

266 for attribute in ('assert_awaited', 

267 'assert_awaited_once', 

268 'assert_awaited_with', 

269 'assert_awaited_once_with', 

270 'assert_any_await', 

271 'assert_has_awaits', 

272 'assert_not_awaited'): 

273 

274 # setattr(mock, attribute, wrapper) causes late binding 

275 # hence attribute will always be the last value in the loop 

276 # Use partial(wrapper, attribute) to ensure the attribute is bound 

277 # correctly. 

278 setattr(mock, attribute, partial(wrapper, attribute)) 

279 

280 

281def _is_magic(name): 

282 return '__%s__' % name[2:-2] == name 

283 

284 

285class _SentinelObject(object): 

286 "A unique, named, sentinel object." 

287 def __init__(self, name): 

288 self.name = name 

289 

290 def __repr__(self): 

291 return 'sentinel.%s' % self.name 

292 

293 def __reduce__(self): 

294 return 'sentinel.%s' % self.name 

295 

296 

297class _Sentinel(object): 

298 """Access attributes to return a named object, usable as a sentinel.""" 

299 def __init__(self): 

300 self._sentinels = {} 

301 

302 def __getattr__(self, name): 

303 if name == '__bases__': 

304 # Without this help(unittest.mock) raises an exception 

305 raise AttributeError 

306 return self._sentinels.setdefault(name, _SentinelObject(name)) 

307 

308 def __reduce__(self): 

309 return 'sentinel' 

310 

311 

312sentinel = _Sentinel() 

313 

314DEFAULT = sentinel.DEFAULT 

315_missing = sentinel.MISSING 

316_deleted = sentinel.DELETED 

317 

318 

319_allowed_names = { 

320 'return_value', '_mock_return_value', 'side_effect', 

321 '_mock_side_effect', '_mock_parent', '_mock_new_parent', 

322 '_mock_name', '_mock_new_name' 

323} 

324 

325 

326def _delegating_property(name): 

327 _allowed_names.add(name) 

328 _the_name = '_mock_' + name 

329 def _get(self, name=name, _the_name=_the_name): 

330 sig = self._mock_delegate 

331 if sig is None: 

332 return getattr(self, _the_name) 

333 return getattr(sig, name) 

334 def _set(self, value, name=name, _the_name=_the_name): 

335 sig = self._mock_delegate 

336 if sig is None: 

337 self.__dict__[_the_name] = value 

338 else: 

339 setattr(sig, name, value) 

340 

341 return property(_get, _set) 

342 

343 

344 

345class _CallList(list): 

346 

347 def __contains__(self, value): 

348 if not isinstance(value, list): 

349 return list.__contains__(self, value) 

350 len_value = len(value) 

351 len_self = len(self) 

352 if len_value > len_self: 

353 return False 

354 

355 for i in range(0, len_self - len_value + 1): 

356 sub_list = self[i:i+len_value] 

357 if sub_list == value: 

358 return True 

359 return False 

360 

361 def __repr__(self): 

362 return pprint.pformat(list(self)) 

363 

364 

365def _check_and_set_parent(parent, value, name, new_name): 

366 value = _extract_mock(value) 

367 

368 if not _is_instance_mock(value): 

369 return False 

370 if ((value._mock_name or value._mock_new_name) or 

371 (value._mock_parent is not None) or 

372 (value._mock_new_parent is not None)): 

373 return False 

374 

375 _parent = parent 

376 while _parent is not None: 

377 # setting a mock (value) as a child or return value of itself 

378 # should not modify the mock 

379 if _parent is value: 

380 return False 

381 _parent = _parent._mock_new_parent 

382 

383 if new_name: 

384 value._mock_new_parent = parent 

385 value._mock_new_name = new_name 

386 if name: 

387 value._mock_parent = parent 

388 value._mock_name = name 

389 return True 

390 

391# Internal class to identify if we wrapped an iterator object or not. 

392class _MockIter(object): 

393 def __init__(self, obj): 

394 self.obj = iter(obj) 

395 def __next__(self): 

396 return next(self.obj) 

397 

398class Base(object): 

399 _mock_return_value = DEFAULT 

400 _mock_side_effect = None 

401 def __init__(self, *args, **kwargs): 

402 pass 

403 

404 

405 

406class NonCallableMock(Base): 

407 """A non-callable version of `Mock`""" 

408 

409 # Store a mutex as a class attribute in order to protect concurrent access 

410 # to mock attributes. Using a class attribute allows all NonCallableMock 

411 # instances to share the mutex for simplicity. 

412 # 

413 # See https://github.com/python/cpython/issues/98624 for why this is 

414 # necessary. 

415 _lock = RLock() 

416 

417 def __new__( 

418 cls, spec=None, wraps=None, name=None, spec_set=None, 

419 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

420 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

421 ): 

422 # every instance has its own class 

423 # so we can create magic methods on the 

424 # class without stomping on other mocks 

425 bases = (cls,) 

426 if not issubclass(cls, AsyncMockMixin): 

427 # Check if spec is an async object or function 

428 spec_arg = spec_set or spec 

429 if spec_arg is not None and _is_async_obj(spec_arg): 

430 bases = (AsyncMockMixin, cls) 

431 new = type(cls.__name__, bases, {'__doc__': cls.__doc__}) 

432 instance = _safe_super(NonCallableMock, cls).__new__(new) 

433 return instance 

434 

435 

436 def __init__( 

437 self, spec=None, wraps=None, name=None, spec_set=None, 

438 parent=None, _spec_state=None, _new_name='', _new_parent=None, 

439 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs 

440 ): 

441 if _new_parent is None: 

442 _new_parent = parent 

443 

444 __dict__ = self.__dict__ 

445 __dict__['_mock_parent'] = parent 

446 __dict__['_mock_name'] = name 

447 __dict__['_mock_new_name'] = _new_name 

448 __dict__['_mock_new_parent'] = _new_parent 

449 __dict__['_mock_sealed'] = False 

450 

451 if spec_set is not None: 

452 spec = spec_set 

453 spec_set = True 

454 if _eat_self is None: 

455 _eat_self = parent is not None 

456 

457 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self) 

458 

459 __dict__['_mock_children'] = {} 

460 __dict__['_mock_wraps'] = wraps 

461 __dict__['_mock_delegate'] = None 

462 

463 __dict__['_mock_called'] = False 

464 __dict__['_mock_call_args'] = None 

465 __dict__['_mock_call_count'] = 0 

466 __dict__['_mock_call_args_list'] = _CallList() 

467 __dict__['_mock_mock_calls'] = _CallList() 

468 

469 __dict__['method_calls'] = _CallList() 

470 __dict__['_mock_unsafe'] = unsafe 

471 

472 if kwargs: 

473 self.configure_mock(**kwargs) 

474 

475 _safe_super(NonCallableMock, self).__init__( 

476 spec, wraps, name, spec_set, parent, 

477 _spec_state 

478 ) 

479 

480 

481 def attach_mock(self, mock, attribute): 

482 """ 

483 Attach a mock as an attribute of this one, replacing its name and 

484 parent. Calls to the attached mock will be recorded in the 

485 `method_calls` and `mock_calls` attributes of this one.""" 

486 inner_mock = _extract_mock(mock) 

487 

488 inner_mock._mock_parent = None 

489 inner_mock._mock_new_parent = None 

490 inner_mock._mock_name = '' 

491 inner_mock._mock_new_name = None 

492 

493 setattr(self, attribute, mock) 

494 

495 

496 def mock_add_spec(self, spec, spec_set=False): 

497 """Add a spec to a mock. `spec` can either be an object or a 

498 list of strings. Only attributes on the `spec` can be fetched as 

499 attributes from the mock. 

500 

501 If `spec_set` is True then only attributes on the spec can be set.""" 

502 self._mock_add_spec(spec, spec_set) 

503 

504 

505 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False, 

506 _eat_self=False): 

507 if _is_instance_mock(spec): 

508 raise InvalidSpecError(f'Cannot spec a Mock object. [object={spec!r}]') 

509 

510 _spec_class = None 

511 _spec_signature = None 

512 _spec_asyncs = [] 

513 

514 if spec is not None and not _is_list(spec): 

515 if isinstance(spec, type): 

516 _spec_class = spec 

517 else: 

518 _spec_class = type(spec) 

519 res = _get_signature_object(spec, 

520 _spec_as_instance, _eat_self) 

521 _spec_signature = res and res[1] 

522 

523 spec_list = dir(spec) 

524 

525 for attr in spec_list: 

526 if iscoroutinefunction(getattr(spec, attr, None)): 

527 _spec_asyncs.append(attr) 

528 

529 spec = spec_list 

530 

531 __dict__ = self.__dict__ 

532 __dict__['_spec_class'] = _spec_class 

533 __dict__['_spec_set'] = spec_set 

534 __dict__['_spec_signature'] = _spec_signature 

535 __dict__['_mock_methods'] = spec 

536 __dict__['_spec_asyncs'] = _spec_asyncs 

537 

538 def __get_return_value(self): 

539 ret = self._mock_return_value 

540 if self._mock_delegate is not None: 

541 ret = self._mock_delegate.return_value 

542 

543 if ret is DEFAULT: 

544 ret = self._get_child_mock( 

545 _new_parent=self, _new_name='()' 

546 ) 

547 self.return_value = ret 

548 return ret 

549 

550 

551 def __set_return_value(self, value): 

552 if self._mock_delegate is not None: 

553 self._mock_delegate.return_value = value 

554 else: 

555 self._mock_return_value = value 

556 _check_and_set_parent(self, value, None, '()') 

557 

558 __return_value_doc = "The value to be returned when the mock is called." 

559 return_value = property(__get_return_value, __set_return_value, 

560 __return_value_doc) 

561 

562 

563 @property 

564 def __class__(self): 

565 if self._spec_class is None: 

566 return type(self) 

567 return self._spec_class 

568 

569 called = _delegating_property('called') 

570 call_count = _delegating_property('call_count') 

571 call_args = _delegating_property('call_args') 

572 call_args_list = _delegating_property('call_args_list') 

573 mock_calls = _delegating_property('mock_calls') 

574 

575 

576 def __get_side_effect(self): 

577 delegated = self._mock_delegate 

578 if delegated is None: 

579 return self._mock_side_effect 

580 sf = delegated.side_effect 

581 if (sf is not None and not callable(sf) 

582 and not isinstance(sf, _MockIter) and not _is_exception(sf)): 

583 sf = _MockIter(sf) 

584 delegated.side_effect = sf 

585 return sf 

586 

587 def __set_side_effect(self, value): 

588 value = _try_iter(value) 

589 delegated = self._mock_delegate 

590 if delegated is None: 

591 self._mock_side_effect = value 

592 else: 

593 delegated.side_effect = value 

594 

595 side_effect = property(__get_side_effect, __set_side_effect) 

596 

597 

598 def reset_mock(self, visited=None,*, return_value=False, side_effect=False): 

599 "Restore the mock object to its initial state." 

600 if visited is None: 

601 visited = [] 

602 if id(self) in visited: 

603 return 

604 visited.append(id(self)) 

605 

606 self.called = False 

607 self.call_args = None 

608 self.call_count = 0 

609 self.mock_calls = _CallList() 

610 self.call_args_list = _CallList() 

611 self.method_calls = _CallList() 

612 

613 if return_value: 

614 self._mock_return_value = DEFAULT 

615 if side_effect: 

616 self._mock_side_effect = None 

617 

618 for child in self._mock_children.values(): 

619 if isinstance(child, _SpecState) or child is _deleted: 

620 continue 

621 child.reset_mock(visited, return_value=return_value, side_effect=side_effect) 

622 

623 ret = self._mock_return_value 

624 if _is_instance_mock(ret) and ret is not self: 

625 ret.reset_mock(visited) 

626 

627 

628 def configure_mock(self, **kwargs): 

629 """Set attributes on the mock through keyword arguments. 

630 

631 Attributes plus return values and side effects can be set on child 

632 mocks using standard dot notation and unpacking a dictionary in the 

633 method call: 

634 

635 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError} 

636 >>> mock.configure_mock(**attrs)""" 

637 for arg, val in sorted(kwargs.items(), 

638 # we sort on the number of dots so that 

639 # attributes are set before we set attributes on 

640 # attributes 

641 key=lambda entry: entry[0].count('.')): 

642 args = arg.split('.') 

643 final = args.pop() 

644 obj = self 

645 for entry in args: 

646 obj = getattr(obj, entry) 

647 setattr(obj, final, val) 

648 

649 

650 def __getattr__(self, name): 

651 if name in {'_mock_methods', '_mock_unsafe'}: 

652 raise AttributeError(name) 

653 elif self._mock_methods is not None: 

654 if name not in self._mock_methods or name in _all_magics: 

655 raise AttributeError("Mock object has no attribute %r" % name) 

656 elif _is_magic(name): 

657 raise AttributeError(name) 

658 if not self._mock_unsafe and (not self._mock_methods or name not in self._mock_methods): 

659 if name.startswith(('assert', 'assret', 'asert', 'aseert', 'assrt')) or name in _ATTRIB_DENY_LIST: 

660 raise AttributeError( 

661 f"{name!r} is not a valid assertion. Use a spec " 

662 f"for the mock if {name!r} is meant to be an attribute.") 

663 

664 with NonCallableMock._lock: 

665 result = self._mock_children.get(name) 

666 if result is _deleted: 

667 raise AttributeError(name) 

668 elif result is None: 

669 wraps = None 

670 if self._mock_wraps is not None: 

671 # XXXX should we get the attribute without triggering code 

672 # execution? 

673 wraps = getattr(self._mock_wraps, name) 

674 

675 result = self._get_child_mock( 

676 parent=self, name=name, wraps=wraps, _new_name=name, 

677 _new_parent=self 

678 ) 

679 self._mock_children[name] = result 

680 

681 elif isinstance(result, _SpecState): 

682 try: 

683 result = create_autospec( 

684 result.spec, result.spec_set, result.instance, 

685 result.parent, result.name 

686 ) 

687 except InvalidSpecError: 

688 target_name = self.__dict__['_mock_name'] or self 

689 raise InvalidSpecError( 

690 f'Cannot autospec attr {name!r} from target ' 

691 f'{target_name!r} as it has already been mocked out. ' 

692 f'[target={self!r}, attr={result.spec!r}]') 

693 self._mock_children[name] = result 

694 

695 return result 

696 

697 

698 def _extract_mock_name(self): 

699 _name_list = [self._mock_new_name] 

700 _parent = self._mock_new_parent 

701 last = self 

702 

703 dot = '.' 

704 if _name_list == ['()']: 

705 dot = '' 

706 

707 while _parent is not None: 

708 last = _parent 

709 

710 _name_list.append(_parent._mock_new_name + dot) 

711 dot = '.' 

712 if _parent._mock_new_name == '()': 

713 dot = '' 

714 

715 _parent = _parent._mock_new_parent 

716 

717 _name_list = list(reversed(_name_list)) 

718 _first = last._mock_name or 'mock' 

719 if len(_name_list) > 1: 

720 if _name_list[1] not in ('()', '().'): 

721 _first += '.' 

722 _name_list[0] = _first 

723 return ''.join(_name_list) 

724 

725 def __repr__(self): 

726 name = self._extract_mock_name() 

727 

728 name_string = '' 

729 if name not in ('mock', 'mock.'): 

730 name_string = ' name=%r' % name 

731 

732 spec_string = '' 

733 if self._spec_class is not None: 

734 spec_string = ' spec=%r' 

735 if self._spec_set: 

736 spec_string = ' spec_set=%r' 

737 spec_string = spec_string % self._spec_class.__name__ 

738 return "<%s%s%s id='%s'>" % ( 

739 type(self).__name__, 

740 name_string, 

741 spec_string, 

742 id(self) 

743 ) 

744 

745 

746 def __dir__(self): 

747 """Filter the output of `dir(mock)` to only useful members.""" 

748 if not FILTER_DIR: 

749 return object.__dir__(self) 

750 

751 extras = self._mock_methods or [] 

752 from_type = dir(type(self)) 

753 from_dict = list(self.__dict__) 

754 from_child_mocks = [ 

755 m_name for m_name, m_value in self._mock_children.items() 

756 if m_value is not _deleted] 

757 

758 from_type = [e for e in from_type if not e.startswith('_')] 

759 from_dict = [e for e in from_dict if not e.startswith('_') or 

760 _is_magic(e)] 

761 return sorted(set(extras + from_type + from_dict + from_child_mocks)) 

762 

763 

764 def __setattr__(self, name, value): 

765 if name in _allowed_names: 

766 # property setters go through here 

767 return object.__setattr__(self, name, value) 

768 elif (self._spec_set and self._mock_methods is not None and 

769 name not in self._mock_methods and 

770 name not in self.__dict__): 

771 raise AttributeError("Mock object has no attribute '%s'" % name) 

772 elif name in _unsupported_magics: 

773 msg = 'Attempting to set unsupported magic method %r.' % name 

774 raise AttributeError(msg) 

775 elif name in _all_magics: 

776 if self._mock_methods is not None and name not in self._mock_methods: 

777 raise AttributeError("Mock object has no attribute '%s'" % name) 

778 

779 if not _is_instance_mock(value): 

780 setattr(type(self), name, _get_method(name, value)) 

781 original = value 

782 value = lambda *args, **kw: original(self, *args, **kw) 

783 else: 

784 # only set _new_name and not name so that mock_calls is tracked 

785 # but not method calls 

786 _check_and_set_parent(self, value, None, name) 

787 setattr(type(self), name, value) 

788 self._mock_children[name] = value 

789 elif name == '__class__': 

790 self._spec_class = value 

791 return 

792 else: 

793 if _check_and_set_parent(self, value, name, name): 

794 self._mock_children[name] = value 

795 

796 if self._mock_sealed and not hasattr(self, name): 

797 mock_name = f'{self._extract_mock_name()}.{name}' 

798 raise AttributeError(f'Cannot set {mock_name}') 

799 

800 return object.__setattr__(self, name, value) 

801 

802 

803 def __delattr__(self, name): 

804 if name in _all_magics and name in type(self).__dict__: 

805 delattr(type(self), name) 

806 if name not in self.__dict__: 

807 # for magic methods that are still MagicProxy objects and 

808 # not set on the instance itself 

809 return 

810 

811 obj = self._mock_children.get(name, _missing) 

812 if name in self.__dict__: 

813 _safe_super(NonCallableMock, self).__delattr__(name) 

814 elif obj is _deleted: 

815 raise AttributeError(name) 

816 if obj is not _missing: 

817 del self._mock_children[name] 

818 self._mock_children[name] = _deleted 

819 

820 

821 def _format_mock_call_signature(self, args, kwargs): 

822 name = self._mock_name or 'mock' 

823 return _format_call_signature(name, args, kwargs) 

824 

825 

826 def _format_mock_failure_message(self, args, kwargs, action='call'): 

827 message = 'expected %s not found.\nExpected: %s\nActual: %s' 

828 expected_string = self._format_mock_call_signature(args, kwargs) 

829 call_args = self.call_args 

830 actual_string = self._format_mock_call_signature(*call_args) 

831 return message % (action, expected_string, actual_string) 

832 

833 

834 def _get_call_signature_from_name(self, name): 

835 """ 

836 * If call objects are asserted against a method/function like obj.meth1 

837 then there could be no name for the call object to lookup. Hence just 

838 return the spec_signature of the method/function being asserted against. 

839 * If the name is not empty then remove () and split by '.' to get 

840 list of names to iterate through the children until a potential 

841 match is found. A child mock is created only during attribute access 

842 so if we get a _SpecState then no attributes of the spec were accessed 

843 and can be safely exited. 

844 """ 

845 if not name: 

846 return self._spec_signature 

847 

848 sig = None 

849 names = name.replace('()', '').split('.') 

850 children = self._mock_children 

851 

852 for name in names: 

853 child = children.get(name) 

854 if child is None or isinstance(child, _SpecState): 

855 break 

856 else: 

857 # If an autospecced object is attached using attach_mock the 

858 # child would be a function with mock object as attribute from 

859 # which signature has to be derived. 

860 child = _extract_mock(child) 

861 children = child._mock_children 

862 sig = child._spec_signature 

863 

864 return sig 

865 

866 

867 def _call_matcher(self, _call): 

868 """ 

869 Given a call (or simply an (args, kwargs) tuple), return a 

870 comparison key suitable for matching with other calls. 

871 This is a best effort method which relies on the spec's signature, 

872 if available, or falls back on the arguments themselves. 

873 """ 

874 

875 if isinstance(_call, tuple) and len(_call) > 2: 

876 sig = self._get_call_signature_from_name(_call[0]) 

877 else: 

878 sig = self._spec_signature 

879 

880 if sig is not None: 

881 if len(_call) == 2: 

882 name = '' 

883 args, kwargs = _call 

884 else: 

885 name, args, kwargs = _call 

886 try: 

887 bound_call = sig.bind(*args, **kwargs) 

888 return call(name, bound_call.args, bound_call.kwargs) 

889 except TypeError as e: 

890 return e.with_traceback(None) 

891 else: 

892 return _call 

893 

894 def assert_not_called(_mock_self): 

895 """assert that the mock was never called. 

896 """ 

897 self = _mock_self 

898 if self.call_count != 0: 

899 msg = ("Expected '%s' to not have been called. Called %s times.%s" 

900 % (self._mock_name or 'mock', 

901 self.call_count, 

902 self._calls_repr())) 

903 raise AssertionError(msg) 

904 

905 def assert_called(_mock_self): 

906 """assert that the mock was called at least once 

907 """ 

908 self = _mock_self 

909 if self.call_count == 0: 

910 msg = ("Expected '%s' to have been called." % 

911 (self._mock_name or 'mock')) 

912 raise AssertionError(msg) 

913 

914 def assert_called_once(_mock_self): 

915 """assert that the mock was called only once. 

916 """ 

917 self = _mock_self 

918 if not self.call_count == 1: 

919 msg = ("Expected '%s' to have been called once. Called %s times.%s" 

920 % (self._mock_name or 'mock', 

921 self.call_count, 

922 self._calls_repr())) 

923 raise AssertionError(msg) 

924 

925 def assert_called_with(_mock_self, *args, **kwargs): 

926 """assert that the last call was made with the specified arguments. 

927 

928 Raises an AssertionError if the args and keyword args passed in are 

929 different to the last call to the mock.""" 

930 self = _mock_self 

931 if self.call_args is None: 

932 expected = self._format_mock_call_signature(args, kwargs) 

933 actual = 'not called.' 

934 error_message = ('expected call not found.\nExpected: %s\nActual: %s' 

935 % (expected, actual)) 

936 raise AssertionError(error_message) 

937 

938 def _error_message(): 

939 msg = self._format_mock_failure_message(args, kwargs) 

940 return msg 

941 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

942 actual = self._call_matcher(self.call_args) 

943 if actual != expected: 

944 cause = expected if isinstance(expected, Exception) else None 

945 raise AssertionError(_error_message()) from cause 

946 

947 

948 def assert_called_once_with(_mock_self, *args, **kwargs): 

949 """assert that the mock was called exactly once and that that call was 

950 with the specified arguments.""" 

951 self = _mock_self 

952 if not self.call_count == 1: 

953 msg = ("Expected '%s' to be called once. Called %s times.%s" 

954 % (self._mock_name or 'mock', 

955 self.call_count, 

956 self._calls_repr())) 

957 raise AssertionError(msg) 

958 return self.assert_called_with(*args, **kwargs) 

959 

960 

961 def assert_has_calls(self, calls, any_order=False): 

962 """assert the mock has been called with the specified calls. 

963 The `mock_calls` list is checked for the calls. 

964 

965 If `any_order` is False (the default) then the calls must be 

966 sequential. There can be extra calls before or after the 

967 specified calls. 

968 

969 If `any_order` is True then the calls can be in any order, but 

970 they must all appear in `mock_calls`.""" 

971 expected = [self._call_matcher(c) for c in calls] 

972 cause = next((e for e in expected if isinstance(e, Exception)), None) 

973 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) 

974 if not any_order: 

975 if expected not in all_calls: 

976 if cause is None: 

977 problem = 'Calls not found.' 

978 else: 

979 problem = ('Error processing expected calls.\n' 

980 'Errors: {}').format( 

981 [e if isinstance(e, Exception) else None 

982 for e in expected]) 

983 raise AssertionError( 

984 f'{problem}\n' 

985 f'Expected: {_CallList(calls)}' 

986 f'{self._calls_repr(prefix="Actual").rstrip(".")}' 

987 ) from cause 

988 return 

989 

990 all_calls = list(all_calls) 

991 

992 not_found = [] 

993 for kall in expected: 

994 try: 

995 all_calls.remove(kall) 

996 except ValueError: 

997 not_found.append(kall) 

998 if not_found: 

999 raise AssertionError( 

1000 '%r does not contain all of %r in its call list, ' 

1001 'found %r instead' % (self._mock_name or 'mock', 

1002 tuple(not_found), all_calls) 

1003 ) from cause 

1004 

1005 

1006 def assert_any_call(self, *args, **kwargs): 

1007 """assert the mock has been called with the specified arguments. 

1008 

1009 The assert passes if the mock has *ever* been called, unlike 

1010 `assert_called_with` and `assert_called_once_with` that only pass if 

1011 the call is the most recent one.""" 

1012 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

1013 cause = expected if isinstance(expected, Exception) else None 

1014 actual = [self._call_matcher(c) for c in self.call_args_list] 

1015 if cause or expected not in _AnyComparer(actual): 

1016 expected_string = self._format_mock_call_signature(args, kwargs) 

1017 raise AssertionError( 

1018 '%s call not found' % expected_string 

1019 ) from cause 

1020 

1021 

1022 def _get_child_mock(self, **kw): 

1023 """Create the child mocks for attributes and return value. 

1024 By default child mocks will be the same type as the parent. 

1025 Subclasses of Mock may want to override this to customize the way 

1026 child mocks are made. 

1027 

1028 For non-callable mocks the callable variant will be used (rather than 

1029 any custom subclass).""" 

1030 if self._mock_sealed: 

1031 attribute = f".{kw['name']}" if "name" in kw else "()" 

1032 mock_name = self._extract_mock_name() + attribute 

1033 raise AttributeError(mock_name) 

1034 

1035 _new_name = kw.get("_new_name") 

1036 if _new_name in self.__dict__['_spec_asyncs']: 

1037 return AsyncMock(**kw) 

1038 

1039 _type = type(self) 

1040 if issubclass(_type, MagicMock) and _new_name in _async_method_magics: 

1041 # Any asynchronous magic becomes an AsyncMock 

1042 klass = AsyncMock 

1043 elif issubclass(_type, AsyncMockMixin): 

1044 if (_new_name in _all_sync_magics or 

1045 self._mock_methods and _new_name in self._mock_methods): 

1046 # Any synchronous method on AsyncMock becomes a MagicMock 

1047 klass = MagicMock 

1048 else: 

1049 klass = AsyncMock 

1050 elif not issubclass(_type, CallableMixin): 

1051 if issubclass(_type, NonCallableMagicMock): 

1052 klass = MagicMock 

1053 elif issubclass(_type, NonCallableMock): 

1054 klass = Mock 

1055 else: 

1056 klass = _type.__mro__[1] 

1057 return klass(**kw) 

1058 

1059 

1060 def _calls_repr(self, prefix="Calls"): 

1061 """Renders self.mock_calls as a string. 

1062 

1063 Example: "\nCalls: [call(1), call(2)]." 

1064 

1065 If self.mock_calls is empty, an empty string is returned. The 

1066 output will be truncated if very long. 

1067 """ 

1068 if not self.mock_calls: 

1069 return "" 

1070 return f"\n{prefix}: {safe_repr(self.mock_calls)}." 

1071 

1072 

1073try: 

1074 removeprefix = str.removeprefix 

1075except AttributeError: 

1076 # Py 3.8 and earlier: 

1077 def removeprefix(name, prefix): 

1078 return name[len(prefix):] 

1079 

1080# Denylist for forbidden attribute names in safe mode 

1081_ATTRIB_DENY_LIST = frozenset({ 

1082 removeprefix(name, "assert_") 

1083 for name in dir(NonCallableMock) 

1084 if name.startswith("assert_") 

1085}) 

1086 

1087 

1088class _AnyComparer(list): 

1089 """A list which checks if it contains a call which may have an 

1090 argument of ANY, flipping the components of item and self from 

1091 their traditional locations so that ANY is guaranteed to be on 

1092 the left.""" 

1093 def __contains__(self, item): 

1094 for _call in self: 

1095 assert len(item) == len(_call) 

1096 if all([ 

1097 expected == actual 

1098 for expected, actual in zip(item, _call) 

1099 ]): 

1100 return True 

1101 return False 

1102 

1103 

1104def _try_iter(obj): 

1105 if obj is None: 

1106 return obj 

1107 if _is_exception(obj): 

1108 return obj 

1109 if _callable(obj): 

1110 return obj 

1111 try: 

1112 return iter(obj) 

1113 except TypeError: 

1114 # XXXX backwards compatibility 

1115 # but this will blow up on first call - so maybe we should fail early? 

1116 return obj 

1117 

1118 

1119class CallableMixin(Base): 

1120 

1121 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, 

1122 wraps=None, name=None, spec_set=None, parent=None, 

1123 _spec_state=None, _new_name='', _new_parent=None, **kwargs): 

1124 self.__dict__['_mock_return_value'] = return_value 

1125 _safe_super(CallableMixin, self).__init__( 

1126 spec, wraps, name, spec_set, parent, 

1127 _spec_state, _new_name, _new_parent, **kwargs 

1128 ) 

1129 

1130 self.side_effect = side_effect 

1131 

1132 

1133 def _mock_check_sig(self, *args, **kwargs): 

1134 # stub method that can be replaced with one with a specific signature 

1135 pass 

1136 

1137 

1138 def __call__(_mock_self, *args, **kwargs): 

1139 # can't use self in-case a function / method we are mocking uses self 

1140 # in the signature 

1141 _mock_self._mock_check_sig(*args, **kwargs) 

1142 _mock_self._increment_mock_call(*args, **kwargs) 

1143 return _mock_self._mock_call(*args, **kwargs) 

1144 

1145 

1146 def _mock_call(_mock_self, *args, **kwargs): 

1147 return _mock_self._execute_mock_call(*args, **kwargs) 

1148 

1149 def _increment_mock_call(_mock_self, *args, **kwargs): 

1150 self = _mock_self 

1151 self.called = True 

1152 self.call_count += 1 

1153 

1154 # handle call_args 

1155 # needs to be set here so assertions on call arguments pass before 

1156 # execution in the case of awaited calls 

1157 _call = _Call((args, kwargs), two=True) 

1158 self.call_args = _call 

1159 self.call_args_list.append(_call) 

1160 

1161 # initial stuff for method_calls: 

1162 do_method_calls = self._mock_parent is not None 

1163 method_call_name = self._mock_name 

1164 

1165 # initial stuff for mock_calls: 

1166 mock_call_name = self._mock_new_name 

1167 is_a_call = mock_call_name == '()' 

1168 self.mock_calls.append(_Call(('', args, kwargs))) 

1169 

1170 # follow up the chain of mocks: 

1171 _new_parent = self._mock_new_parent 

1172 while _new_parent is not None: 

1173 

1174 # handle method_calls: 

1175 if do_method_calls: 

1176 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs))) 

1177 do_method_calls = _new_parent._mock_parent is not None 

1178 if do_method_calls: 

1179 method_call_name = _new_parent._mock_name + '.' + method_call_name 

1180 

1181 # handle mock_calls: 

1182 this_mock_call = _Call((mock_call_name, args, kwargs)) 

1183 _new_parent.mock_calls.append(this_mock_call) 

1184 

1185 if _new_parent._mock_new_name: 

1186 if is_a_call: 

1187 dot = '' 

1188 else: 

1189 dot = '.' 

1190 is_a_call = _new_parent._mock_new_name == '()' 

1191 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name 

1192 

1193 # follow the parental chain: 

1194 _new_parent = _new_parent._mock_new_parent 

1195 

1196 def _execute_mock_call(_mock_self, *args, **kwargs): 

1197 self = _mock_self 

1198 # separate from _increment_mock_call so that awaited functions are 

1199 # executed separately from their call, also AsyncMock overrides this method 

1200 

1201 effect = self.side_effect 

1202 if effect is not None: 

1203 if _is_exception(effect): 

1204 raise effect 

1205 elif not _callable(effect): 

1206 result = next(effect) 

1207 if _is_exception(result): 

1208 raise result 

1209 else: 

1210 result = effect(*args, **kwargs) 

1211 

1212 if result is not DEFAULT: 

1213 return result 

1214 

1215 if self._mock_return_value is not DEFAULT: 

1216 return self.return_value 

1217 

1218 if self._mock_wraps is not None: 

1219 return self._mock_wraps(*args, **kwargs) 

1220 

1221 return self.return_value 

1222 

1223 

1224 

1225class Mock(CallableMixin, NonCallableMock): 

1226 """ 

1227 Create a new `Mock` object. `Mock` takes several optional arguments 

1228 that specify the behaviour of the Mock object: 

1229 

1230 * `spec`: This can be either a list of strings or an existing object (a 

1231 class or instance) that acts as the specification for the mock object. If 

1232 you pass in an object then a list of strings is formed by calling dir on 

1233 the object (excluding unsupported magic attributes and methods). Accessing 

1234 any attribute not in this list will raise an `AttributeError`. 

1235 

1236 If `spec` is an object (rather than a list of strings) then 

1237 `mock.__class__` returns the class of the spec object. This allows mocks 

1238 to pass `isinstance` tests. 

1239 

1240 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set* 

1241 or get an attribute on the mock that isn't on the object passed as 

1242 `spec_set` will raise an `AttributeError`. 

1243 

1244 * `side_effect`: A function to be called whenever the Mock is called. See 

1245 the `side_effect` attribute. Useful for raising exceptions or 

1246 dynamically changing return values. The function is called with the same 

1247 arguments as the mock, and unless it returns `DEFAULT`, the return 

1248 value of this function is used as the return value. 

1249 

1250 If `side_effect` is an iterable then each call to the mock will return 

1251 the next value from the iterable. If any of the members of the iterable 

1252 are exceptions they will be raised instead of returned. 

1253 

1254 * `return_value`: The value returned when the mock is called. By default 

1255 this is a new Mock (created on first access). See the 

1256 `return_value` attribute. 

1257 

1258 * `unsafe`: By default, accessing any attribute whose name starts with 

1259 *assert*, *assret*, *asert*, *aseert*, or *assrt* raises an AttributeError. 

1260 Additionally, an AttributeError is raised when accessing 

1261 attributes that match the name of an assertion method without the prefix 

1262 `assert_`, e.g. accessing `called_once` instead of `assert_called_once`. 

1263 Passing `unsafe=True` will allow access to these attributes. 

1264 

1265 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then 

1266 calling the Mock will pass the call through to the wrapped object 

1267 (returning the real result). Attribute access on the mock will return a 

1268 Mock object that wraps the corresponding attribute of the wrapped object 

1269 (so attempting to access an attribute that doesn't exist will raise an 

1270 `AttributeError`). 

1271 

1272 If the mock has an explicit `return_value` set then calls are not passed 

1273 to the wrapped object and the `return_value` is returned instead. 

1274 

1275 * `name`: If the mock has a name then it will be used in the repr of the 

1276 mock. This can be useful for debugging. The name is propagated to child 

1277 mocks. 

1278 

1279 Mocks can also be called with arbitrary keyword arguments. These will be 

1280 used to set attributes on the mock after it is created. 

1281 """ 

1282 

1283 

1284def _dot_lookup(thing, comp, import_path): 

1285 try: 

1286 return getattr(thing, comp) 

1287 except AttributeError: 

1288 __import__(import_path) 

1289 return getattr(thing, comp) 

1290 

1291 

1292def _importer(target): 

1293 components = target.split('.') 

1294 import_path = components.pop(0) 

1295 thing = __import__(import_path) 

1296 

1297 for comp in components: 

1298 import_path += ".%s" % comp 

1299 thing = _dot_lookup(thing, comp, import_path) 

1300 return thing 

1301 

1302 

1303# _check_spec_arg_typos takes kwargs from commands like patch and checks that 

1304# they don't contain common misspellings of arguments related to autospeccing. 

1305def _check_spec_arg_typos(kwargs_to_check): 

1306 typos = ("autospect", "auto_spec", "set_spec") 

1307 for typo in typos: 

1308 if typo in kwargs_to_check: 

1309 raise RuntimeError( 

1310 f"{typo!r} might be a typo; use unsafe=True if this is intended" 

1311 ) 

1312 

1313 

1314class _patch(object): 

1315 

1316 attribute_name = None 

1317 _active_patches = [] 

1318 

1319 def __init__( 

1320 self, getter, attribute, new, spec, create, 

1321 spec_set, autospec, new_callable, kwargs, *, unsafe=False 

1322 ): 

1323 if new_callable is not None: 

1324 if new is not DEFAULT: 

1325 raise ValueError( 

1326 "Cannot use 'new' and 'new_callable' together" 

1327 ) 

1328 if autospec is not None: 

1329 raise ValueError( 

1330 "Cannot use 'autospec' and 'new_callable' together" 

1331 ) 

1332 if not unsafe: 

1333 _check_spec_arg_typos(kwargs) 

1334 if _is_instance_mock(spec): 

1335 raise InvalidSpecError( 

1336 f'Cannot spec attr {attribute!r} as the spec ' 

1337 f'has already been mocked out. [spec={spec!r}]') 

1338 if _is_instance_mock(spec_set): 

1339 raise InvalidSpecError( 

1340 f'Cannot spec attr {attribute!r} as the spec_set ' 

1341 f'target has already been mocked out. [spec_set={spec_set!r}]') 

1342 

1343 self.getter = getter 

1344 self.attribute = attribute 

1345 self.new = new 

1346 self.new_callable = new_callable 

1347 self.spec = spec 

1348 self.create = create 

1349 self.has_local = False 

1350 self.spec_set = spec_set 

1351 self.autospec = autospec 

1352 self.kwargs = kwargs 

1353 self.additional_patchers = [] 

1354 

1355 

1356 def copy(self): 

1357 patcher = _patch( 

1358 self.getter, self.attribute, self.new, self.spec, 

1359 self.create, self.spec_set, 

1360 self.autospec, self.new_callable, self.kwargs 

1361 ) 

1362 patcher.attribute_name = self.attribute_name 

1363 patcher.additional_patchers = [ 

1364 p.copy() for p in self.additional_patchers 

1365 ] 

1366 return patcher 

1367 

1368 

1369 def __call__(self, func): 

1370 if isinstance(func, type): 

1371 return self.decorate_class(func) 

1372 if inspect.iscoroutinefunction(func): 

1373 return self.decorate_async_callable(func) 

1374 return self.decorate_callable(func) 

1375 

1376 

1377 def decorate_class(self, klass): 

1378 for attr in dir(klass): 

1379 if not attr.startswith(patch.TEST_PREFIX): 

1380 continue 

1381 

1382 attr_value = getattr(klass, attr) 

1383 if not hasattr(attr_value, "__call__"): 

1384 continue 

1385 

1386 patcher = self.copy() 

1387 setattr(klass, attr, patcher(attr_value)) 

1388 return klass 

1389 

1390 

1391 @contextlib.contextmanager 

1392 def decoration_helper(self, patched, args, keywargs): 

1393 extra_args = [] 

1394 with contextlib.ExitStack() as exit_stack: 

1395 for patching in patched.patchings: 

1396 arg = exit_stack.enter_context(patching) 

1397 if patching.attribute_name is not None: 

1398 keywargs.update(arg) 

1399 elif patching.new is DEFAULT: 

1400 extra_args.append(arg) 

1401 

1402 args += tuple(extra_args) 

1403 yield (args, keywargs) 

1404 

1405 

1406 def decorate_callable(self, func): 

1407 # NB. Keep the method in sync with decorate_async_callable() 

1408 if hasattr(func, 'patchings'): 

1409 func.patchings.append(self) 

1410 return func 

1411 

1412 @wraps(func) 

1413 def patched(*args, **keywargs): 

1414 with self.decoration_helper(patched, 

1415 args, 

1416 keywargs) as (newargs, newkeywargs): 

1417 return func(*newargs, **newkeywargs) 

1418 

1419 patched.patchings = [self] 

1420 return patched 

1421 

1422 

1423 def decorate_async_callable(self, func): 

1424 # NB. Keep the method in sync with decorate_callable() 

1425 if hasattr(func, 'patchings'): 

1426 func.patchings.append(self) 

1427 return func 

1428 

1429 @wraps(func) 

1430 async def patched(*args, **keywargs): 

1431 with self.decoration_helper(patched, 

1432 args, 

1433 keywargs) as (newargs, newkeywargs): 

1434 return await func(*newargs, **newkeywargs) 

1435 

1436 patched.patchings = [self] 

1437 return patched 

1438 

1439 

1440 def get_original(self): 

1441 target = self.getter() 

1442 name = self.attribute 

1443 

1444 original = DEFAULT 

1445 local = False 

1446 

1447 try: 

1448 original = target.__dict__[name] 

1449 except (AttributeError, KeyError): 

1450 original = getattr(target, name, DEFAULT) 

1451 else: 

1452 local = True 

1453 

1454 if name in _builtins and isinstance(target, ModuleType): 

1455 self.create = True 

1456 

1457 if not self.create and original is DEFAULT: 

1458 raise AttributeError( 

1459 "%s does not have the attribute %r" % (target, name) 

1460 ) 

1461 return original, local 

1462 

1463 

1464 def __enter__(self): 

1465 """Perform the patch.""" 

1466 new, spec, spec_set = self.new, self.spec, self.spec_set 

1467 autospec, kwargs = self.autospec, self.kwargs 

1468 new_callable = self.new_callable 

1469 self.target = self.getter() 

1470 

1471 # normalise False to None 

1472 if spec is False: 

1473 spec = None 

1474 if spec_set is False: 

1475 spec_set = None 

1476 if autospec is False: 

1477 autospec = None 

1478 

1479 if spec is not None and autospec is not None: 

1480 raise TypeError("Can't specify spec and autospec") 

1481 if ((spec is not None or autospec is not None) and 

1482 spec_set not in (True, None)): 

1483 raise TypeError("Can't provide explicit spec_set *and* spec or autospec") 

1484 

1485 original, local = self.get_original() 

1486 

1487 if new is DEFAULT and autospec is None: 

1488 inherit = False 

1489 if spec is True: 

1490 # set spec to the object we are replacing 

1491 spec = original 

1492 if spec_set is True: 

1493 spec_set = original 

1494 spec = None 

1495 elif spec is not None: 

1496 if spec_set is True: 

1497 spec_set = spec 

1498 spec = None 

1499 elif spec_set is True: 

1500 spec_set = original 

1501 

1502 if spec is not None or spec_set is not None: 

1503 if original is DEFAULT: 

1504 raise TypeError("Can't use 'spec' with create=True") 

1505 if isinstance(original, type): 

1506 # If we're patching out a class and there is a spec 

1507 inherit = True 

1508 if spec is None and _is_async_obj(original): 

1509 Klass = AsyncMock 

1510 else: 

1511 Klass = MagicMock 

1512 _kwargs = {} 

1513 if new_callable is not None: 

1514 Klass = new_callable 

1515 elif spec is not None or spec_set is not None: 

1516 this_spec = spec 

1517 if spec_set is not None: 

1518 this_spec = spec_set 

1519 if _is_list(this_spec): 

1520 not_callable = '__call__' not in this_spec 

1521 else: 

1522 not_callable = not callable(this_spec) 

1523 if _is_async_obj(this_spec): 

1524 Klass = AsyncMock 

1525 elif not_callable: 

1526 Klass = NonCallableMagicMock 

1527 

1528 if spec is not None: 

1529 _kwargs['spec'] = spec 

1530 if spec_set is not None: 

1531 _kwargs['spec_set'] = spec_set 

1532 

1533 # add a name to mocks 

1534 if (isinstance(Klass, type) and 

1535 issubclass(Klass, NonCallableMock) and self.attribute): 

1536 _kwargs['name'] = self.attribute 

1537 

1538 _kwargs.update(kwargs) 

1539 new = Klass(**_kwargs) 

1540 

1541 if inherit and _is_instance_mock(new): 

1542 # we can only tell if the instance should be callable if the 

1543 # spec is not a list 

1544 this_spec = spec 

1545 if spec_set is not None: 

1546 this_spec = spec_set 

1547 if (not _is_list(this_spec) and not 

1548 _instance_callable(this_spec)): 

1549 Klass = NonCallableMagicMock 

1550 

1551 _kwargs.pop('name') 

1552 new.return_value = Klass(_new_parent=new, _new_name='()', 

1553 **_kwargs) 

1554 elif autospec is not None: 

1555 # spec is ignored, new *must* be default, spec_set is treated 

1556 # as a boolean. Should we check spec is not None and that spec_set 

1557 # is a bool? 

1558 if new is not DEFAULT: 

1559 raise TypeError( 

1560 "autospec creates the mock for you. Can't specify " 

1561 "autospec and new." 

1562 ) 

1563 if original is DEFAULT: 

1564 raise TypeError("Can't use 'autospec' with create=True") 

1565 spec_set = bool(spec_set) 

1566 if autospec is True: 

1567 autospec = original 

1568 

1569 if _is_instance_mock(self.target): 

1570 raise InvalidSpecError( 

1571 f'Cannot autospec attr {self.attribute!r} as the patch ' 

1572 f'target has already been mocked out. ' 

1573 f'[target={self.target!r}, attr={autospec!r}]') 

1574 if _is_instance_mock(autospec): 

1575 target_name = getattr(self.target, '__name__', self.target) 

1576 raise InvalidSpecError( 

1577 f'Cannot autospec attr {self.attribute!r} from target ' 

1578 f'{target_name!r} as it has already been mocked out. ' 

1579 f'[target={self.target!r}, attr={autospec!r}]') 

1580 

1581 new = create_autospec(autospec, spec_set=spec_set, 

1582 _name=self.attribute, **kwargs) 

1583 elif kwargs: 

1584 # can't set keyword args when we aren't creating the mock 

1585 # XXXX If new is a Mock we could call new.configure_mock(**kwargs) 

1586 raise TypeError("Can't pass kwargs to a mock we aren't creating") 

1587 

1588 new_attr = new 

1589 

1590 self.temp_original = original 

1591 self.is_local = local 

1592 self._exit_stack = contextlib.ExitStack() 

1593 try: 

1594 setattr(self.target, self.attribute, new_attr) 

1595 if self.attribute_name is not None: 

1596 extra_args = {} 

1597 if self.new is DEFAULT: 

1598 extra_args[self.attribute_name] = new 

1599 for patching in self.additional_patchers: 

1600 arg = self._exit_stack.enter_context(patching) 

1601 if patching.new is DEFAULT: 

1602 extra_args.update(arg) 

1603 return extra_args 

1604 

1605 return new 

1606 except: 

1607 if not self.__exit__(*sys.exc_info()): 

1608 raise 

1609 

1610 def __exit__(self, *exc_info): 

1611 """Undo the patch.""" 

1612 if self.is_local and self.temp_original is not DEFAULT: 

1613 setattr(self.target, self.attribute, self.temp_original) 

1614 else: 

1615 delattr(self.target, self.attribute) 

1616 if not self.create and (not hasattr(self.target, self.attribute) or 

1617 self.attribute in ('__doc__', '__module__', 

1618 '__defaults__', '__annotations__', 

1619 '__kwdefaults__')): 

1620 # needed for proxy objects like django settings 

1621 setattr(self.target, self.attribute, self.temp_original) 

1622 

1623 del self.temp_original 

1624 del self.is_local 

1625 del self.target 

1626 exit_stack = self._exit_stack 

1627 del self._exit_stack 

1628 return exit_stack.__exit__(*exc_info) 

1629 

1630 

1631 def start(self): 

1632 """Activate a patch, returning any created mock.""" 

1633 result = self.__enter__() 

1634 self._active_patches.append(self) 

1635 return result 

1636 

1637 

1638 def stop(self): 

1639 """Stop an active patch.""" 

1640 try: 

1641 self._active_patches.remove(self) 

1642 except ValueError: 

1643 # If the patch hasn't been started this will fail 

1644 return None 

1645 

1646 return self.__exit__(None, None, None) 

1647 

1648 

1649 

1650def _get_target(target): 

1651 try: 

1652 target, attribute = target.rsplit('.', 1) 

1653 except (TypeError, ValueError, AttributeError): 

1654 raise TypeError( 

1655 f"Need a valid target to patch. You supplied: {target!r}") 

1656 getter = lambda: _importer(target) 

1657 return getter, attribute 

1658 

1659 

1660def _patch_object( 

1661 target, attribute, new=DEFAULT, spec=None, 

1662 create=False, spec_set=None, autospec=None, 

1663 new_callable=None, *, unsafe=False, **kwargs 

1664 ): 

1665 """ 

1666 patch the named member (`attribute`) on an object (`target`) with a mock 

1667 object. 

1668 

1669 `patch.object` can be used as a decorator, class decorator or a context 

1670 manager. Arguments `new`, `spec`, `create`, `spec_set`, 

1671 `autospec` and `new_callable` have the same meaning as for `patch`. Like 

1672 `patch`, `patch.object` takes arbitrary keyword arguments for configuring 

1673 the mock object it creates. 

1674 

1675 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX` 

1676 for choosing which methods to wrap. 

1677 """ 

1678 if type(target) is str: 

1679 raise TypeError( 

1680 f"{target!r} must be the actual object to be patched, not a str" 

1681 ) 

1682 getter = lambda: target 

1683 return _patch( 

1684 getter, attribute, new, spec, create, 

1685 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1686 ) 

1687 

1688 

1689def _patch_multiple(target, spec=None, create=False, spec_set=None, 

1690 autospec=None, new_callable=None, **kwargs): 

1691 """Perform multiple patches in a single call. It takes the object to be 

1692 patched (either as an object or a string to fetch the object by importing) 

1693 and keyword arguments for the patches:: 

1694 

1695 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'): 

1696 ... 

1697 

1698 Use `DEFAULT` as the value if you want `patch.multiple` to create 

1699 mocks for you. In this case the created mocks are passed into a decorated 

1700 function by keyword, and a dictionary is returned when `patch.multiple` is 

1701 used as a context manager. 

1702 

1703 `patch.multiple` can be used as a decorator, class decorator or a context 

1704 manager. The arguments `spec`, `spec_set`, `create`, 

1705 `autospec` and `new_callable` have the same meaning as for `patch`. These 

1706 arguments will be applied to *all* patches done by `patch.multiple`. 

1707 

1708 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX` 

1709 for choosing which methods to wrap. 

1710 """ 

1711 if type(target) is str: 

1712 getter = lambda: _importer(target) 

1713 else: 

1714 getter = lambda: target 

1715 

1716 if not kwargs: 

1717 raise ValueError( 

1718 'Must supply at least one keyword argument with patch.multiple' 

1719 ) 

1720 # need to wrap in a list for python 3, where items is a view 

1721 items = list(kwargs.items()) 

1722 attribute, new = items[0] 

1723 patcher = _patch( 

1724 getter, attribute, new, spec, create, spec_set, 

1725 autospec, new_callable, {} 

1726 ) 

1727 patcher.attribute_name = attribute 

1728 for attribute, new in items[1:]: 

1729 this_patcher = _patch( 

1730 getter, attribute, new, spec, create, spec_set, 

1731 autospec, new_callable, {} 

1732 ) 

1733 this_patcher.attribute_name = attribute 

1734 patcher.additional_patchers.append(this_patcher) 

1735 return patcher 

1736 

1737 

1738def patch( 

1739 target, new=DEFAULT, spec=None, create=False, 

1740 spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs 

1741 ): 

1742 """ 

1743 `patch` acts as a function decorator, class decorator or a context 

1744 manager. Inside the body of the function or with statement, the `target` 

1745 is patched with a `new` object. When the function/with statement exits 

1746 the patch is undone. 

1747 

1748 If `new` is omitted, then the target is replaced with an 

1749 `AsyncMock if the patched object is an async function or a 

1750 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is 

1751 omitted, the created mock is passed in as an extra argument to the 

1752 decorated function. If `patch` is used as a context manager the created 

1753 mock is returned by the context manager. 

1754 

1755 `target` should be a string in the form `'package.module.ClassName'`. The 

1756 `target` is imported and the specified object replaced with the `new` 

1757 object, so the `target` must be importable from the environment you are 

1758 calling `patch` from. The target is imported when the decorated function 

1759 is executed, not at decoration time. 

1760 

1761 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock` 

1762 if patch is creating one for you. 

1763 

1764 In addition you can pass `spec=True` or `spec_set=True`, which causes 

1765 patch to pass in the object being mocked as the spec/spec_set object. 

1766 

1767 `new_callable` allows you to specify a different class, or callable object, 

1768 that will be called to create the `new` object. By default `AsyncMock` is 

1769 used for async functions and `MagicMock` for the rest. 

1770 

1771 A more powerful form of `spec` is `autospec`. If you set `autospec=True` 

1772 then the mock will be created with a spec from the object being replaced. 

1773 All attributes of the mock will also have the spec of the corresponding 

1774 attribute of the object being replaced. Methods and functions being 

1775 mocked will have their arguments checked and will raise a `TypeError` if 

1776 they are called with the wrong signature. For mocks replacing a class, 

1777 their return value (the 'instance') will have the same spec as the class. 

1778 

1779 Instead of `autospec=True` you can pass `autospec=some_object` to use an 

1780 arbitrary object as the spec instead of the one being replaced. 

1781 

1782 By default `patch` will fail to replace attributes that don't exist. If 

1783 you pass in `create=True`, and the attribute doesn't exist, patch will 

1784 create the attribute for you when the patched function is called, and 

1785 delete it again afterwards. This is useful for writing tests against 

1786 attributes that your production code creates at runtime. It is off by 

1787 default because it can be dangerous. With it switched on you can write 

1788 passing tests against APIs that don't actually exist! 

1789 

1790 Patch can be used as a `TestCase` class decorator. It works by 

1791 decorating each test method in the class. This reduces the boilerplate 

1792 code when your test methods share a common patchings set. `patch` finds 

1793 tests by looking for method names that start with `patch.TEST_PREFIX`. 

1794 By default this is `test`, which matches the way `unittest` finds tests. 

1795 You can specify an alternative prefix by setting `patch.TEST_PREFIX`. 

1796 

1797 Patch can be used as a context manager, with the with statement. Here the 

1798 patching applies to the indented block after the with statement. If you 

1799 use "as" then the patched object will be bound to the name after the 

1800 "as"; very useful if `patch` is creating a mock object for you. 

1801 

1802 Patch will raise a `RuntimeError` if passed some common misspellings of 

1803 the arguments autospec and spec_set. Pass the argument `unsafe` with the 

1804 value True to disable that check. 

1805 

1806 `patch` takes arbitrary keyword arguments. These will be passed to 

1807 `AsyncMock` if the patched object is asynchronous, to `MagicMock` 

1808 otherwise or to `new_callable` if specified. 

1809 

1810 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are 

1811 available for alternate use-cases. 

1812 """ 

1813 getter, attribute = _get_target(target) 

1814 return _patch( 

1815 getter, attribute, new, spec, create, 

1816 spec_set, autospec, new_callable, kwargs, unsafe=unsafe 

1817 ) 

1818 

1819 

1820class _patch_dict(object): 

1821 """ 

1822 Patch a dictionary, or dictionary like object, and restore the dictionary 

1823 to its original state after the test. 

1824 

1825 `in_dict` can be a dictionary or a mapping like container. If it is a 

1826 mapping then it must at least support getting, setting and deleting items 

1827 plus iterating over keys. 

1828 

1829 `in_dict` can also be a string specifying the name of the dictionary, which 

1830 will then be fetched by importing it. 

1831 

1832 `values` can be a dictionary of values to set in the dictionary. `values` 

1833 can also be an iterable of `(key, value)` pairs. 

1834 

1835 If `clear` is True then the dictionary will be cleared before the new 

1836 values are set. 

1837 

1838 `patch.dict` can also be called with arbitrary keyword arguments to set 

1839 values in the dictionary:: 

1840 

1841 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()): 

1842 ... 

1843 

1844 `patch.dict` can be used as a context manager, decorator or class 

1845 decorator. When used as a class decorator `patch.dict` honours 

1846 `patch.TEST_PREFIX` for choosing which methods to wrap. 

1847 """ 

1848 

1849 def __init__(self, in_dict, values=(), clear=False, **kwargs): 

1850 self.in_dict = in_dict 

1851 # support any argument supported by dict(...) constructor 

1852 self.values = dict(values) 

1853 self.values.update(kwargs) 

1854 self.clear = clear 

1855 self._original = None 

1856 

1857 

1858 def __call__(self, f): 

1859 if isinstance(f, type): 

1860 return self.decorate_class(f) 

1861 if inspect.iscoroutinefunction(f): 

1862 return self.decorate_async_callable(f) 

1863 return self.decorate_callable(f) 

1864 

1865 

1866 def decorate_callable(self, f): 

1867 @wraps(f) 

1868 def _inner(*args, **kw): 

1869 self._patch_dict() 

1870 try: 

1871 return f(*args, **kw) 

1872 finally: 

1873 self._unpatch_dict() 

1874 

1875 return _inner 

1876 

1877 

1878 def decorate_async_callable(self, f): 

1879 @wraps(f) 

1880 async def _inner(*args, **kw): 

1881 self._patch_dict() 

1882 try: 

1883 return await f(*args, **kw) 

1884 finally: 

1885 self._unpatch_dict() 

1886 

1887 return _inner 

1888 

1889 

1890 def decorate_class(self, klass): 

1891 for attr in dir(klass): 

1892 attr_value = getattr(klass, attr) 

1893 if (attr.startswith(patch.TEST_PREFIX) and 

1894 hasattr(attr_value, "__call__")): 

1895 decorator = _patch_dict(self.in_dict, self.values, self.clear) 

1896 decorated = decorator(attr_value) 

1897 setattr(klass, attr, decorated) 

1898 return klass 

1899 

1900 

1901 def __enter__(self): 

1902 """Patch the dict.""" 

1903 self._patch_dict() 

1904 return self.in_dict 

1905 

1906 

1907 def _patch_dict(self): 

1908 values = self.values 

1909 if isinstance(self.in_dict, str): 

1910 self.in_dict = _importer(self.in_dict) 

1911 in_dict = self.in_dict 

1912 clear = self.clear 

1913 

1914 try: 

1915 original = in_dict.copy() 

1916 except AttributeError: 

1917 # dict like object with no copy method 

1918 # must support iteration over keys 

1919 original = {} 

1920 for key in in_dict: 

1921 original[key] = in_dict[key] 

1922 self._original = original 

1923 

1924 if clear: 

1925 _clear_dict(in_dict) 

1926 

1927 try: 

1928 in_dict.update(values) 

1929 except AttributeError: 

1930 # dict like object with no update method 

1931 for key in values: 

1932 in_dict[key] = values[key] 

1933 

1934 

1935 def _unpatch_dict(self): 

1936 in_dict = self.in_dict 

1937 original = self._original 

1938 

1939 _clear_dict(in_dict) 

1940 

1941 try: 

1942 in_dict.update(original) 

1943 except AttributeError: 

1944 for key in original: 

1945 in_dict[key] = original[key] 

1946 

1947 

1948 def __exit__(self, *args): 

1949 """Unpatch the dict.""" 

1950 if self._original is not None: 

1951 self._unpatch_dict() 

1952 return False 

1953 

1954 

1955 def start(self): 

1956 """Activate a patch, returning any created mock.""" 

1957 result = self.__enter__() 

1958 _patch._active_patches.append(self) 

1959 return result 

1960 

1961 

1962 def stop(self): 

1963 """Stop an active patch.""" 

1964 try: 

1965 _patch._active_patches.remove(self) 

1966 except ValueError: 

1967 # If the patch hasn't been started this will fail 

1968 return None 

1969 

1970 return self.__exit__(None, None, None) 

1971 

1972 

1973def _clear_dict(in_dict): 

1974 try: 

1975 in_dict.clear() 

1976 except AttributeError: 

1977 keys = list(in_dict) 

1978 for key in keys: 

1979 del in_dict[key] 

1980 

1981 

1982def _patch_stopall(): 

1983 """Stop all active patches. LIFO to unroll nested patches.""" 

1984 for patch in reversed(_patch._active_patches): 

1985 patch.stop() 

1986 

1987 

1988patch.object = _patch_object 

1989patch.dict = _patch_dict 

1990patch.multiple = _patch_multiple 

1991patch.stopall = _patch_stopall 

1992patch.TEST_PREFIX = 'test' 

1993 

1994magic_methods = ( 

1995 "lt le gt ge eq ne " 

1996 "getitem setitem delitem " 

1997 "len contains iter " 

1998 "hash str sizeof " 

1999 "enter exit " 

2000 # we added divmod and rdivmod here instead of numerics 

2001 # because there is no idivmod 

2002 "divmod rdivmod neg pos abs invert " 

2003 "complex int float index " 

2004 "round trunc floor ceil " 

2005 "bool next " 

2006 "fspath " 

2007 "aiter " 

2008) 

2009 

2010if IS_PYPY: 

2011 # PyPy has no __sizeof__: http://doc.pypy.org/en/latest/cpython_differences.html 

2012 magic_methods = magic_methods.replace('sizeof ', '') 

2013 

2014numerics = ( 

2015 "add sub mul matmul truediv floordiv mod lshift rshift and xor or pow" 

2016) 

2017inplace = ' '.join('i%s' % n for n in numerics.split()) 

2018right = ' '.join('r%s' % n for n in numerics.split()) 

2019 

2020# not including __prepare__, __instancecheck__, __subclasscheck__ 

2021# (as they are metaclass methods) 

2022# __del__ is not supported at all as it causes problems if it exists 

2023 

2024_non_defaults = { 

2025 '__get__', '__set__', '__delete__', '__reversed__', '__missing__', 

2026 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__', 

2027 '__getstate__', '__setstate__', '__getformat__', 

2028 '__repr__', '__dir__', '__subclasses__', '__format__', 

2029 '__getnewargs_ex__', 

2030} 

2031 

2032 

2033def _get_method(name, func): 

2034 "Turns a callable object (like a mock) into a real function" 

2035 def method(self, *args, **kw): 

2036 return func(self, *args, **kw) 

2037 method.__name__ = name 

2038 return method 

2039 

2040 

2041_magics = { 

2042 '__%s__' % method for method in 

2043 ' '.join([magic_methods, numerics, inplace, right]).split() 

2044} 

2045 

2046# Magic methods used for async `with` statements 

2047_async_method_magics = {"__aenter__", "__aexit__", "__anext__"} 

2048# Magic methods that are only used with async calls but are synchronous functions themselves 

2049_sync_async_magics = {"__aiter__"} 

2050_async_magics = _async_method_magics | _sync_async_magics 

2051 

2052_all_sync_magics = _magics | _non_defaults 

2053_all_magics = _all_sync_magics | _async_magics 

2054 

2055_unsupported_magics = { 

2056 '__getattr__', '__setattr__', 

2057 '__init__', '__new__', '__prepare__', 

2058 '__instancecheck__', '__subclasscheck__', 

2059 '__del__' 

2060} 

2061 

2062_calculate_return_value = { 

2063 '__hash__': lambda self: object.__hash__(self), 

2064 '__str__': lambda self: object.__str__(self), 

2065 '__sizeof__': lambda self: object.__sizeof__(self), 

2066 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}", 

2067} 

2068 

2069_return_values = { 

2070 '__lt__': NotImplemented, 

2071 '__gt__': NotImplemented, 

2072 '__le__': NotImplemented, 

2073 '__ge__': NotImplemented, 

2074 '__int__': 1, 

2075 '__contains__': False, 

2076 '__len__': 0, 

2077 '__exit__': False, 

2078 '__complex__': 1j, 

2079 '__float__': 1.0, 

2080 '__bool__': True, 

2081 '__index__': 1, 

2082 '__aexit__': False, 

2083} 

2084 

2085 

2086def _get_eq(self): 

2087 def __eq__(other): 

2088 ret_val = self.__eq__._mock_return_value 

2089 if ret_val is not DEFAULT: 

2090 return ret_val 

2091 if self is other: 

2092 return True 

2093 return NotImplemented 

2094 return __eq__ 

2095 

2096def _get_ne(self): 

2097 def __ne__(other): 

2098 if self.__ne__._mock_return_value is not DEFAULT: 

2099 return DEFAULT 

2100 if self is other: 

2101 return False 

2102 return NotImplemented 

2103 return __ne__ 

2104 

2105def _get_iter(self): 

2106 def __iter__(): 

2107 ret_val = self.__iter__._mock_return_value 

2108 if ret_val is DEFAULT: 

2109 return iter([]) 

2110 # if ret_val was already an iterator, then calling iter on it should 

2111 # return the iterator unchanged 

2112 return iter(ret_val) 

2113 return __iter__ 

2114 

2115def _get_async_iter(self): 

2116 def __aiter__(): 

2117 ret_val = self.__aiter__._mock_return_value 

2118 if ret_val is DEFAULT: 

2119 return _AsyncIterator(iter([])) 

2120 return _AsyncIterator(iter(ret_val)) 

2121 return __aiter__ 

2122 

2123_side_effect_methods = { 

2124 '__eq__': _get_eq, 

2125 '__ne__': _get_ne, 

2126 '__iter__': _get_iter, 

2127 '__aiter__': _get_async_iter 

2128} 

2129 

2130 

2131 

2132def _set_return_value(mock, method, name): 

2133 fixed = _return_values.get(name, DEFAULT) 

2134 if fixed is not DEFAULT: 

2135 method.return_value = fixed 

2136 return 

2137 

2138 return_calculator = _calculate_return_value.get(name) 

2139 if return_calculator is not None: 

2140 return_value = return_calculator(mock) 

2141 method.return_value = return_value 

2142 return 

2143 

2144 side_effector = _side_effect_methods.get(name) 

2145 if side_effector is not None: 

2146 method.side_effect = side_effector(mock) 

2147 

2148 

2149 

2150class MagicMixin(Base): 

2151 def __init__(self, *args, **kw): 

2152 self._mock_set_magics() # make magic work for kwargs in init 

2153 _safe_super(MagicMixin, self).__init__(*args, **kw) 

2154 self._mock_set_magics() # fix magic broken by upper level init 

2155 

2156 

2157 def _mock_set_magics(self): 

2158 orig_magics = _magics | _async_method_magics 

2159 these_magics = orig_magics 

2160 

2161 if getattr(self, "_mock_methods", None) is not None: 

2162 these_magics = orig_magics.intersection(self._mock_methods) 

2163 

2164 remove_magics = set() 

2165 remove_magics = orig_magics - these_magics 

2166 

2167 for entry in remove_magics: 

2168 if entry in type(self).__dict__: 

2169 # remove unneeded magic methods 

2170 delattr(self, entry) 

2171 

2172 # don't overwrite existing attributes if called a second time 

2173 these_magics = these_magics - set(type(self).__dict__) 

2174 

2175 _type = type(self) 

2176 for entry in these_magics: 

2177 setattr(_type, entry, MagicProxy(entry, self)) 

2178 

2179 

2180 

2181class NonCallableMagicMock(MagicMixin, NonCallableMock): 

2182 """A version of `MagicMock` that isn't callable.""" 

2183 def mock_add_spec(self, spec, spec_set=False): 

2184 """Add a spec to a mock. `spec` can either be an object or a 

2185 list of strings. Only attributes on the `spec` can be fetched as 

2186 attributes from the mock. 

2187 

2188 If `spec_set` is True then only attributes on the spec can be set.""" 

2189 self._mock_add_spec(spec, spec_set) 

2190 self._mock_set_magics() 

2191 

2192 

2193class AsyncMagicMixin(MagicMixin): 

2194 pass 

2195 

2196 

2197class MagicMock(MagicMixin, Mock): 

2198 """ 

2199 MagicMock is a subclass of Mock with default implementations 

2200 of most of the magic methods. You can use MagicMock without having to 

2201 configure the magic methods yourself. 

2202 

2203 If you use the `spec` or `spec_set` arguments then *only* magic 

2204 methods that exist in the spec will be created. 

2205 

2206 Attributes and the return value of a `MagicMock` will also be `MagicMocks`. 

2207 """ 

2208 def mock_add_spec(self, spec, spec_set=False): 

2209 """Add a spec to a mock. `spec` can either be an object or a 

2210 list of strings. Only attributes on the `spec` can be fetched as 

2211 attributes from the mock. 

2212 

2213 If `spec_set` is True then only attributes on the spec can be set.""" 

2214 self._mock_add_spec(spec, spec_set) 

2215 self._mock_set_magics() 

2216 

2217 

2218 

2219class MagicProxy(Base): 

2220 def __init__(self, name, parent): 

2221 self.name = name 

2222 self.parent = parent 

2223 

2224 def create_mock(self): 

2225 entry = self.name 

2226 parent = self.parent 

2227 m = parent._get_child_mock(name=entry, _new_name=entry, 

2228 _new_parent=parent) 

2229 setattr(parent, entry, m) 

2230 _set_return_value(parent, m, entry) 

2231 return m 

2232 

2233 def __get__(self, obj, _type=None): 

2234 return self.create_mock() 

2235 

2236 

2237_CODE_ATTRS = dir(CodeType) 

2238_CODE_SIG = inspect.signature(partial(CodeType.__init__, None)) 

2239 

2240 

2241class AsyncMockMixin(Base): 

2242 await_count = _delegating_property('await_count') 

2243 await_args = _delegating_property('await_args') 

2244 await_args_list = _delegating_property('await_args_list') 

2245 

2246 def __init__(self, *args, **kwargs): 

2247 super().__init__(*args, **kwargs) 

2248 # iscoroutinefunction() checks _is_coroutine property to say if an 

2249 # object is a coroutine. Without this check it looks to see if it is a 

2250 # function/method, which in this case it is not (since it is an 

2251 # AsyncMock). 

2252 # It is set through __dict__ because when spec_set is True, this 

2253 # attribute is likely undefined. 

2254 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine 

2255 self.__dict__['_mock_await_count'] = 0 

2256 self.__dict__['_mock_await_args'] = None 

2257 self.__dict__['_mock_await_args_list'] = _CallList() 

2258 code_mock = NonCallableMock(spec_set=_CODE_ATTRS) 

2259 code_mock.__dict__["_spec_class"] = CodeType 

2260 code_mock.__dict__["_spec_signature"] = _CODE_SIG 

2261 code_mock.co_flags = ( 

2262 inspect.CO_COROUTINE 

2263 + inspect.CO_VARARGS 

2264 + inspect.CO_VARKEYWORDS 

2265 ) 

2266 code_mock.co_argcount = 0 

2267 code_mock.co_varnames = ('args', 'kwargs') 

2268 try: 

2269 code_mock.co_posonlyargcount = 0 

2270 except AttributeError: 

2271 # Python 3.7 and earlier. 

2272 pass 

2273 code_mock.co_kwonlyargcount = 0 

2274 self.__dict__['__code__'] = code_mock 

2275 self.__dict__['__name__'] = 'AsyncMock' 

2276 self.__dict__['__defaults__'] = tuple() 

2277 self.__dict__['__kwdefaults__'] = {} 

2278 self.__dict__['__annotations__'] = None 

2279 

2280 async def _execute_mock_call(_mock_self, *args, **kwargs): 

2281 self = _mock_self 

2282 # This is nearly just like super(), except for special handling 

2283 # of coroutines 

2284 

2285 _call = _Call((args, kwargs), two=True) 

2286 self.await_count += 1 

2287 self.await_args = _call 

2288 self.await_args_list.append(_call) 

2289 

2290 effect = self.side_effect 

2291 if effect is not None: 

2292 if _is_exception(effect): 

2293 raise effect 

2294 elif not _callable(effect): 

2295 try: 

2296 result = next(effect) 

2297 except StopIteration: 

2298 # It is impossible to propagate a StopIteration 

2299 # through coroutines because of PEP 479 

2300 raise StopAsyncIteration 

2301 if _is_exception(result): 

2302 raise result 

2303 elif iscoroutinefunction(effect): 

2304 result = await effect(*args, **kwargs) 

2305 else: 

2306 result = effect(*args, **kwargs) 

2307 

2308 if result is not DEFAULT: 

2309 return result 

2310 

2311 if self._mock_return_value is not DEFAULT: 

2312 return self.return_value 

2313 

2314 if self._mock_wraps is not None: 

2315 if iscoroutinefunction(self._mock_wraps): 

2316 return await self._mock_wraps(*args, **kwargs) 

2317 return self._mock_wraps(*args, **kwargs) 

2318 

2319 return self.return_value 

2320 

2321 def assert_awaited(_mock_self): 

2322 """ 

2323 Assert that the mock was awaited at least once. 

2324 """ 

2325 self = _mock_self 

2326 if self.await_count == 0: 

2327 msg = f"Expected {self._mock_name or 'mock'} to have been awaited." 

2328 raise AssertionError(msg) 

2329 

2330 def assert_awaited_once(_mock_self): 

2331 """ 

2332 Assert that the mock was awaited exactly once. 

2333 """ 

2334 self = _mock_self 

2335 if not self.await_count == 1: 

2336 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2337 f" Awaited {self.await_count} times.") 

2338 raise AssertionError(msg) 

2339 

2340 def assert_awaited_with(_mock_self, *args, **kwargs): 

2341 """ 

2342 Assert that the last await was with the specified arguments. 

2343 """ 

2344 self = _mock_self 

2345 if self.await_args is None: 

2346 expected = self._format_mock_call_signature(args, kwargs) 

2347 raise AssertionError(f'Expected await: {expected}\nNot awaited') 

2348 

2349 def _error_message(): 

2350 msg = self._format_mock_failure_message(args, kwargs, action='await') 

2351 return msg 

2352 

2353 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2354 actual = self._call_matcher(self.await_args) 

2355 if actual != expected: 

2356 cause = expected if isinstance(expected, Exception) else None 

2357 raise AssertionError(_error_message()) from cause 

2358 

2359 def assert_awaited_once_with(_mock_self, *args, **kwargs): 

2360 """ 

2361 Assert that the mock was awaited exactly once and with the specified 

2362 arguments. 

2363 """ 

2364 self = _mock_self 

2365 if not self.await_count == 1: 

2366 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once." 

2367 f" Awaited {self.await_count} times.") 

2368 raise AssertionError(msg) 

2369 return self.assert_awaited_with(*args, **kwargs) 

2370 

2371 def assert_any_await(_mock_self, *args, **kwargs): 

2372 """ 

2373 Assert the mock has ever been awaited with the specified arguments. 

2374 """ 

2375 self = _mock_self 

2376 expected = self._call_matcher(_Call((args, kwargs), two=True)) 

2377 cause = expected if isinstance(expected, Exception) else None 

2378 actual = [self._call_matcher(c) for c in self.await_args_list] 

2379 if cause or expected not in _AnyComparer(actual): 

2380 expected_string = self._format_mock_call_signature(args, kwargs) 

2381 raise AssertionError( 

2382 '%s await not found' % expected_string 

2383 ) from cause 

2384 

2385 def assert_has_awaits(_mock_self, calls, any_order=False): 

2386 """ 

2387 Assert the mock has been awaited with the specified calls. 

2388 The :attr:`await_args_list` list is checked for the awaits. 

2389 

2390 If `any_order` is False (the default) then the awaits must be 

2391 sequential. There can be extra calls before or after the 

2392 specified awaits. 

2393 

2394 If `any_order` is True then the awaits can be in any order, but 

2395 they must all appear in :attr:`await_args_list`. 

2396 """ 

2397 self = _mock_self 

2398 expected = [self._call_matcher(c) for c in calls] 

2399 cause = cause = next((e for e in expected if isinstance(e, Exception)), None) 

2400 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list) 

2401 if not any_order: 

2402 if expected not in all_awaits: 

2403 if cause is None: 

2404 problem = 'Awaits not found.' 

2405 else: 

2406 problem = ('Error processing expected awaits.\n' 

2407 'Errors: {}').format( 

2408 [e if isinstance(e, Exception) else None 

2409 for e in expected]) 

2410 raise AssertionError( 

2411 f'{problem}\n' 

2412 f'Expected: {_CallList(calls)}\n' 

2413 f'Actual: {self.await_args_list}' 

2414 ) from cause 

2415 return 

2416 

2417 all_awaits = list(all_awaits) 

2418 

2419 not_found = [] 

2420 for kall in expected: 

2421 try: 

2422 all_awaits.remove(kall) 

2423 except ValueError: 

2424 not_found.append(kall) 

2425 if not_found: 

2426 raise AssertionError( 

2427 '%r not all found in await list' % (tuple(not_found),) 

2428 ) from cause 

2429 

2430 def assert_not_awaited(_mock_self): 

2431 """ 

2432 Assert that the mock was never awaited. 

2433 """ 

2434 self = _mock_self 

2435 if self.await_count != 0: 

2436 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited." 

2437 f" Awaited {self.await_count} times.") 

2438 raise AssertionError(msg) 

2439 

2440 def reset_mock(self, *args, **kwargs): 

2441 """ 

2442 See :func:`.Mock.reset_mock()` 

2443 """ 

2444 super().reset_mock(*args, **kwargs) 

2445 self.await_count = 0 

2446 self.await_args = None 

2447 self.await_args_list = _CallList() 

2448 

2449 

2450class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock): 

2451 """ 

2452 Enhance :class:`Mock` with features allowing to mock 

2453 an async function. 

2454 

2455 The :class:`AsyncMock` object will behave so the object is 

2456 recognized as an async function, and the result of a call is an awaitable: 

2457 

2458 >>> mock = AsyncMock() 

2459 >>> iscoroutinefunction(mock) 

2460 True 

2461 >>> inspect.isawaitable(mock()) 

2462 True 

2463 

2464 

2465 The result of ``mock()`` is an async function which will have the outcome 

2466 of ``side_effect`` or ``return_value``: 

2467 

2468 - if ``side_effect`` is a function, the async function will return the 

2469 result of that function, 

2470 - if ``side_effect`` is an exception, the async function will raise the 

2471 exception, 

2472 - if ``side_effect`` is an iterable, the async function will return the 

2473 next value of the iterable, however, if the sequence of result is 

2474 exhausted, ``StopIteration`` is raised immediately, 

2475 - if ``side_effect`` is not defined, the async function will return the 

2476 value defined by ``return_value``, hence, by default, the async function 

2477 returns a new :class:`AsyncMock` object. 

2478 

2479 If the outcome of ``side_effect`` or ``return_value`` is an async function, 

2480 the mock async function obtained when the mock object is called will be this 

2481 async function itself (and not an async function returning an async 

2482 function). 

2483 

2484 The test author can also specify a wrapped object with ``wraps``. In this 

2485 case, the :class:`Mock` object behavior is the same as with an 

2486 :class:`.Mock` object: the wrapped object may have methods 

2487 defined as async function functions. 

2488 

2489 Based on Martin Richard's asynctest project. 

2490 """ 

2491 

2492 

2493class _ANY(object): 

2494 "A helper object that compares equal to everything." 

2495 

2496 def __eq__(self, other): 

2497 return True 

2498 

2499 def __ne__(self, other): 

2500 return False 

2501 

2502 def __repr__(self): 

2503 return '<ANY>' 

2504 

2505ANY = _ANY() 

2506 

2507 

2508 

2509def _format_call_signature(name, args, kwargs): 

2510 message = '%s(%%s)' % name 

2511 formatted_args = '' 

2512 args_string = ', '.join([repr(arg) for arg in args]) 

2513 kwargs_string = ', '.join([ 

2514 '%s=%r' % (key, value) for key, value in kwargs.items() 

2515 ]) 

2516 if args_string: 

2517 formatted_args = args_string 

2518 if kwargs_string: 

2519 if formatted_args: 

2520 formatted_args += ', ' 

2521 formatted_args += kwargs_string 

2522 

2523 return message % formatted_args 

2524 

2525 

2526 

2527class _Call(tuple): 

2528 """ 

2529 A tuple for holding the results of a call to a mock, either in the form 

2530 `(args, kwargs)` or `(name, args, kwargs)`. 

2531 

2532 If args or kwargs are empty then a call tuple will compare equal to 

2533 a tuple without those values. This makes comparisons less verbose:: 

2534 

2535 _Call(('name', (), {})) == ('name',) 

2536 _Call(('name', (1,), {})) == ('name', (1,)) 

2537 _Call(((), {'a': 'b'})) == ({'a': 'b'},) 

2538 

2539 The `_Call` object provides a useful shortcut for comparing with call:: 

2540 

2541 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3) 

2542 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3) 

2543 

2544 If the _Call has no name then it will match any name. 

2545 """ 

2546 def __new__(cls, value=(), name='', parent=None, two=False, 

2547 from_kall=True): 

2548 args = () 

2549 kwargs = {} 

2550 _len = len(value) 

2551 if _len == 3: 

2552 name, args, kwargs = value 

2553 elif _len == 2: 

2554 first, second = value 

2555 if isinstance(first, str): 

2556 name = first 

2557 if isinstance(second, tuple): 

2558 args = second 

2559 else: 

2560 kwargs = second 

2561 else: 

2562 args, kwargs = first, second 

2563 elif _len == 1: 

2564 value, = value 

2565 if isinstance(value, str): 

2566 name = value 

2567 elif isinstance(value, tuple): 

2568 args = value 

2569 else: 

2570 kwargs = value 

2571 

2572 if two: 

2573 return tuple.__new__(cls, (args, kwargs)) 

2574 

2575 return tuple.__new__(cls, (name, args, kwargs)) 

2576 

2577 

2578 def __init__(self, value=(), name=None, parent=None, two=False, 

2579 from_kall=True): 

2580 self._mock_name = name 

2581 self._mock_parent = parent 

2582 self._mock_from_kall = from_kall 

2583 

2584 

2585 def __eq__(self, other): 

2586 try: 

2587 len_other = len(other) 

2588 except TypeError: 

2589 return NotImplemented 

2590 

2591 self_name = '' 

2592 if len(self) == 2: 

2593 self_args, self_kwargs = self 

2594 else: 

2595 self_name, self_args, self_kwargs = self 

2596 

2597 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None) 

2598 and self._mock_parent != other._mock_parent): 

2599 return False 

2600 

2601 other_name = '' 

2602 if len_other == 0: 

2603 other_args, other_kwargs = (), {} 

2604 elif len_other == 3: 

2605 other_name, other_args, other_kwargs = other 

2606 elif len_other == 1: 

2607 value, = other 

2608 if isinstance(value, tuple): 

2609 other_args = value 

2610 other_kwargs = {} 

2611 elif isinstance(value, str): 

2612 other_name = value 

2613 other_args, other_kwargs = (), {} 

2614 else: 

2615 other_args = () 

2616 other_kwargs = value 

2617 elif len_other == 2: 

2618 # could be (name, args) or (name, kwargs) or (args, kwargs) 

2619 first, second = other 

2620 if isinstance(first, str): 

2621 other_name = first 

2622 if isinstance(second, tuple): 

2623 other_args, other_kwargs = second, {} 

2624 else: 

2625 other_args, other_kwargs = (), second 

2626 else: 

2627 other_args, other_kwargs = first, second 

2628 else: 

2629 return False 

2630 

2631 if self_name and other_name != self_name: 

2632 return False 

2633 

2634 # this order is important for ANY to work! 

2635 return (other_args, other_kwargs) == (self_args, self_kwargs) 

2636 

2637 

2638 __ne__ = object.__ne__ 

2639 

2640 

2641 def __call__(self, *args, **kwargs): 

2642 if self._mock_name is None: 

2643 return _Call(('', args, kwargs), name='()') 

2644 

2645 name = self._mock_name + '()' 

2646 return _Call((self._mock_name, args, kwargs), name=name, parent=self) 

2647 

2648 

2649 def __getattr__(self, attr): 

2650 if self._mock_name is None: 

2651 return _Call(name=attr, from_kall=False) 

2652 name = '%s.%s' % (self._mock_name, attr) 

2653 return _Call(name=name, parent=self, from_kall=False) 

2654 

2655 

2656 def __getattribute__(self, attr): 

2657 if attr in tuple.__dict__: 

2658 raise AttributeError 

2659 return tuple.__getattribute__(self, attr) 

2660 

2661 

2662 def _get_call_arguments(self): 

2663 if len(self) == 2: 

2664 args, kwargs = self 

2665 else: 

2666 name, args, kwargs = self 

2667 

2668 return args, kwargs 

2669 

2670 @property 

2671 def args(self): 

2672 return self._get_call_arguments()[0] 

2673 

2674 @property 

2675 def kwargs(self): 

2676 return self._get_call_arguments()[1] 

2677 

2678 def __repr__(self): 

2679 if not self._mock_from_kall: 

2680 name = self._mock_name or 'call' 

2681 if name.startswith('()'): 

2682 name = 'call%s' % name 

2683 return name 

2684 

2685 if len(self) == 2: 

2686 name = 'call' 

2687 args, kwargs = self 

2688 else: 

2689 name, args, kwargs = self 

2690 if not name: 

2691 name = 'call' 

2692 elif not name.startswith('()'): 

2693 name = 'call.%s' % name 

2694 else: 

2695 name = 'call%s' % name 

2696 return _format_call_signature(name, args, kwargs) 

2697 

2698 

2699 def call_list(self): 

2700 """For a call object that represents multiple calls, `call_list` 

2701 returns a list of all the intermediate calls as well as the 

2702 final call.""" 

2703 vals = [] 

2704 thing = self 

2705 while thing is not None: 

2706 if thing._mock_from_kall: 

2707 vals.append(thing) 

2708 thing = thing._mock_parent 

2709 return _CallList(reversed(vals)) 

2710 

2711 

2712call = _Call(from_kall=False) 

2713 

2714 

2715def create_autospec(spec, spec_set=False, instance=False, _parent=None, 

2716 _name=None, *, unsafe=False, **kwargs): 

2717 """Create a mock object using another object as a spec. Attributes on the 

2718 mock will use the corresponding attribute on the `spec` object as their 

2719 spec. 

2720 

2721 Functions or methods being mocked will have their arguments checked 

2722 to check that they are called with the correct signature. 

2723 

2724 If `spec_set` is True then attempting to set attributes that don't exist 

2725 on the spec object will raise an `AttributeError`. 

2726 

2727 If a class is used as a spec then the return value of the mock (the 

2728 instance of the class) will have the same spec. You can use a class as the 

2729 spec for an instance object by passing `instance=True`. The returned mock 

2730 will only be callable if instances of the mock are callable. 

2731 

2732 `create_autospec` will raise a `RuntimeError` if passed some common 

2733 misspellings of the arguments autospec and spec_set. Pass the argument 

2734 `unsafe` with the value True to disable that check. 

2735 

2736 `create_autospec` also takes arbitrary keyword arguments that are passed to 

2737 the constructor of the created mock.""" 

2738 if _is_list(spec): 

2739 # can't pass a list instance to the mock constructor as it will be 

2740 # interpreted as a list of strings 

2741 spec = type(spec) 

2742 

2743 is_type = isinstance(spec, type) 

2744 if _is_instance_mock(spec): 

2745 raise InvalidSpecError(f'Cannot autospec a Mock object. ' 

2746 f'[object={spec!r}]') 

2747 is_async_func = _is_async_func(spec) 

2748 _kwargs = {'spec': spec} 

2749 if spec_set: 

2750 _kwargs = {'spec_set': spec} 

2751 elif spec is None: 

2752 # None we mock with a normal mock without a spec 

2753 _kwargs = {} 

2754 if _kwargs and instance: 

2755 _kwargs['_spec_as_instance'] = True 

2756 if not unsafe: 

2757 _check_spec_arg_typos(kwargs) 

2758 

2759 _kwargs.update(kwargs) 

2760 

2761 Klass = MagicMock 

2762 if inspect.isdatadescriptor(spec): 

2763 # descriptors don't have a spec 

2764 # because we don't know what type they return 

2765 _kwargs = {} 

2766 elif is_async_func: 

2767 if instance: 

2768 raise RuntimeError("Instance can not be True when create_autospec " 

2769 "is mocking an async function") 

2770 Klass = AsyncMock 

2771 elif not _callable(spec): 

2772 Klass = NonCallableMagicMock 

2773 elif is_type and instance and not _instance_callable(spec): 

2774 Klass = NonCallableMagicMock 

2775 

2776 _name = _kwargs.pop('name', _name) 

2777 

2778 _new_name = _name 

2779 if _parent is None: 

2780 # for a top level object no _new_name should be set 

2781 _new_name = '' 

2782 

2783 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, 

2784 name=_name, **_kwargs) 

2785 

2786 if isinstance(spec, FunctionTypes): 

2787 # should only happen at the top level because we don't 

2788 # recurse for functions 

2789 mock = _set_signature(mock, spec) 

2790 if is_async_func: 

2791 _setup_async_mock(mock) 

2792 else: 

2793 _check_signature(spec, mock, is_type, instance) 

2794 

2795 if _parent is not None and not instance: 

2796 _parent._mock_children[_name] = mock 

2797 

2798 if is_type and not instance and 'return_value' not in kwargs: 

2799 mock.return_value = create_autospec(spec, spec_set, instance=True, 

2800 _name='()', _parent=mock) 

2801 

2802 for entry in dir(spec): 

2803 if _is_magic(entry): 

2804 # MagicMock already does the useful magic methods for us 

2805 continue 

2806 

2807 # XXXX do we need a better way of getting attributes without 

2808 # triggering code execution (?) Probably not - we need the actual 

2809 # object to mock it so we would rather trigger a property than mock 

2810 # the property descriptor. Likewise we want to mock out dynamically 

2811 # provided attributes. 

2812 # XXXX what about attributes that raise exceptions other than 

2813 # AttributeError on being fetched? 

2814 # we could be resilient against it, or catch and propagate the 

2815 # exception when the attribute is fetched from the mock 

2816 try: 

2817 original = getattr(spec, entry) 

2818 except AttributeError: 

2819 continue 

2820 

2821 kwargs = {'spec': original} 

2822 if spec_set: 

2823 kwargs = {'spec_set': original} 

2824 

2825 if not isinstance(original, FunctionTypes): 

2826 new = _SpecState(original, spec_set, mock, entry, instance) 

2827 mock._mock_children[entry] = new 

2828 else: 

2829 parent = mock 

2830 if isinstance(spec, FunctionTypes): 

2831 parent = mock.mock 

2832 

2833 skipfirst = _must_skip(spec, entry, is_type) 

2834 kwargs['_eat_self'] = skipfirst 

2835 if iscoroutinefunction(original): 

2836 child_klass = AsyncMock 

2837 else: 

2838 child_klass = MagicMock 

2839 new = child_klass(parent=parent, name=entry, _new_name=entry, 

2840 _new_parent=parent, 

2841 **kwargs) 

2842 mock._mock_children[entry] = new 

2843 new.return_value = child_klass() 

2844 _check_signature(original, new, skipfirst=skipfirst) 

2845 

2846 # so functions created with _set_signature become instance attributes, 

2847 # *plus* their underlying mock exists in _mock_children of the parent 

2848 # mock. Adding to _mock_children may be unnecessary where we are also 

2849 # setting as an instance attribute? 

2850 if isinstance(new, FunctionTypes): 

2851 setattr(mock, entry, new) 

2852 

2853 return mock 

2854 

2855 

2856def _must_skip(spec, entry, is_type): 

2857 """ 

2858 Return whether we should skip the first argument on spec's `entry` 

2859 attribute. 

2860 """ 

2861 if not isinstance(spec, type): 

2862 if entry in getattr(spec, '__dict__', {}): 

2863 # instance attribute - shouldn't skip 

2864 return False 

2865 spec = spec.__class__ 

2866 

2867 for klass in spec.__mro__: 

2868 result = klass.__dict__.get(entry, DEFAULT) 

2869 if result is DEFAULT: 

2870 continue 

2871 if isinstance(result, (staticmethod, classmethod)): 

2872 return False 

2873 elif isinstance(result, FunctionTypes): 

2874 # Normal method => skip if looked up on type 

2875 # (if looked up on instance, self is already skipped) 

2876 return is_type 

2877 else: 

2878 return False 

2879 

2880 # function is a dynamically provided attribute 

2881 return is_type 

2882 

2883 

2884class _SpecState(object): 

2885 

2886 def __init__(self, spec, spec_set=False, parent=None, 

2887 name=None, ids=None, instance=False): 

2888 self.spec = spec 

2889 self.ids = ids 

2890 self.spec_set = spec_set 

2891 self.parent = parent 

2892 self.instance = instance 

2893 self.name = name 

2894 

2895 

2896FunctionTypes = ( 

2897 # python function 

2898 type(create_autospec), 

2899 # instance method 

2900 type(ANY.__eq__), 

2901) 

2902 

2903 

2904file_spec = None 

2905open_spec = None 

2906 

2907 

2908def _to_stream(read_data): 

2909 if isinstance(read_data, bytes): 

2910 return io.BytesIO(read_data) 

2911 else: 

2912 return io.StringIO(read_data) 

2913 

2914 

2915def mock_open(mock=None, read_data=''): 

2916 """ 

2917 A helper function to create a mock to replace the use of `open`. It works 

2918 for `open` called directly or used as a context manager. 

2919 

2920 The `mock` argument is the mock object to configure. If `None` (the 

2921 default) then a `MagicMock` will be created for you, with the API limited 

2922 to methods or attributes available on standard file handles. 

2923 

2924 `read_data` is a string for the `read`, `readline` and `readlines` of the 

2925 file handle to return. This is an empty string by default. 

2926 """ 

2927 _read_data = _to_stream(read_data) 

2928 _state = [_read_data, None] 

2929 

2930 def _readlines_side_effect(*args, **kwargs): 

2931 if handle.readlines.return_value is not None: 

2932 return handle.readlines.return_value 

2933 return _state[0].readlines(*args, **kwargs) 

2934 

2935 def _read_side_effect(*args, **kwargs): 

2936 if handle.read.return_value is not None: 

2937 return handle.read.return_value 

2938 return _state[0].read(*args, **kwargs) 

2939 

2940 def _readline_side_effect(*args, **kwargs): 

2941 yield from _iter_side_effect() 

2942 while True: 

2943 yield _state[0].readline(*args, **kwargs) 

2944 

2945 def _iter_side_effect(): 

2946 if handle.readline.return_value is not None: 

2947 while True: 

2948 yield handle.readline.return_value 

2949 for line in _state[0]: 

2950 yield line 

2951 

2952 def _next_side_effect(): 

2953 if handle.readline.return_value is not None: 

2954 return handle.readline.return_value 

2955 return next(_state[0]) 

2956 

2957 global file_spec 

2958 if file_spec is None: 

2959 import _io 

2960 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) 

2961 

2962 global open_spec 

2963 if open_spec is None: 

2964 import _io 

2965 open_spec = list(set(dir(_io.open))) 

2966 if mock is None: 

2967 mock = MagicMock(name='open', spec=open_spec) 

2968 

2969 handle = MagicMock(spec=file_spec) 

2970 handle.__enter__.return_value = handle 

2971 

2972 handle.write.return_value = None 

2973 handle.read.return_value = None 

2974 handle.readline.return_value = None 

2975 handle.readlines.return_value = None 

2976 

2977 handle.read.side_effect = _read_side_effect 

2978 _state[1] = _readline_side_effect() 

2979 handle.readline.side_effect = _state[1] 

2980 handle.readlines.side_effect = _readlines_side_effect 

2981 handle.__iter__.side_effect = _iter_side_effect 

2982 handle.__next__.side_effect = _next_side_effect 

2983 

2984 def reset_data(*args, **kwargs): 

2985 _state[0] = _to_stream(read_data) 

2986 if handle.readline.side_effect == _state[1]: 

2987 # Only reset the side effect if the user hasn't overridden it. 

2988 _state[1] = _readline_side_effect() 

2989 handle.readline.side_effect = _state[1] 

2990 return DEFAULT 

2991 

2992 mock.side_effect = reset_data 

2993 mock.return_value = handle 

2994 return mock 

2995 

2996 

2997class PropertyMock(Mock): 

2998 """ 

2999 A mock intended to be used as a property, or other descriptor, on a class. 

3000 `PropertyMock` provides `__get__` and `__set__` methods so you can specify 

3001 a return value when it is fetched. 

3002 

3003 Fetching a `PropertyMock` instance from an object calls the mock, with 

3004 no args. Setting it calls the mock with the value being set. 

3005 """ 

3006 def _get_child_mock(self, **kwargs): 

3007 return MagicMock(**kwargs) 

3008 

3009 def __get__(self, obj, obj_type=None): 

3010 return self() 

3011 def __set__(self, obj, val): 

3012 self(val) 

3013 

3014 

3015def seal(mock): 

3016 """Disable the automatic generation of child mocks. 

3017 

3018 Given an input Mock, seals it to ensure no further mocks will be generated 

3019 when accessing an attribute that was not already defined. 

3020 

3021 The operation recursively seals the mock passed in, meaning that 

3022 the mock itself, any mocks generated by accessing one of its attributes, 

3023 and all assigned mocks without a name or spec will be sealed. 

3024 """ 

3025 mock._mock_sealed = True 

3026 for attr in dir(mock): 

3027 try: 

3028 m = getattr(mock, attr) 

3029 except AttributeError: 

3030 continue 

3031 if not isinstance(m, NonCallableMock): 

3032 continue 

3033 if isinstance(m._mock_children.get(attr), _SpecState): 

3034 continue 

3035 if m._mock_new_parent is mock: 

3036 seal(m) 

3037 

3038 

3039class _AsyncIterator: 

3040 """ 

3041 Wraps an iterator in an asynchronous iterator. 

3042 """ 

3043 def __init__(self, iterator): 

3044 self.iterator = iterator 

3045 code_mock = NonCallableMock(spec_set=CodeType) 

3046 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE 

3047 self.__dict__['__code__'] = code_mock 

3048 

3049 async def __anext__(self): 

3050 try: 

3051 return next(self.iterator) 

3052 except StopIteration: 

3053 pass 

3054 raise StopAsyncIteration